Skip to main content
Version: 2.0.x

Sink Operations

Having created the sink, we can transform it with provided operations.

contramap

Contramap is a simple combinator to change the domain of an existing function. While map changes the co-domain of a function, the contramap changes the domain of a function. So the contramap takes a function and maps over its input.

This is useful when we have a fixed output, and our existing function cannot consume those outputs. So we can use contramap to create a new function that can consume that fixed output. Assume we have a ZSink.sum that sums incoming numeric values, but we have a ZStream of String values. We can convert the ZSink.sum to a sink that can consume String values;

import zio._
import zio.stream._

val numericSum: ZSink[Any, Nothing, Int, Nothing, Int] =
ZSink.sum[Int]
val stringSum : ZSink[Any, Nothing, String, Nothing, Int] =
numericSum.contramap((x: String) => x.toInt)

val sum: ZIO[Any, Nothing, Int] =
ZStream("1", "2", "3", "4", "5").run(stringSum)
// Output: 15

dimap

A dimap is an extended contramap that additionally transforms sink's output:

import zio._
import zio.stream._

// Convert its input to integers, do the computation and then convert them back to a string
val sumSink: ZSink[Any, Nothing, String, Nothing, String] =
numericSum.dimap[String, String](_.toInt, _.toString)

val sum: ZIO[Any, Nothing, String] =
ZStream("1", "2", "3", "4", "5").run(sumSink)
// Output: 15

Filtering

Sinks have ZSink#filterInput for filtering incoming elements:

import zio._
import zio.stream._

ZStream(1, -2, 0, 1, 3, -3, 4, 2, 0, 1, -3, 1, 1, 6)
.transduce(
ZSink
.collectAllN[Int](3)
.filterInput[Int](_ > 0)
)
// Output: Chunk(Chunk(1,1,3),Chunk(4,2,1),Chunk(1,1,6),Chunk())