ZTransducer
Introduction
A ZTransducer[R, E, I, O]
is a stream transformer. Transducers accept a stream as input, and return the transformed stream as output.
ZTransducers can be thought of as a recipe for calling a bunch of methods on a source stream, to yield a new (transformed) stream. A nice mental model is the following type alias:
type ZTransducer[Env, Err, In, Out] = ZStream[Env, Err, In] => ZStream[Env, Err, Out]
There is no fundamental requirement for transducers to exist, because everything transducers do can be done directly on a stream. However, because transducers separate the stream transformation from the source stream itself, it becomes possible to abstract over stream transformations at the level of values, creating, storing, and passing around reusable transformation pipelines that can be applied to many different streams.
Creation
From Effect
The ZTransducer.fromEffect
creates a transducer that always evaluates the specified effect. Let's write a transducer that fails with a message:
val error: ZTransducer[Any, String, Any, Nothing] = ZTransducer.fromEffect(IO.fail("Ouch"))
From Function
By using ZTransducer.fromFunction
we convert a function into a transducer. Let's create a transducer which converts a stream of strings into a stream of characters:
val chars: ZTransducer[Any, Nothing, String, Char] =
ZTransducer
.fromFunction[String, Chunk[Char]](s => Chunk.fromArray(s.toArray))
.mapChunks(_.flatten)
There is also a ZTransducer.fromFunctionM
which is an effecful version of this constructor.
Built-in Transducers
Identity
The identity transducer passes elements through without any modification:
ZStream(1,2,3).transduce(ZTransducer.identity)
// Ouput: 1, 2, 3
head and last
The ZTransducer.head
and ZTransducer.last
are two transducers that return the first and last element of a stream:
val stream: UStream[Int] = ZStream(1, 2, 3, 4)
val head: UStream[Option[Int]] = stream.transduce(ZTransducer.head)
val last: UStream[Option[Int]] = stream.transduce(ZTransducer.last)
Splitting
ZTransducer.splitOn — A transducer that splits strings on a delimiter:
ZStream("1-2-3", "4-5", "6", "7-8-9-10")
.transduce(ZTransducer.splitOn("-"))
.map(_.toInt)
// Ouput: 1, 2, 3, 4, 5, 6, 7, 8, 9 10
ZTransducer.splitLines — A transducer that splits strings on newlines. Handles both Windows newlines (\r\n
) and UNIX newlines (\n
):
ZStream("This is the first line.\nSecond line.\nAnd the last line.")
.transduce(ZTransducer.splitLines)
// Output: "This is the first line.", "Second line.", "And the last line."
ZTransducer.splitOnChunk — A transducer that splits elements on a delimiter and transforms the splits into desired output:
ZStream(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
.transduce(ZTransducer.splitOnChunk(Chunk(4, 5, 6)))
// Output: Chunk(1, 2, 3), Chunk(7, 8, 9, 10)
Dropping
ZTransducer.dropWhile — Creates a transducer that starts consuming values as soon as one fails the given predicate:
ZStream(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
.transduce(ZTransducer.dropWhile(_ <= 5))
// Output: 6, 7, 8, 9, 10
The ZTransducer
also has dropWhileM
which takes an effectful predicate p: I => ZIO[R, E, Boolean]
.
Folding
ZTransducer.fold — Using ZTransudcer.fold
we can fold incoming elements until we reach the false predicate, then the transducer emits the computed value and restarts the folding process:
ZStream
.range(0, 8)
.transduce(
ZTransducer.fold[Int, Chunk[Int]](Chunk.empty)(_.length < 3)((s, i) =>
s ++ Chunk(i)
)
)
// Ouput: Chunk(0, 1, 2), Chunk(3, 4, 5), Chunk(6, 7)
Note that the ZTransducer.foldM
is like fold
, but it folds effectfully.
ZTransducer.foldWeighted — Creates a transducer that folds incoming elements until reaches the max
worth of elements determined by the costFn
, then the transducer emits the computed value and restarts the folding process:
object ZTransducer {
def foldWeighted[I, O](z: O)(costFn: (O, I) => Long, max: Long)(
f: (O, I) => O
): ZTransducer[Any, Nothing, I, O] = ???
}
In the following example, each time we consume a new element we return one as the weight of that element using cost function. After three times, the sum of the weights reaches to the max
number, and the folding process restarted. So we expect this transducer to group each three elements in one Chunk
:
ZStream(3, 2, 4, 1, 5, 6, 2, 1, 3, 5, 6)
.aggregate(
ZTransducer
.foldWeighted(Chunk[Int]())(
(_, _: Int) => 1,
3
) { (acc, el) =>
acc ++ Chunk(el)
}
)
// Output: Chunk(3,2,4),Chunk(1,5,6),Chunk(2,1,3),Chunk(5,6)
Another example is when we want to group element which sum of them equal or less than a specific number:
ZStream(1, 2, 2, 4, 2, 1, 1, 1, 0, 2, 1, 2)
.aggregate(
ZTransducer
.foldWeighted(Chunk[Int]())(
(_, i: Int) => i.toLong,
5
) { (acc, el) =>
acc ++ Chunk(el)
}
)
// Output: Chunk(1,2,2),Chunk(4),Chunk(2,1,1,1,0),Chunk(2,1,2)
Note
The
ZTransducer.foldWeighted
cannot decompose elements whose weight is more than themax
number. So elements that have an individual cost larger thanmax
will force the transducer to cross themax
cost. In the last example, if the source stream wasZStream(1, 2, 2, 4, 2, 1, 6, 1, 0, 2, 1, 2)
the output would beChunk(1,2,2),Chunk(4),Chunk(2,1),Chunk(6),Chunk(1,0,2,1),Chunk(2)
. As we see, the6
element crossed themax
cost.To decompose these elements, we should use
ZTransducer.foldWeightedDecompose
function.
ZTransducer.foldWeightedDecompose — As we saw in the previous section, we need a way to decompose elements — whose cause the output aggregate cross the max
— into smaller elements. This version of fold takes decompose
function and enables us to do that:
object ZTransducer {
def foldWeightedDecompose[I, O](
z: O
)(costFn: (O, I) => Long, max: Long, decompose: I => Chunk[I])(
f: (O, I) => O
): ZTransducer[Any, Nothing, I, O] = ???
}
In the following example, we are break down elements that are bigger than 5, using decompose
function:
ZStream(1, 2, 2, 2, 1, 6, 1, 7, 2, 1, 2)
.aggregate(
ZTransducer
.foldWeightedDecompose(Chunk[Int]())(
(_, i: Int) => i.toLong,
5,
(i: Int) =>
if (i > 5) Chunk(i - 1, 1) else Chunk(i)
)((acc, el) => acc ++ Chunk.succeed(el))
)
// Ouput: Chunk(1,2,2),Chunk(2,1),Chunk(5),Chunk(1,1),Chunk(5),Chunk(1,1,2,1),Chunk(2)
ZTransducer.foldUntil — Creates a transducer that folds incoming element until specific max
elements have been folded:
ZStream(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
.transduce(ZTransducer.foldUntil(0, 3)(_ + _))
// Output: 6, 15, 24, 10
ZTransducer.foldLeft — This transducer will fold the inputs until the stream ends, resulting in a stream with one element:
val stream: ZStream[Any, Nothing, Int] =
ZStream(1, 2, 3, 4).transduce(ZTransducer.foldLeft(0)(_ + _))
// Output: 10
Prepending
The ZTransducer.prepend
creates a transducer that emits the provided chunks before emitting any other values:
ZStream(2, 3, 4).transduce(
ZTransducer.prepend(Chunk(0, 1))
)
// Output: 0, 1, 2, 3, 4
Branching/Switching
The ZTransducer.branchAfter
takes n
as an input and creates a transducer that reads the first n
values from the stream and uses them to choose the transducer that will be used for the rest of the stream.
In the following example, we are prompting the user to enter a series of numbers. If the sum of the first three elements is less than 5, we continue to emit the remaining elements by using ZTransducer.identity
, otherwise, we retry prompting the user to enter another series of numbers:
ZStream
.fromEffect(
putStr("Enter numbers separated by comma: ") *> getStrLn
)
.mapConcat(_.split(","))
.map(_.trim.toInt)
.transduce(
ZTransducer.branchAfter(3) { elements =>
if (elements.sum < 5)
ZTransducer.identity
else
ZTransducer.fromEffect(
putStrLn(s"received elements are not applicable: $elements")
) >>> ZTransducer.fail("boom")
}
)
.retry(Schedule.forever)
Collecting
ZTransducer.collectAllN — Collects incoming values into chunk of maximum size of n
:
ZStream(1, 2, 3, 4, 5).transduce(
ZTransducer.collectAllN(3)
)
// Output: Chunk(1,2,3), Chunk(4,5)
ZTransducer.collectAllWhile — Accumulates incoming elements into a chunk as long as they verify the given predicate:
ZStream(1, 2, 0, 4, 0, 6, 7).transduce(
ZTransducer.collectAllWhile(_ != 0)
)
// Output: Chunk(1,2), Chunk(4), Chunk(6,7)
ZTransducer.collectAllToMapN — Creates a transducer accumulating incoming values into maps of up to n
keys. Elements are mapped to keys using the function key
; elements mapped to the same key will be merged with the function f
:
object ZTransducer {
def collectAllToMapN[K, I](n: Long)(key: I => K)(
f: (I, I) => I
): ZTransducer[Any, Nothing, I, Map[K, I]] = ???
}
Let's do an example:
ZStream(1, 2, 0, 4, 5).transduce(
ZTransducer.collectAllToMapN[Int, Int](10)(_ % 3)(_ + _)
)
// Output: Map(1 -> 5, 2 -> 7, 0 -> 0)
ZTransducer.collectAllToSetN — Creates a transducer accumulating incoming values into sets of maximum size n
:
ZStream(1, 2, 1, 2, 1, 3, 0, 5, 0, 2).transduce(
ZTransducer.collectAllToSetN(3)
)
// Output: Set(1,2,3), Set(0,5,2), Set(1)
Compression
ZTransducer.deflate — The deflate
transducer compresses a stream of bytes as specified by RFC 1951.
import zio.stream.ZStream
import zio.stream.Transducer.deflate
import zio.stream.compression.{CompressionLevel, CompressionStrategy, FlushMode}
def compressWithDeflate(clearText: ZStream[Any, Nothing, Byte]): ZStream[Any, Nothing, Byte] = {
val bufferSize: Int = 64 * 1024 // Internal buffer size. Few times bigger than upstream chunks should work well.
val noWrap: Boolean = false // For HTTP Content-Encoding should be false.
val level: CompressionLevel = CompressionLevel.DefaultCompression
val strategy: CompressionStrategy = CompressionStrategy.DefaultStrategy
val flushMode: FlushMode = FlushMode.NoFlush
clearText.transduce(deflate(bufferSize, noWrap, level, strategy, flushMode))
}
def deflateWithDefaultParameters(clearText: ZStream[Any, Nothing, Byte]): ZStream[Any, Nothing, Byte] =
clearText.transduce(deflate())
ZTransducer.gzip — The gzip
transducer compresses a stream of bytes as using gzip method:
import zio.stream.compression._
ZStream
.fromFile(Paths.get("file.txt"))
.transduce(
ZTransducer.gzip(
bufferSize = 64 * 1024,
level = CompressionLevel.DefaultCompression,
strategy = CompressionStrategy.DefaultStrategy,
flushMode = FlushMode.NoFlush
)
)
.run(
ZSink.fromFile(Paths.get("file.gz"))
)
Decompression
If we are reading Content-Encoding: deflate
, Content-Encoding: gzip
streams, or other such streams of compressed data, the following transducers can be helpful. Both decompression methods will fail with CompressionException
when input wasn't properly compressed:
ZTransducer.inflate — This transducer allows decompressing stream of deflated inputs, according to RFC 1951.
import zio.stream.ZStream
import zio.stream.Transducer.{ gunzip, inflate }
import zio.stream.compression.CompressionException
def decompressDeflated(deflated: ZStream[Any, Nothing, Byte]): ZStream[Any, CompressionException, Byte] = {
val bufferSize: Int = 64 * 1024 // Internal buffer size. Few times bigger than upstream chunks should work well.
val noWrap: Boolean = false // For HTTP Content-Encoding should be false.
deflated.transduce(inflate(bufferSize, noWrap))
}
ZTransducer.gunzip — This transducer can be used to decompress stream of gzipped inputs, according to RFC 1952:
import zio.stream.ZStream
import zio.stream.Transducer.{ gunzip, inflate }
import zio.stream.compression.CompressionException
def decompressGzipped(gzipped: ZStream[Any, Nothing, Byte]): ZStream[Any, CompressionException, Byte] = {
val bufferSize: Int = 64 * 1024 // Internal buffer size. Few times bigger than upstream chunks should work well.
gzipped.transduce(gunzip(bufferSize))
}
Decoders
ZIO stream has a wide variety of transducers to decode chunks of bytes into strings:
Decoder | Input | Output |
---|---|---|
ZTransducer.utfDecode | Unicode bytes | String |
ZTransducer.utf8Decode | UTF-8 bytes | String |
ZTransducer.utf16Decode | UTF-16 | String |
ZTransducer.utf16BEDecode | UTF-16BE bytes | String |
ZTransducer.utf16LEDecode | UTF-16LE bytes | String |
ZTransducer.utf32Decode | UTF-32 bytes | String |
ZTransducer.utf32BEDecode | UTF-32BE bytes | String |
ZTransducer.utf32LEDecode | UTF-32LE bytes | String |
ZTransducer.usASCIIDecode | US-ASCII bytes | String |
Operations
Filtering
Transducers have two types of filtering operations, the ZTransducer#filter
used for filtering outgoing elements and the ZTransducer#filterInput
is used for filtering incoming elements:
ZStream(1, -2, 0, 1, 3, -3, 4, 2, 0, 1, -3, 1, 1, 6)
.transduce(
ZTransducer
.collectAllN[Int](3)
.filterInput[Int](_ > 0)
.filter(_.sum > 5)
)
// Output: Chunk(4,2,1), Chunk(1,1,6)
Input Transformation (Mapping)
To transform the outputs of the transducer, we can use the ZTransducer#map
combinator for the success channel, and the ZTransducer#mapError
combinator for the failure channel. Also, the ZTransducer.mapChunks
takes a function of type Chunk[O] => Chunk[O2]
and transforms chunks emitted by the transducer.
Output Transformation (Contramap)
To transform the inputs of the transducer, we can use the ZTransducer#contramap
combinator. It takes a map function of type J => I
and convert a ZTransducer[R, E, I, O]
to ZTransducer[R, E, J, O]
:
class ZTransducer[-R, +E, -I, +O] {
final def contramap[J](f: J => I): ZTransducer[R, E, J, O] = ???
}
Let's create an integer parser transducer using ZTransducer.contramap
:
val numbers: ZStream[Any, Nothing, Int] =
ZStream("1-2-3-4-5")
.mapConcat(_.split("-"))
.transduce(
ZTransducer.identity[Int].contramap[String](_.toInt)
)
Composing
We can compose transducers in two ways:
- Composing Two Transducers — One transducer can be composed with another transducer, resulting in a composite transducer:
val lines: ZStream[Blocking, Throwable, String] =
ZStream
.fromFile(Paths.get("file.txt"))
.transduce(
ZTransducer.utf8Decode >>> ZTransducer.splitLines
)
- Composing ZTransducer with ZSink — One transducer can be composed with a sink, resulting in a sink that processes elements by piping them through the transducer and piping the results into the sink:
val refine: ZIO[Blocking, Throwable, Long] =
ZStream
.fromFile(Paths.get("file.txt"))
.run(
ZTransducer.utf8Decode >>> ZTransducer.splitLines.filter(_.contains('₿')) >>>
ZSink
.fromFile(Paths.get("file.refined.txt"))
.contramapChunks[String](
_.flatMap(line => (line + System.lineSeparator()).getBytes())
)
)