Version: 2.x

Avoiding chunk-breakers

A warning about mapZIO and other chunk-breakers

ZIO streams are not just a simple sequence of elements. The elements are grouped into chunks, which makes many operations faster. Zio-kafka helps with this by guaranteeing that all records (for a given partition) fetched together from the broker are grouped into one chunk.

Be careful when using mapZIO, tap and some other stream operators that break the chunking structure of the stream (more precisely, the resulting stream emits chunks of a single element each). Throughput may be significantly lower than with the original chunking structure intact.
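To see the effect for yourself, you can compare chunk sizes before and after a chunk-breaking operator. The following is a minimal sketch, not zio-kafka code: the object name ChunkBreakerDemo, the helper chunkSizes and the in-memory source stream are made up for illustration, with the two chunks standing in for two batches of records.

```scala
import zio._
import zio.stream._

object ChunkBreakerDemo extends ZIOAppDefault {
  // Hypothetical source: two chunks, standing in for two batches of records.
  val source: ZStream[Any, Nothing, Int] =
    ZStream.fromChunks(Chunk(1, 2, 3), Chunk(4, 5))

  // Collects the size of every chunk the stream emits.
  def chunkSizes[A](s: ZStream[Any, Nothing, A]): UIO[Chunk[Int]] =
    s.chunks.map(_.size).runCollect

  def run =
    for {
      before <- chunkSizes(source)
      after  <- chunkSizes(source.mapZIO(n => ZIO.succeed(n + 1)))
      _      <- Console.printLine(s"chunk sizes before mapZIO: $before")
      _      <- Console.printLine(s"chunk sizes after mapZIO:  $after")
    } yield ()
}
```

The first line should report the original sizes (3 and 2). If mapZIO behaves as described above, the second line reports only chunks of size 1.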

Chunk-breaking operators can be identified from their scaladocs. Starting with zio-streams 2.1.17, these scaladocs contain the words "This combinator destroys the chunking structure".

You can regain full throughput by processing all elements in a chunk together in one go. However, there is a catch: the order of processing changes. For example, given a stream with elements a and b in the same chunk, for stream.mapZIO(f).mapZIO(g) the evaluation order is f(a), g(a), f(b), g(b). For the alternatives listed below, the evaluation order changes to f(a), f(b), g(a), g(b). Now imagine that g(a) fails: with mapZIO, f(b) is not executed, but with the alternatives it is. It is up to you to decide whether this is a problem.
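The difference in evaluation order can be made visible by recording each call. This is a sketch under assumptions: the object name EvaluationOrderDemo and the helper trace are invented for illustration, f and g are stand-ins that only log their argument, and the per-chunk variant uses chunks with ZIO.foreach as one example of the alternatives.

```scala
import zio._
import zio.stream._

object EvaluationOrderDemo extends ZIOAppDefault {
  // One chunk containing both elements, as in the example above.
  val stream: ZStream[Any, Nothing, String] =
    ZStream.fromChunk(Chunk("a", "b"))

  // Runs the given pipeline with logging versions of f and g
  // and returns the calls in the order they were made.
  def trace(
    pipeline: (String => UIO[String], String => UIO[String]) => ZStream[Any, Nothing, String]
  ): UIO[Vector[String]] =
    for {
      log <- Ref.make(Vector.empty[String])
      f    = (s: String) => log.update(_ :+ s"f($s)").as(s)
      g    = (s: String) => log.update(_ :+ s"g($s)").as(s)
      _   <- pipeline(f, g).runDrain
      out <- log.get
    } yield out

  def run =
    for {
      perElement <- trace((f, g) => stream.mapZIO(f).mapZIO(g))
      perChunk   <- trace { (f, g) =>
                      stream.chunks
                        .mapZIO(chunk => ZIO.foreach(chunk)(f))
                        .mapZIO(chunk => ZIO.foreach(chunk)(g))
                        .flattenChunks
                    }
      _ <- Console.printLine(s"mapZIO:    ${perElement.mkString(", ")}")
      _ <- Console.printLine(s"per chunk: ${perChunk.mkString(", ")}")
    } yield ()
}
```

The two printed lines should correspond to the two orders described above. Replacing g with an effect that fails on a is a quick way to check which calls still happen in each variant.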

Use mapZIOChunked

Available since zio-streams 2.1.17.

The simplest alternative is the stream operator mapZIOChunked; it has the same signature as mapZIO.

def f(a: A): ZIO[R, E, B]

stream               // ZStream[R, E, A]
  .mapZIOChunked(f)  // ZStream[R, E, B]

Use chunksWith

If you have a single processing step that needs to work on a chunk, you can use chunksWith.

def f(a: A): ZIO[R, E, B]

stream // ZStream[R, E, A]
  .chunksWith { chunkStream =>
    chunkStream.mapZIO(chunk => ZIO.foreach(chunk)(f))
  } // ZStream[R, E, B]

Expose chunking structure with chunks

Use chunks when you have multiple processing steps that can all work on a chunk at a time. Since chunks exposes the chunking structure explicitly, the program can no longer accidentally break the chunking structure (unless flattenChunks is also used).

def f(a: A): ZIO[R, E, B]
def g(b: B): ZIO[R, E, C]

stream                                        // ZStream[R, E, A]
  .chunks                                     // ZStream[R, E, Chunk[A]]
  .mapZIO { chunk => ZIO.foreach(chunk)(f) }  // ZStream[R, E, Chunk[B]]
  .mapZIO { chunk => ZIO.foreach(chunk)(g) }  // ZStream[R, E, Chunk[C]]
  .flattenChunks                              // ZStream[R, E, C]

Side effects per chunk with tapChunks

Available since zio-streams 2.1.17.

Unlike tap, the tapChunks stream operator preserves the chunking structure and allows side effects per chunk.

stream                                                      // ZStream[R, E, A]
  .tapChunks(c => ZIO.logInfo(s"Chunk of size ${c.size}"))  // ZStream[R, E, A]

Avoid rechunk

ZStream also provides the rechunk operator. This operator rebuilds the chunking structure by grouping elements into chunks of a fixed size. If there are not enough elements available to fill a chunk, the operator waits for more elements to arrive. A smaller chunk is emitted only when the stream ends.
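The regrouping can be observed on a small in-memory stream. This is an illustrative sketch only: the object name RechunkDemo and the stream polls are made up, and the three source chunks stand in for records from three separate polls.

```scala
import zio._
import zio.stream._

object RechunkDemo extends ZIOAppDefault {
  // Hypothetical source: three chunks of uneven size, one per "poll".
  val polls: ZStream[Any, Nothing, Int] =
    ZStream.fromChunks(Chunk(1, 2, 3), Chunk(4, 5), Chunk(6))

  // rechunk(4) regroups the elements into chunks of four,
  // regardless of which source chunk they came from.
  val rechunked: UIO[Chunk[Chunk[Int]]] =
    polls.rechunk(4).chunks.runCollect

  def run =
    rechunked.flatMap(chunks => Console.printLine(chunks.mkString(" | ")))
}
```

Here the first emitted chunk holds elements 1 to 4, which mixes the first two source chunks, and the remaining elements 5 and 6 are emitted as a smaller chunk only because this finite stream ends. On a stream that never ends, those last elements would stay buffered until two more arrive.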

In low-volume Kafka applications, waiting for enough elements to arrive can take a long time! In high-volume Kafka applications, this operator causes data from multiple polls to be processed together, which increases processing latency and memory usage.

Therefore, we recommend that you avoid rechunk in Kafka applications.