Version: 2.0.x

Writing Tests with the `zio-kafka-testkit` library

zio-kafka provides a zio-kafka-testkit library to help you test your code using zio-kafka.

To add it to your project, add the following dependency to your build.sbt:

libraryDependencies += "dev.zio" %% "zio-kafka-testkit" % "<latest-version>" % Test

Let's walk through some examples of tests you can write with zio-kafka-testkit and zio-test, and see what this library provides.

Testing a producer

import org.apache.kafka.clients.producer.ProducerRecord
import zio._
import zio.kafka.producer.Producer
import zio.kafka.serde.Serde
import zio.kafka.testkit.KafkaTestUtils._ // An object containing several utilities to simplify writing your tests
import zio.kafka.testkit.Kafka // A trait representing a Kafka instance in your tests
import zio.test.TestAspect.{ sequential, timeout }
import zio.test._

object ProducerSpec extends ZIOSpecDefault {
  override def spec: Spec[TestEnvironment & Scope, Any] =
    (
      suite("Producer test suite")(
        test("minimal example") {
          for {
            _ <- Producer.produce(new ProducerRecord("topic", "boo", "baa"), Serde.string, Serde.string)
          } yield assertCompletes
        }
        // ... more tests ...
      )
        .provideSome[Kafka](producer)             // Here, we provide a new instance of Producer per test
        .provideSomeShared[Scope](Kafka.embedded) // Here, we provide an instance of Kafka for the entire suite
    ) @@ timeout(2.minutes) @@ sequential
}

This test is a very minimal example.
It uses the Producer.produce method from zio-kafka to produce a record to the Kafka cluster.
The assertCompletes assertion from zio-test is used to check that the effect completes successfully.

In this example, we decided to instantiate a new Producer for each test, with .provideSome[Kafka](producer).
We could instead have shared one Producer instance between all the tests of this suite by moving the producer layer into provideSomeShared, which would have looked like this:

suite("producer test suite")(
  // ... tests ...
).provideSomeShared[Scope](Kafka.embedded, producer)

This producer layer comes from the KafkaTestUtils object in zio-kafka-testkit. It is a layer that bootstraps a Producer instance.

In this example, we decided to share an instance of Kafka for the entire suite, with .provideSomeShared[Scope](Kafka.embedded).
Kafka is slow to start, so it is better to only start it once and share it between all tests of the suite.

We could have decided to instantiate a new instance of Kafka for each test by moving the Kafka.embedded layer to the provideSome, which would have looked like this:

suite("producer test suite")(
  // ... tests ...
).provideSome[Scope](Kafka.embedded, producer)

We could also have decided to share one instance of Kafka between different test suites (i.e. between different test files) by mixing the ZIOSpecWithKafka trait, which would have looked like this:

object ProducerSpec extends ZIOSpecWithKafka { // Note the `ZIOSpecWithKafka` trait usage here instead of `ZIOSpecDefault`
  override def spec: Spec[TestEnvironment & Kafka, Any] =
    (
      suite("Producer test suite")(
        // ... tests ...
      )
        .provideSome[Kafka](producer) // No need here to provide a Kafka instance, it is already provided by the `ZIOSpecWithKafka` trait
    ) @@ timeout(2.minutes) @@ sequential
}

More details about this ZIOSpecWithKafka trait below.

Finally, we annotate the suite with the timeout and sequential aspects.
The timeout aspect from zio-test is used to specify a timeout for the entire suite. If the suite takes more than 2 minutes to run, it will fail.
The sequential aspect from zio-test is used to specify that the tests in the suite must be run sequentially. This is necessary because Kafka is a shared resource. We don't want tests to interfere with each other.

Testing a consumer

import zio._
import zio.kafka.consumer.{ Consumer, Subscription }
import zio.kafka.serde.Serde
import zio.kafka.testkit.KafkaTestUtils.{ consumer, produceMany, producer }
import zio.kafka.testkit._
import zio.test.Assertion.hasSameElements
import zio.test.TestAspect.{ sequential, timeout }
import zio.test._

object ConsumerSpec extends ZIOSpecDefault with KafkaRandom {
  override def kafkaPrefix: String = "consumer-spec"

  override def spec: Spec[TestEnvironment & Scope, Any] =
    (
      suite("Consumer test suite")(
        test("minimal example") {
          val kvs: List[(String, String)] = (1 to 5).toList.map(i => (s"key-$i", s"msg-$i"))
          for {
            topic  <- randomTopic
            client <- randomClient
            group  <- randomGroup

            _ <- produceMany(topic, kvs) // Comes from `KafkaTestUtils`. Produces messages to the topic.

            records <- Consumer
                         .plainStream(Subscription.Topics(Set(topic)), Serde.string, Serde.string)
                         .take(5)
                         .runCollect
                         .provideSome[Kafka](
                           // Comes from `KafkaTestUtils`
                           consumer(clientId = client, groupId = Some(group))
                         )
            consumed = records.map(r => (r.record.key, r.record.value)).toList
          } yield assert(consumed)(hasSameElements(kvs))
        },
        // ... more tests ...
      )
        .provideSome[Kafka](producer)             // Here, we provide a new instance of Producer per test
        .provideSomeShared[Scope](Kafka.embedded) // Here, we provide an instance of Kafka for the entire suite
    ) @@ timeout(2.minutes) @@ sequential
}

This test is also quite minimal.
We produce 5 messages with the KafkaTestUtils.produceMany method from zio-kafka-testkit, then we consume them with the Consumer.plainStream method from zio-kafka.
Finally, we use the hasSameElements assertion from zio-test to check that the consumed records are the same as the ones we produced.

In this example, we're reusing the producer and the Kafka.embedded layers we've seen in the Producer test example.
We're also using the KafkaTestUtils.consumer layer from zio-kafka-testkit to instantiate a new Consumer.

Finally, we use the KafkaRandom trait from zio-kafka-testkit and its methods to generate random values for the Consumer client ID, the Consumer group ID and the topic name.
More details about this KafkaRandom trait later on this page.
Using random values for these parameters is important to avoid conflicts between tests as we share one Kafka instance between all the tests of the suite.

Utilities provided by the zio-kafka-testkit library

Kafka service

This trait represents a Kafka instance in your tests.
It is used to provide the bootstrap servers to the Producer and Consumer layers.

trait Kafka {
  def bootstrapServers: List[String]
  def stop(): UIO[Unit]
}
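
If you need the broker addresses directly, for example to configure a client that the provided layers don't cover, you can read them from the Kafka service. The following is a minimal sketch, assuming you build your own ConsumerSettings from the bootstrap servers (the value name settingsFromKafka is ours, not part of the library):

```scala
import zio._
import zio.kafka.consumer.ConsumerSettings
import zio.kafka.testkit.Kafka

// A sketch: derive custom ConsumerSettings from the `Kafka` service's
// bootstrap servers, for clients not covered by the testkit layers.
val settingsFromKafka: URIO[Kafka, ConsumerSettings] =
  ZIO.serviceWith[Kafka](kafka => ConsumerSettings(kafka.bootstrapServers))
```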

The companion object provides a few layers that supply a Kafka instance in your tests:

object Kafka {
  /**
   * Creates an in-memory Kafka instance with a random port.
   */
  val embedded: ZLayer[Any, Throwable, Kafka]

  /**
   * Connects to a Kafka instance running on localhost:9092 (started with Docker, for example).
   */
  val local: ULayer[Kafka]

  /**
   * Creates an in-memory Kafka instance with a random port and SASL authentication configured.
   */
  val saslEmbedded: ZLayer[Any, Throwable, Kafka.Sasl]

  /**
   * Creates an in-memory Kafka instance with a random port and SSL authentication configured.
   */
  val sslEmbedded: ZLayer[Any, Throwable, Kafka]
}

The in-memory Kafka instances are created using embedded-kafka.

KafkaTestUtils utilities

This object provides several utilities to simplify writing your tests, such as layers to boot a Producer, a Consumer, or an AdminClient.
It also provides several functions to produce records, and more.
Each utility function is documented in the source code; have a look there for more details.
You can also look at the zio-kafka tests in the zio-kafka-test module for examples of how to use these utilities.
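
As an illustration, the utilities shown earlier on this page (produceMany and the consumer layer) can be combined into a reusable round-trip helper for your own suites. This is a sketch, not part of the library; roundTrip and its parameter names are ours:

```scala
import zio._
import zio.kafka.consumer.{ Consumer, Subscription }
import zio.kafka.producer.Producer
import zio.kafka.serde.Serde
import zio.kafka.testkit.Kafka
import zio.kafka.testkit.KafkaTestUtils.{ consumer, produceMany }

// A sketch of a reusable helper: produce the given key/value pairs,
// then consume them back from the same topic and return them.
def roundTrip(
  topic: String,
  kvs: List[(String, String)],
  clientId: String,
  groupId: String
): ZIO[Producer & Kafka, Throwable, List[(String, String)]] =
  for {
    _ <- produceMany(topic, kvs) // Comes from `KafkaTestUtils`
    records <- Consumer
                 .plainStream(Subscription.Topics(Set(topic)), Serde.string, Serde.string)
                 .take(kvs.size)
                 .runCollect
                 .provideSome[Kafka](consumer(clientId = clientId, groupId = Some(groupId)))
  } yield records.map(r => (r.record.key, r.record.value)).toList
```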

ZIOSpecWithKafka trait

This trait can be used if you want to share one Kafka instance between different test suites.
This allows you to speed up your tests by booting a Kafka instance only once for all your test suites using this trait.

Usage example:

// In `src/test/scala/io/example/producer/ProducerSpec.scala`
object ProducerSpec extends ZIOSpecWithKafka { // Note the `ZIOSpecWithKafka` trait usage here instead of `ZIOSpecDefault`
  override def spec: Spec[TestEnvironment & Kafka, Any] =
    (
      suite("Producer test suite")(
        // ... tests ...
      )
        .provideSome[Kafka](producer) // No need here to provide a Kafka instance, it is already provided by the `ZIOSpecWithKafka` trait
    ) @@ timeout(2.minutes) @@ sequential
}

// In `src/test/scala/io/example/consumer/ConsumerSpec.scala`
object ConsumerSpec extends ZIOSpecWithKafka { // Note the `ZIOSpecWithKafka` trait usage here instead of `ZIOSpecDefault`
  override def spec: Spec[TestEnvironment & Kafka, Any] =
    (
      suite("Consumer test suite")(
        // ... tests ...
      )
        .provideSome[Kafka](producer) // No need here to provide a Kafka instance, it is already provided by the `ZIOSpecWithKafka` trait
    ) @@ timeout(2.minutes) @@ sequential
}

This is a capability offered by ZIO 2.
See related zio-test documentation: https://zio.dev/reference/test/sharing-layers-between-multiple-files/

KafkaRandom trait

The KafkaRandom trait provides a few methods to generate random values. To use it, mix it into your test suite, like this:

import zio.kafka.consumer.Consumer
import zio.kafka.testkit.KafkaRandom
import zio.kafka.testkit.Kafka
import zio.kafka.testkit.KafkaTestUtils.consumer
import zio.test.{ assertTrue, Spec, TestEnvironment, ZIOSpecDefault }
import zio._

object MyServiceSpec extends ZIOSpecDefault with KafkaRandom {
  // Required when mixing in the `KafkaRandom` trait.
  // It's best to use a different prefix for each test suite.
  override def kafkaPrefix: String = "my-service"

  override def spec: Spec[TestEnvironment & Scope, Any] =
    suite("MyService")(
      test("minimal example") {
        for {
          group    <- randomGroup  // Comes from `KafkaRandom`
          clientId <- randomClient // Comes from `KafkaRandom`
          metrics  <- Consumer.metrics
                        .provideSome[Kafka](
                          // Comes from `KafkaTestUtils`
                          consumer(clientId = clientId, groupId = Some(group))
                        )
        } yield assertTrue(metrics.nonEmpty)
      }
    ).provideSomeShared[Scope](Kafka.embedded)
}