Sharing a Consumer between multiple streams

Often, a single application needs to consume from multiple Kafka topics and process each topic in a distinct way. With zio-kafka you can use a single Consumer instance for multiple streams from different topics. Not only is it easier to create a single Consumer layer instead of one per topic, it may also be more resource-efficient: the underlying Apache Kafka consumer, its thread pool, and its communication with the Kafka brokers are shared, resulting in lower resource consumption than when you create a Consumer instance for every topic.

For each of the topics/patterns you subscribe to, you can define a dedicated stream to process the records, a dedicated Deserializer for the keys, and a dedicated Deserializer for the values. Other settings, like the poll interval and the offset retrieval strategy, are common to all subscriptions. For example, the value of the max.poll.records setting is the maximum number of records returned in each poll for all subscriptions combined. If you need different settings per topic/pattern, you must create a separate Consumer instance for each.
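Because such settings are consumer-wide, they are configured once on the shared ConsumerSettings. A minimal sketch (the property key is the standard Kafka client setting; the value 500 is an arbitrary example):

import zio.kafka.consumer.ConsumerSettings

// Consumer-wide settings apply to all subscriptions combined.
// max.poll.records caps the records returned per poll across all streams.
val sharedSettings: ConsumerSettings =
  ConsumerSettings(List("localhost:9092"))
    .withGroupId("group")
    .withProperty("max.poll.records", "500")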

import zio._
import zio.Console.printLine
import zio.kafka.consumer._
import zio.kafka.serde._

// Create a single Consumer instance
val consumerSettings: ConsumerSettings = ConsumerSettings(List("localhost:9092")).withGroupId("group")
val consumer: ZLayer[Any, Throwable, Consumer] = ZLayer.scoped(Consumer.make(consumerSettings))

// Define two streams, each for a different topic and key-value types
val stream1 = Consumer
  .plainStream(Subscription.topics("topic1"), Serde.string, Serde.string)
  .tap(cr => printLine(s"For topic1, got key: ${cr.record.key}, value: ${cr.record.value}"))
  .map(_.offset)
  .aggregateAsync(Consumer.offsetBatches)
  .mapZIO(_.commit)
  .runDrain

val stream2 = Consumer
  .plainStream(Subscription.topics("topic2"), Serde.uuid, Serde.int)
  .tap(cr => printLine(s"For topic2, got key: ${cr.record.key}, value: ${cr.record.value}"))
  .map(_.offset)
  .aggregateAsync(Consumer.offsetBatches)
  .mapZIO(_.commit)
  .runDrain

// Run both streams with the same consumer instance
(stream1 zipPar stream2).provideLayer(consumer)

Each Stream can have a different lifetime. Subscriptions are tied to their Stream's scope, so when a Stream completes (successfully, through failure, or by interruption), the corresponding subscription is removed.
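As a sketch of what this looks like (the take(1000) cutoff is arbitrary), the stream below completes after a fixed number of records, at which point its subscription to topic1 is removed while stream2 keeps consuming from topic2 on the same shared consumer:

// Sketch: a stream with a bounded lifetime. When it completes after 1000
// records, its subscription is removed; stream2 continues undisturbed.
val boundedStream1 = Consumer
  .plainStream(Subscription.topics("topic1"), Serde.string, Serde.string)
  .take(1000)
  .map(_.offset)
  .aggregateAsync(Consumer.offsetBatches)
  .mapZIO(_.commit)
  .runDrain

val program = (boundedStream1 zipPar stream2).provideLayer(consumer)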

Consumer sharing is only possible when all streams use the same type of Subscription: Topics, Pattern, or Manual. Mixing different types results in a failed stream. Note that every Topics subscription can also be written as a Pattern subscription, so if you want to mix Topics and Pattern subscriptions, you can use Pattern for all of them, as sketched below.
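A minimal sketch of rewriting a Topics subscription as an equivalent Pattern subscription (topic names are placeholders; Pattern.quote escapes any regex metacharacters in a literal topic name):

import java.util.regex.Pattern

// Equivalent to Subscription.topics("topic1"):
val topic1AsPattern: Subscription =
  Subscription.pattern(Pattern.quote("topic1").r)

// Equivalent to Subscription.topics("topic1", "topic2"):
val topicsAsPattern: Subscription =
  Subscription.pattern("topic1|topic2".r)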

If your subscriptions overlap, e.g. Subscription.topics("topic1", "topic2") and Subscription.topics("topic2", "topic3"), it is not deterministic which stream will receive the partitions of topic2.

Note that when a subscription starts or ends, Kafka also reassigns all partitions of the other subscriptions. Record fetching resumes from the last committed offset, so records may be processed twice when not all offsets have been committed yet. This works the same as for a regular rebalance. To minimize this issue, commit frequently.
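One way to keep commits frequent (a sketch; the 5-second interval is arbitrary) is to commit offset batches on a schedule as well, using aggregateAsyncWithin instead of aggregateAsync:

// Sketch: commit accumulated offsets at least every 5 seconds, so a
// rebalance replays at most a few seconds' worth of records.
val frequentlyCommittingStream = Consumer
  .plainStream(Subscription.topics("topic1"), Serde.string, Serde.string)
  .map(_.offset)
  .aggregateAsyncWithin(Consumer.offsetBatches, Schedule.fixed(5.seconds))
  .mapZIO(_.commit)
  .runDrain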