Version: 2.x

Example of Consuming, Producing and Committing Offsets

This example shows how to consume messages from the topic my-input-topic and produce transformed messages to my-output-topic, after which the consumer offsets are committed. Processing is done in chunks (using mapChunksZIO) for efficiency. Please note: the ZIO consumer does not support automatic offset committing. As a result, it ignores the Kafka consumer setting enable.auto.commit. Developers should commit offsets manually using the provided commit methods, typically after processing messages or at other appropriate points in their application logic.
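To make the idea of committing once per chunk concrete, here is a minimal plain-Scala sketch of what batching offsets amounts to. It is a simplified model and not zio-kafka's implementation: the names OffsetModel, TopicAndPartition, ProcessedOffset, mergeOffsets and commitPositions are hypothetical and exist only for this illustration. It assumes Scala 2.13 or later (for groupMapReduce).

```scala
// Simplified model of batching offsets before a commit.
// All names here are hypothetical; this is not zio-kafka's API.
object OffsetModel {
  final case class TopicAndPartition(topic: String, partition: Int)
  final case class ProcessedOffset(tp: TopicAndPartition, offset: Long)

  // Collapse a chunk of processed offsets to the highest offset seen per partition.
  def mergeOffsets(chunk: Seq[ProcessedOffset]): Map[TopicAndPartition, Long] =
    chunk.groupMapReduce(_.tp)(_.offset)(_ max _)

  // Kafka stores the position of the next record to read,
  // so the committed value is the last processed offset plus one.
  def commitPositions(merged: Map[TopicAndPartition, Long]): Map[TopicAndPartition, Long] =
    merged.map { case (tp, offset) => tp -> (offset + 1) }
}
```

For a chunk holding offsets 41, 42 and 40 from partition 0 and offset 7 from partition 1, this model yields a single commit with position 43 for partition 0 and 8 for partition 1, instead of four separate commits. Because the commit happens only after the chunk has been processed, a crash in between leads to reprocessing rather than data loss.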

import zio.{ Chunk, ZLayer }
import zio.kafka.consumer._
import zio.kafka.producer._
import zio.kafka.serde._
import org.apache.kafka.clients.producer.ProducerRecord

val consumerSettings: ConsumerSettings = ConsumerSettings(List("localhost:9092")).withGroupId("group")
val producerSettings: ProducerSettings = ProducerSettings(List("localhost:9092"))

// Combine the consumer and the producer into a single layer
val consumerAndProducer =
  ZLayer.scoped(Consumer.make(consumerSettings)) ++
    ZLayer.scoped(Producer.make(producerSettings, Serde.int, Serde.string))

val consumeProduceStream = Consumer
  .plainStream(Subscription.topics("my-input-topic"), Serde.int, Serde.long)
  .map { record =>
    val key: Int         = record.record.key()
    val value: Long      = record.record.value()
    val newValue: String = value.toString

    // Pair each outgoing record with the offset of the message it was derived from
    val producerRecord: ProducerRecord[Int, String] = new ProducerRecord("my-output-topic", key, newValue)
    (producerRecord, record.offset)
  }
  .mapChunksZIO { chunk =>
    val records     = chunk.map(_._1)
    val offsetBatch = OffsetBatch(chunk.map(_._2))

    // Produce the whole chunk first, then commit its offsets in one batch
    Producer.produceChunk[Any, Int, String](records) *> offsetBatch.commit.as(Chunk(()))
  }