Version: 2.x

Creating Sinks

The zio.stream package provides many kinds of sinks.

Common Constructors

ZSink.head — Creates a sink that returns the first element of the stream, or None if the stream is empty:

val sink: ZSink[Any, Nothing, Int, Int, Option[Int]] = ZSink.head[Int]
val head: ZIO[Any, Nothing, Option[Int]] = ZStream(1, 2, 3, 4).run(sink)
// Result: Some(1)

ZSink.last — It consumes all elements of a stream and returns the last element of the stream:

val sink: ZSink[Any, Nothing, Int, Int, Option[Int]] = ZSink.last[Int]
val last: ZIO[Any, Nothing, Option[Int]] = ZStream(1, 2, 3, 4).run(sink)
// Result: Some(4)

ZSink.count — A sink that consumes all elements of the stream and counts the number of elements fed to it:

val sink : ZSink[Any, Nothing, Int, Nothing, Long] = ZSink.count
val count: ZIO[Any, Nothing, Long] = ZStream(1, 2, 3, 4, 5).run(sink)
// Result: 5

ZSink.sum — A sink that consumes all elements of the stream and sums incoming numeric values:

val sink : ZSink[Any, Nothing, Int, Nothing, Int] = ZSink.sum[Int]
val sum: ZIO[Any, Nothing, Int] = ZStream(1, 2, 3, 4, 5).run(sink)
// Result: 15

ZSink.take — A sink that takes the specified number of values and returns them in a Chunk:

val sink  : ZSink[Any, Nothing, Int, Int, Chunk[Int]] = ZSink.take[Int](3)
val stream: ZIO[Any, Nothing, Chunk[Int]] = ZStream(1, 2, 3, 4, 5).run(sink)
// Result: Chunk(1, 2, 3)

ZSink.drain — A sink that ignores its inputs:

val drain: ZSink[Any, Nothing, Any, Nothing, Unit] = ZSink.drain

ZSink.timed — A sink that executes the stream and times its execution:

val timed: ZSink[Any, Nothing, Any, Nothing, Duration] = ZSink.timed
val stream: ZIO[Any, Nothing, Long] =
  ZStream(1, 2, 3, 4, 5)
    .schedule(Schedule.fixed(2.seconds))
    .run(timed)
    .map(_.getSeconds)
// Result: 10

ZSink.foreach — A sink that executes the provided effectful function for every element fed to it:

val printer: ZSink[Any, IOException, Int, Int, Unit] =
  ZSink.foreach((i: Int) => printLine(i))
val stream : ZIO[Any, IOException, Unit] =
  ZStream(1, 2, 3, 4, 5).run(printer)
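
The snippets above omit their imports. As a minimal sketch, assuming ZIO 2.x with zio-streams on the classpath, several of these sinks can be run together in one application. The object name CommonSinksExample and the value names are illustrative and not part of the library:

```scala
import zio._
import zio.stream._

// Hypothetical example object; the name is an assumption for illustration.
object CommonSinksExample extends ZIOAppDefault {
  val numbers: UStream[Int] = ZStream(1, 2, 3, 4)

  // Run the same stream into five different sinks and gather the results.
  val results: ZIO[Any, Nothing, (Option[Int], Option[Int], Long, Int, Chunk[Int])] =
    for {
      head  <- numbers.run(ZSink.head[Int])    // first element, if any
      last  <- numbers.run(ZSink.last[Int])    // last element, if any
      count <- numbers.run(ZSink.count)        // number of elements
      sum   <- numbers.run(ZSink.sum[Int])     // sum of the elements
      taken <- numbers.run(ZSink.take[Int](3)) // first three elements
    } yield (head, last, count, sum, taken)

  // Prints (Some(1),Some(4),4,10,Chunk(1,2,3))
  def run = results.flatMap(r => Console.printLine(r))
}
```

Each call to run applies one sink to the whole stream and yields a single value, which is why the results can be combined in an ordinary for comprehension.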

From Success and Failure

Similar to the ZStream data type, we can create a ZSink using the fail and succeed methods.

A sink that doesn't consume any element from its upstream and succeeds with a value of Int type:

val succeed: ZSink[Any, Nothing, Any, Nothing, Int] = ZSink.succeed(5)

A sink that doesn't consume any element from its upstream and intentionally fails with a message of String type:

val failed : ZSink[Any, String, Any, Nothing, Nothing] = ZSink.fail("fail!")
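
To see how these two sinks behave when a stream is run into them, here is a small sketch, assuming ZIO 2.x. The object name SucceedFailExample and the value name program are illustrative:

```scala
import zio._
import zio.stream._

// Hypothetical example object; the name is an assumption for illustration.
object SucceedFailExample extends ZIOAppDefault {
  val succeed: ZSink[Any, Nothing, Any, Nothing, Int]    = ZSink.succeed(5)
  val failed : ZSink[Any, String, Any, Nothing, Nothing] = ZSink.fail("fail!")

  // Neither sink consumes any element, so the stream contents do not matter.
  val program: ZIO[Any, Nothing, (Int, Either[String, Nothing])] =
    for {
      a <- ZStream(1, 2, 3).run(succeed)       // succeeds with 5
      b <- ZStream(1, 2, 3).run(failed).either // the failure surfaces as Left("fail!")
    } yield (a, b)

  def run = program.flatMap(r => Console.printLine(r))
}
```

Calling either moves the sink's failure into the success channel, so both outcomes can be inspected side by side.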

Collecting

To create a sink that collects all elements of a stream into a Chunk[A], we can use ZSink.collectAll:

val stream    : UStream[Int]    = ZStream(1, 2, 3, 4, 5)
val collection: UIO[Chunk[Int]] = stream.run(ZSink.collectAll[Int])
// Output: Chunk(1, 2, 3, 4, 5)

We can collect all elements into a Set:

val collectAllToSet: ZSink[Any, Nothing, Int, Nothing, Set[Int]] = ZSink.collectAllToSet[Int]
val stream: ZIO[Any, Nothing, Set[Int]] = ZStream(1, 3, 2, 3, 1, 5, 1).run(collectAllToSet)
// Output: Set(1, 3, 2, 5)

Or we can collect and merge them into a Map[K, A] using a merge function. In the following example, we use (_: Int) % 3 to determine the map keys, and we provide the _ + _ function to merge elements that share the same key:

val collectAllToMap: ZSink[Any, Nothing, Int, Nothing, Map[Int, Int]] = ZSink.collectAllToMap((_: Int) % 3)(_ + _)
val stream: ZIO[Any, Nothing, Map[Int, Int]] = ZStream(1, 3, 2, 3, 1, 5, 1).run(collectAllToMap)
// Output: Map(1 -> 3, 0 -> 6, 2 -> 7)

ZSink.collectAllN — Collects incoming values into chunks of at most n elements:

// transduce applies the sink repeatedly; run would apply it only once
ZStream(1, 2, 3, 4, 5).transduce(
  ZSink.collectAllN[Int](3)
)
// Output: Chunk(1,2,3), Chunk(4,5)

ZSink.collectAllWhile — Accumulates incoming elements into a chunk as long as they satisfy the given predicate:

// transduce applies the sink repeatedly; run would apply it only once
ZStream(1, 2, 0, 4, 0, 6, 7).transduce(
  ZSink.collectAllWhile[Int](_ != 0)
)
// Output: Chunk(1,2), Chunk(4), Chunk(6,7)

ZSink.collectAllToMapN — Creates a sink accumulating incoming values into maps of up to n keys. Elements are mapped to keys using the function key; elements mapped to the same key will be merged with the function f:

object ZSink {
  def collectAllToMapN[Err, In, K](
    n: Long
  )(key: In => K)(f: (In, In) => In): ZSink[Any, Err, In, In, Map[K, In]] = ???
}

Let's do an example:

ZStream(1, 2, 0, 4, 5).run(
  ZSink.collectAllToMapN[Nothing, Int, Int](10)(_ % 3)(_ + _)
)
// Output: Map(1 -> 5, 2 -> 7, 0 -> 0)

ZSink.collectAllToSetN — Creates a sink accumulating incoming values into sets of maximum size n:

// transduce applies the sink repeatedly; run would apply it only once
ZStream(1, 2, 1, 2, 1, 3, 0, 5, 0, 2).transduce(
  ZSink.collectAllToSetN[Int](3)
)
// Output: Set(1,2,3), Set(0,5,2)
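
Several outputs in this section list more than one collection. A sink produces one result per application, so a sequence of results only arises when the sink is applied repeatedly. The following sketch, assuming ZIO 2.x, contrasts ZStream#run with ZStream#transduce for the same sink. The object name RunVsTransduce is illustrative:

```scala
import zio._
import zio.stream._

// Hypothetical example object; the name is an assumption for illustration.
object RunVsTransduce extends ZIOAppDefault {
  val numbers: UStream[Int] = ZStream(1, 2, 3, 4, 5)

  // run: the sink is applied once, so the effect yields only the first group.
  val once: ZIO[Any, Nothing, Chunk[Int]] =
    numbers.run(ZSink.collectAllN[Int](3))

  // transduce: the sink is applied again and again, and every result
  // becomes one element of the resulting stream.
  val repeated: ZStream[Any, Nothing, Chunk[Int]] =
    numbers.transduce(ZSink.collectAllN[Int](3))

  def run =
    for {
      a <- once
      b <- repeated.runCollect
      _ <- Console.printLine(s"run: $a, transduce: $b")
    } yield ()
}
```

Here once yields Chunk(1, 2, 3), while repeated emits the successive groups as stream elements.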

Folding

Basic fold accumulation of received elements:

ZSink.foldLeft[Int, Int](0)(_ + _)

A fold with short-circuiting has a termination predicate that determines the end of the folding process:

ZStream.iterate(0)(_ + 1).run(
  ZSink.fold(0)(sum => sum <= 10)((acc, n: Int) => acc + n)
)
// Output: 15

ZSink.foldWeighted — Creates a sink that folds incoming elements until the accumulated cost, as determined by costFn, reaches max; the pipeline then emits the computed value and restarts the folding process:

object ZSink {
  def foldWeighted[Err, In, S](z: S)(costFn: (S, In) => Long, max: Long)(
    f: (S, In) => S
  ): ZSink[Any, Err, In, In, S] = ???
}

In the following example, the cost function assigns a weight of one to every element we consume. After three elements, the sum of the weights reaches max and the folding process restarts. So we expect this pipeline to group every three elements into one Chunk:

ZStream(3, 2, 4, 1, 5, 6, 2, 1, 3, 5, 6)
  .transduce(
    ZSink
      .foldWeighted(Chunk[Int]())(
        (_, _: Int) => 1,
        3
      ) { (acc, el) =>
        acc ++ Chunk(el)
      }
  )
// Output: Chunk(3,2,4),Chunk(1,5,6),Chunk(2,1,3),Chunk(5,6)

Another example is grouping elements so that the sum of each group is less than or equal to a specific number:

ZStream(1, 2, 2, 4, 2, 1, 1, 1, 0, 2, 1, 2)
  .transduce(
    ZSink
      .foldWeighted(Chunk[Int]())(
        (_, i: Int) => i.toLong,
        5
      ) { (acc, el) =>
        acc ++ Chunk(el)
      }
  )
// Output: Chunk(1,2,2),Chunk(4),Chunk(2,1,1,1,0),Chunk(2,1,2)

Caution

ZSink.foldWeighted cannot decompose elements whose weight exceeds max. An element whose individual cost is larger than max therefore forces the pipeline to exceed the max cost. In the last example, if the source stream were ZStream(1, 2, 2, 4, 2, 1, 6, 1, 0, 2, 1, 2), the output would be Chunk(1,2,2),Chunk(4),Chunk(2,1),Chunk(6),Chunk(1,0,2,1),Chunk(2). As we can see, the element 6 exceeds the max cost.

To decompose these elements, we should use the ZSink.foldWeightedDecompose function.
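
The grouping rule described above can be modeled in plain Scala without ZIO. This is only a sketch of the rule and not ZIO's implementation: groupByWeight is a hypothetical helper, and its cost function depends on the element alone rather than on the accumulated state:

```scala
object FoldWeightedModel {
  // Hypothetical helper for illustration only; it is not part of ZIO.
  // Elements are added to the current group until adding the next one
  // would push the total cost past max; then the group is emitted.
  def groupByWeight(in: List[Int], max: Long)(cost: Int => Long): List[List[Int]] = {
    val (done, last, _) =
      in.foldLeft((Vector.empty[List[Int]], Vector.empty[Int], 0L)) {
        case ((groups, acc, total), el) =>
          val c = cost(el)
          // A non-empty group that would overflow is closed first. An oversized
          // element then starts a new group and exceeds max on its own.
          if (acc.nonEmpty && total + c > max) (groups :+ acc.toList, Vector(el), c)
          else (groups, acc :+ el, total + c)
      }
    // Emit whatever remains when the input ends.
    (if (last.nonEmpty) done :+ last.toList else done).toList
  }
}

// FoldWeightedModel.groupByWeight(List(1, 2, 2, 4, 2, 1, 6, 1, 0, 2, 1, 2), 5)(_.toLong)
// → List(List(1, 2, 2), List(4), List(2, 1), List(6), List(1, 0, 2, 1), List(2))
```

The model reproduces the grouping from the caution above, including the single-element group for 6 that exceeds the max cost.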

ZSink.foldWeightedDecompose — As we saw in the previous section, we need a way to break elements that would push the aggregate past max into smaller elements. This version of fold takes a decompose function that lets us do that:

object ZSink {
  def foldWeightedDecompose[Err, In, S](
    z: S
  )(costFn: (S, In) => Long, max: Long, decompose: In => Chunk[In])(
    f: (S, In) => S
  ): ZSink[Any, Err, In, In, S] = ???
}

In the following example, we break down elements that are bigger than 5 using the decompose function:

ZStream(1, 2, 2, 2, 1, 6, 1, 7, 2, 1, 2)
  .transduce(
    ZSink
      .foldWeightedDecompose(Chunk[Int]())(
        (_, i: Int) => i.toLong,
        5,
        (i: Int) =>
          if (i > 5) Chunk(i - 1, 1) else Chunk(i)
      )((acc, el) => acc ++ Chunk.succeed(el))
  )
// Output: Chunk(1,2,2),Chunk(2,1),Chunk(5),Chunk(1,1),Chunk(5),Chunk(1,1,2,1),Chunk(2)

ZSink.foldUntil — Creates a sink that folds incoming elements until max elements have been folded:

// transduce applies the sink repeatedly; run would apply it only once
ZStream(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
  .transduce(ZSink.foldUntil[Int, Int](0, 3)(_ + _))
// Output: 6, 15, 24, 10

ZSink.foldLeft — This sink will fold the inputs until the stream ends, resulting in one element:

val stream: ZIO[Any, Nothing, Int] =
  ZStream(1, 2, 3, 4).run(ZSink.foldLeft[Int, Int](0)(_ + _))
// Output: 10

From ZIO

ZSink.fromZIO creates a single-value sink from a ZIO workflow:

val sink = ZSink.fromZIO(ZIO.succeed(1))
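
As a brief sketch, assuming ZIO 2.x, running a stream into such a sink yields the value of the workflow regardless of what the stream contains. The object name FromZIOExample and the value name result are illustrative:

```scala
import zio._
import zio.stream._

// Hypothetical example object; the name is an assumption for illustration.
object FromZIOExample extends ZIOAppDefault {
  // The sink consumes no input; its result is the value of the workflow.
  val sink: ZSink[Any, Nothing, Any, Nothing, Int] = ZSink.fromZIO(ZIO.succeed(1))

  val result: ZIO[Any, Nothing, Int] = ZStream(10, 20, 30).run(sink)

  // Prints 1
  def run = result.flatMap(n => Console.printLine(n))
}
```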

From File

ZSink.fromPath creates a file sink that consumes byte chunks and writes them to the specified file:

def fileSink(path: Path): ZSink[Any, Throwable, String, Byte, Long] =
  ZSink
    .fromPath(path)
    .contramapChunks[String](_.flatMap(_.getBytes))

val result = ZStream("Hello", "ZIO", "World!")
  .intersperse("\n")
  .run(fileSink(Paths.get("file.txt")))

From OutputStream

ZSink.fromOutputStream creates a sink that consumes byte chunks and writes them to the OutputStream:

ZStream("Application", "Error", "Logs")
  .intersperse("\n")
  .run(
    ZSink
      .fromOutputStream(java.lang.System.err)
      .contramapChunks[String](_.flatMap(_.getBytes))
  )

From Queue

A queue has a finite or infinite buffer size, so queues are useful when we need to consume a stream as fast as we can and then batch the consumed messages. Using ZSink.fromQueue, we can create a sink backed by a queue; it enqueues each element into the specified queue:

val myApp: IO[IOException, Unit] =
  for {
    queue    <- Queue.bounded[Int](32)
    producer <- ZStream
                  .iterate(1)(_ + 1)
                  .schedule(Schedule.fixed(200.millis))
                  .run(ZSink.fromQueue(queue))
                  .fork
    // Fork the consumer as well, so that both fibers can be joined below
    consumer <- queue.take.flatMap(printLine(_)).forever.fork
    _        <- producer.zip(consumer).join
  } yield ()

From Hub

Hub is an asynchronous data type to which publishers can publish messages and from which subscribers can take them. ZSink.fromHub takes a Hub and returns a ZSink that publishes each element to that Hub.

In the following example, the sink consumes elements of the producer stream and publishes them to the hub. We have two consumers that are subscribed to that hub and they are taking its elements forever:

val myApp: ZIO[Any, IOException, Unit] =
  for {
    promise   <- Promise.make[Nothing, Unit]
    hub       <- Hub.bounded[Int](1)
    sink      <- ZIO.succeed(ZSink.fromHub(hub))
    producer  <- ZStream
                   .iterate(0)(_ + 1)
                   .schedule(Schedule.fixed(1.seconds))
                   .run(sink)
                   .fork
    consumers <- ZIO.scoped {
                   hub.subscribe.zip(hub.subscribe).flatMap { case (left, right) =>
                     for {
                       _  <- promise.succeed(())
                       f1 <- left.take.flatMap(e => printLine(s"Left Queue: $e")).forever.fork
                       f2 <- right.take.flatMap(e => printLine(s"Right Queue: $e")).forever.fork
                       _  <- f1.zip(f2).join
                     } yield ()
                   }
                 }.fork
    _         <- promise.await
    _         <- producer.zip(consumers).join
  } yield ()