Version: 2.x

Example of Consuming, Producing and Committing Offsets

This example shows how to consume messages from topic my-input-topic, produce transformed messages to my-output-topic, and then commit the consumer offsets. Processing is done in chunks for efficiency. Please note: the zio-kafka consumer does not support automatic offset committing, and it therefore ignores the Kafka consumer setting enable.auto.commit=true. Offsets must be committed manually using the provided commit methods, typically after the corresponding messages have been processed or at other appropriate points in the application logic.

import zio.{ Chunk, ZLayer }
import zio.kafka.consumer._
import zio.kafka.producer._
import zio.kafka.serde._
import org.apache.kafka.clients.producer.ProducerRecord

val consumerSettings: ConsumerSettings = ConsumerSettings(List("localhost:9092")).withGroupId("group")
val producerSettings: ProducerSettings = ProducerSettings(List("localhost:9092"))

val consumerAndProducer =
  ZLayer.scoped(Consumer.make(consumerSettings)) ++
    ZLayer.scoped(Producer.make(producerSettings, Serde.int, Serde.string))

val consumeProduceStream = Consumer
  .plainStream(Subscription.topics("my-input-topic"), Serde.int, Serde.long)
  .map { record =>
    val key: Int          = record.record.key()
    val value: Long       = record.record.value()
    val newValue: String  = value.toString

    val producerRecord: ProducerRecord[Int, String] = new ProducerRecord("my-output-topic", key, newValue)
    (producerRecord, record.offset)
  }
  .mapChunksZIO { chunk =>
    val records     = chunk.map(_._1)
    val offsetBatch = OffsetBatch(chunk.map(_._2).toSeq)

    Producer.produceChunk[Any, Int, String](records) *> offsetBatch.commit.as(Chunk(()))
  }
  .runDrain
  .provideSomeLayer(consumerAndProducer)
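
Because consumeProduceStream above ends with runDrain and has its layers provided, it is an ordinary ZIO effect and can be used directly as an application's entry point. Below is a minimal sketch of such wiring; the object name ConsumeProduceApp is illustrative and not part of the zio-kafka API, and running it requires a Kafka broker reachable at localhost:9092:

```scala
import zio._

// Hypothetical entry point: runs the consume-transform-produce-commit stream
// until the process is interrupted or the stream fails.
// Assumes consumeProduceStream is defined as in the example above.
object ConsumeProduceApp extends ZIOAppDefault {
  override def run: ZIO[Any, Throwable, Unit] =
    consumeProduceStream
}
```

On shutdown, ZIO's structured interruption closes the scoped consumer and producer layers, so no explicit cleanup code is needed here.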