Version: 2.0.x

Channel Operations

Piping

When we pipe one channel into another, the values written to the output port of the first channel are passed to the input port of the second:

import zio.stream._

(ZChannel.writeAll(1,2,3) >>> (ZChannel.read[Int] <*> ZChannel.read[Int])).runCollect.debug

Sequencing

To sequence channels, we use the ZChannel#flatMap operator, which chains two channels together. After the first channel finishes, we create the next channel from its terminal (done) value:

import zio._
import zio.stream._

ZChannel
  .fromZIO(
    Console.readLine("Please enter a number: ").map(_.toInt)
  )
  .flatMap {
    // Zero is accepted, so only negative input is rejected
    case n if n < 0 => ZChannel.fail("Number must be non-negative")
    case n          => ZChannel.writeAll((0 to n): _*)
  }
  .runCollect
  .debug

Concatenating

Suppose there is a channel that creates a new channel for each element of the outer channel and emits them to the output port. We can use concatOut to concatenate all the inner channels into a single channel:

import zio.stream._

ZChannel
  .writeAll("a", "b", "c")
  .mapOut { l =>
    ZChannel.writeAll((1 to 3).map(i => s"$l$i"): _*)
  }
  .concatOut
  .runCollect
  .debug

We can do the same with ZChannel.concatAll:

import zio.stream._

ZChannel
  .concatAll(
    ZChannel
      .writeAll("a", "b", "c")
      .mapOut { l =>
        ZChannel.writeAll((1 to 3).map(i => s"$l$i"): _*)
      }
  )
  .runCollect
  .debug

Zipping

We have two categories of zip operators: the ordinary operators (zip, zipLeft, zipRight), which run the channels sequentially, and their parallel counterparts (zipPar, zipParLeft, zipParRight), which run the channels in parallel.

  1. zip/<*> operator:
import zio.stream._

// writeAll emits each argument as a separate element
val first = ZChannel.writeAll(1, 2, 3) *> ZChannel.succeed("Done!")
val second = ZChannel.writeAll(4, 5, 6) *> ZChannel.succeed("Bye!")

(first <*> second).runCollect.debug
// Output: (Chunk(1,2,3,4,5,6),(Done!,Bye!))
  2. zipRight/*> operator:
(first *> second).runCollect.debug
  3. zipLeft/<* operator:
(first <* second).runCollect.debug
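
The examples above cover only the sequential operators. For the parallel category, here is a minimal sketch, assuming the zipPar operator on ZChannel as the parallel counterpart of zip. The object name ZipParExample is purely illustrative, and the two channels are redefined so that the block stands alone. Because both channels run concurrently, the interleaving of their elements is not deterministic, so no expected output is shown:

```scala
import zio._
import zio.stream._

object ZipParExample {
  val first  = ZChannel.writeAll(1, 2, 3) *> ZChannel.succeed("Done!")
  val second = ZChannel.writeAll(4, 5, 6) *> ZChannel.succeed("Bye!")

  // Both channels run concurrently. Their elements are merged into one
  // output port, and their done values are paired, as with zip.
  val program = first.zipPar(second).runCollect
}
```

Only the order of the emitted elements should differ from the sequential zip; the done value is still the pair of both terminal values.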

Mapping

Mapping The Terminal Done Value (OutDone)

The ordinary map operator is used to map the done value of a channel:

import zio.stream._

ZChannel.writeAll(1, 2, 3).map(_ => 5).runCollect.debug

Mapping The Done Value of The Input Port (InDone)

To map the done value of the input port, we use the contramap operator:

import zio.stream._

(ZChannel.succeed("5") >>>
  ZChannel
    .readWith(
      (i: Int) => ZChannel.write(i),
      (_: Any) => ZChannel.unit,
      // The upstream done value arrives here after contramap converts it to Int
      (d: Int) => ZChannel.succeed(d * 2)
    )
    .contramap[String](_.toInt)).runCollect.debug

Mapping The Error Value of The Output Port (OutErr)

To map the failure value of a channel, we use the mapError operator:

import zio._
import zio.stream._

val channel =
  ZChannel
    .fromZIO(Console.readLine("Please enter your name: "))
    .mapError(_.toString)

Mapping The Output Elements of a Channel (OutElem)

To map the output elements of a channel, we use the mapOut operator:

import zio.stream._

ZChannel.writeAll(1,2,3).mapOut(_ * 2).runCollect.debug

Mapping The Input Elements of a Channel (InElem)

To map the input elements of a channel, we use the contramapIn operator:

import zio.stream._

(ZChannel.write("123") >>> ZChannel.read[Int].contramapIn[String](_.toInt * 2)).runCollect.debug

Merging

Merge operators are used to merge multiple channels into a single channel. They combine the output ports of the channels concurrently: every time any of the channels produces a value, the output port of the resulting channel produces that value.

Assume we have the following channel:

import zio._
import zio.stream._

def iterate(
  from: Int,
  to: Int
): ZChannel[Any, Any, Any, Any, Nothing, Int, Unit] =
  if (from <= to)
    ZChannel.write(from) *>
      ZChannel.fromZIO(
        Random
          .nextLongBounded(1000)
          .flatMap(delay => ZIO.sleep(Duration.fromMillis(delay)))
      ) *> iterate(from + 1, to)
  else ZChannel.unit

Now let's merge some channels:

import zio._
import zio.stream._

ZChannel
  .mergeAllUnbounded(
    ZChannel.writeAll(
      iterate(1, 3),
      iterate(4, 6),
      iterate(7, 9)
    )
  )
  .mapOutZIO(i => Console.print(s"$i "))
  .runDrain

ZChannel.mergeAllUnbounded uses the maximum buffer size, which is Int.MaxValue by default. If we use this operator for long-running channels that produce a lot of values, the program can run out of memory.

We have another operator called ZChannel.mergeAll, which allows us to specify the buffer size, the concurrency level, and also the strategy for merging the channels.
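
As a minimal sketch of the bounded variant, the block below assumes that ZChannel.mergeAll takes its arguments in the order channels, concurrency level, buffer size, merge strategy (mirroring mergeMap). The object name MergeAllExample and the concrete numbers are illustrative choices only:

```scala
import zio._
import zio.stream._

object MergeAllExample {
  // An outer channel that emits three small inner channels.
  val channels: ZChannel[Any, Any, Any, Any, Nothing,
    ZChannel[Any, Any, Any, Any, Nothing, Int, Any], Any] =
    ZChannel.writeAll(
      ZChannel.writeAll(1, 2, 3),
      ZChannel.writeAll(4, 5, 6),
      ZChannel.writeAll(7, 8, 9)
    )

  val merged =
    ZChannel.mergeAll(
      channels,
      2,                                  // assumed: concurrency level
      16,                                 // assumed: buffer size
      ZChannel.MergeStrategy.BackPressure // strategy for merging
    )

  // All nine elements are emitted, but their order is not deterministic.
  val program = merged.runCollect
}
```

Bounding the concurrency and the buffer limits how much work and how many buffered values can accumulate at once, which is the concern raised above for long-running channels.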

Note that if we want to merge channels sequentially, we can use the zip or flatMap operators:

import zio.stream._

(iterate(1, 3) <*> iterate(4, 6) <*> iterate(7, 9)).runCollect.debug
// Output: (Chunk(1,2,3,4,5,6,7,8,9),())

Collecting

  1. collectElements collects all the elements of the channel along with its done value as a tuple and returns a new channel with a terminal value of that tuple:
import zio.stream._

ZChannel
  .writeAll(1, 2, 3, 4, 5)
  .collectElements
  .runCollect
  .debug
  2. emitCollect is like the collectElements operator, but it emits the result of the collection to the output port of the new channel:
import zio.stream._

ZChannel
  .writeAll(1, 2, 3, 4, 5)
  .emitCollect
  .runCollect
  .debug

Converting

We can convert a channel to other data types using the ZChannel#toXYZ methods:

  • ZChannel#toStream
  • ZChannel#toPipeline
  • ZChannel#toSink
  • ZChannel#toPull
  • ZChannel#toQueue
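
As an illustration of one of these conversions, the following sketch turns a channel into a stream. It assumes that ZChannel#toStream requires the channel's output elements to be chunks (so the channel here writes Chunk[Int] values). The object name ToStreamExample is hypothetical:

```scala
import zio._
import zio.stream._

object ToStreamExample {
  // A channel that needs no input and emits chunks of integers.
  val channel: ZChannel[Any, Any, Any, Any, Nothing, Chunk[Int], Any] =
    ZChannel.writeAll(Chunk(1, 2), Chunk(3, 4, 5))

  // After conversion, the usual ZStream operators are available.
  val stream = channel.toStream

  val program: ZIO[Any, Nothing, Chunk[Int]] =
    stream.map(_ * 10).runCollect
}
```

Running ToStreamExample.program should yield Chunk(10, 20, 30, 40, 50): the stream flattens the emitted chunks, so the individual elements are mapped and collected in order.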

concatMap

concatMap is a combination of two operators: mapping and concatenation. Using this operator, we can map every emitted element of a channel (outer channel) to a new channel (inner channels), and then concatenate all the inner channels into a single channel. The concatenation is done sequentially, so we use this operator when the order of the elements is important:

import zio.stream._

ZChannel
  .writeAll("a", "b", "c")
  .concatMap { l =>
    def inner(from: Int, to: Int): ZChannel[Any, Any, Any, Any, Nothing, String, Unit] =
      if (from <= to) ZChannel.write(s"$l$from") *> inner(from + 1, to)
      else ZChannel.unit
    inner(0, 5)
  }
  .runCollect
  .debug

In the above example, we create a new channel for every element of the outer channel. The new inner channel is responsible for emitting from zero to five with the label of the outer channel. When an inner channel is done, it moves to the next inner channel sequentially. There is a similar operator called mergeMap that works in parallel and doesn't preserve the order of the elements.

mergeMap

mergeMap is a combination of two operators: mapping and merging. Using this operator, we can map every emitted element of a channel (outer channel) to a new channel (inner channel), and then run all the inner channels in parallel and merge them into a single channel. The merge operation is done in parallel, so we use this operator when the order of the elements is not important, and we want to process all inner channels in parallel:

import zio.stream._
import zio.stream.ZChannel._

ZChannel
  .writeAll("a", "b", "c")
  .mergeMap(8, 1, MergeStrategy.BackPressure) { l =>
    def inner(
      from: Int,
      to: Int
    ): ZChannel[Any, Any, Any, Any, Nothing, String, Unit] =
      if (from <= to) ZChannel.write(s"$l$from") *> inner(from + 1, to)
      else ZChannel.unit
    inner(0, 5)
  }
  .runCollect
  .debug

collect

collect is a combination of two operations: filtering and mapping. Using this operator, we can filter the elements of a channel using a partial function, and then map the filtered elements:

import zio.stream._

ZChannel
  .writeAll((1 to 10): _*)
  .collect { case i if i % 3 == 0 => i * 2 }
  .runCollect
  .debug