
ZIO Pulsar

ZIO Pulsar is the Apache Pulsar client for Scala with ZIO and ZIO Streams integration.

Introduction

ZIO Pulsar is a purely functional Scala wrapper over the official Pulsar client. Some key features of this library:

  • Type-safe: Uses the Scala type system to reduce the runtime exceptions that can occur with the official Java client.
  • Streaming-enabled: Integrates naturally with ZIO Streams.
  • ZIO integrated: Uses common ZIO primitives such as ZIO effects and ZManaged to reduce boilerplate and increase expressiveness.

Installation

To use this library with Scala 3, add the following line to the build.sbt file:

libraryDependencies += "com.github.jczuchnowski" %% "zio-pulsar" % "0.1"
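
For context, the dependency line might sit in a build definition like the sketch below. The project name and the Scala version are assumptions made for illustration; only the libraryDependencies line comes from this page.

```scala
// build.sbt: a minimal sketch, not a verified build definition
lazy val root = (project in file("."))
  .settings(
    name := "zio-pulsar-example", // hypothetical project name
    scalaVersion := "3.0.0",      // assumption: substitute the Scala 3 version the artifact was published for
    // %% appends the Scala binary version suffix to the artifact name when resolving
    libraryDependencies += "com.github.jczuchnowski" %% "zio-pulsar" % "0.1"
  )
```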

Example

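First, we need to start a standalone Apache Pulsar instance with Docker. The command attaches the container to a Docker network named pulsar, which must already exist (it can be created with docker network create pulsar):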

docker run -it \
  -p 6650:6650 \
  -p 8080:8080 \
  --mount source=pulsardata,target=/pulsar/data \
  --mount source=pulsarconf,target=/pulsar/conf \
  --network pulsar \
  apachepulsar/pulsar:2.7.0 \
  bin/pulsar standalone
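
Before running the example, it can help to confirm that the two published ports accept connections. The following is a minimal sketch using only the Java standard library. The BrokerCheck object and isPortOpen helper are hypothetical names introduced here and are not part of ZIO Pulsar.

```scala
import java.net.{InetSocketAddress, Socket}
import scala.util.Using

object BrokerCheck {
  /** Returns true if a TCP connection to host:port succeeds within timeoutMs. */
  def isPortOpen(host: String, port: Int, timeoutMs: Int = 1000): Boolean =
    Using(new Socket()) { socket =>
      // connect throws on refusal or timeout; Using turns that into a Failure
      socket.connect(new InetSocketAddress(host, port), timeoutMs)
      true
    }.getOrElse(false)

  def main(args: Array[String]): Unit = {
    // 6650 and 8080 are the ports published by the docker command
    val ports = List(6650 -> "binary protocol", 8080 -> "HTTP admin")
    ports.foreach { case (port, label) =>
      val status = if (isPortOpen("localhost", port)) "reachable" else "not reachable"
      println(s"localhost:$port ($label) is $status")
    }
  }
}
```

A reachable port only shows that something is listening. The standalone broker may still need a few more seconds before it accepts producers and consumers.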

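With the broker running, we can run the following example, which publishes 101 messages to my-topic and prints the first 10 that the consumer receives: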

import org.apache.pulsar.client.api.{PulsarClientException, Schema}
import zio._
import zio.blocking._
import zio.clock._
import zio.console._
import zio.pulsar._
import zio.stream._

import java.nio.charset.StandardCharsets

object StreamingExample extends zio.App {
  val topic = "my-topic"

  // Publishes "Message 0" through "Message 100" to the topic via a producer sink.
  val producer: ZManaged[Has[PulsarClient], PulsarClientException, Unit] =
    for {
      sink <- Producer.make(topic, Schema.STRING).map(_.asSink)
      _    <- Stream.fromIterable(0 to 100).map(i => s"Message $i").run(sink).toManaged_
    } yield ()

  // Subscribes to the topic, then acknowledges and prints the first 10 messages.
  val consumer: ZManaged[Has[PulsarClient] with Blocking with Console, PulsarClientException, Unit] =
    for {
      builder  <- ConsumerBuilder.make(Schema.STRING).toManaged_
      consumer <- builder
                    .subscription(Subscription("my-subscription", SubscriptionType.Exclusive))
                    .topic(topic)
                    .build
      _        <- consumer.receiveStream.take(10).foreach { e =>
                    consumer.acknowledge(e.getMessageId) *>
                      putStrLn(new String(e.getData, StandardCharsets.UTF_8)).orDie
                  }.toManaged_
    } yield ()

  // Runs the consumer on a forked fiber, runs the producer, then waits for the consumer.
  val myApp =
    for {
      f <- consumer.fork
      _ <- producer
      _ <- f.join.toManaged_
    } yield ()

  def run(args: List[String]): URIO[ZEnv, ExitCode] =
    myApp
      .provideCustomLayer(
        (Console.live ++ Clock.live) >+>
          PulsarClient.live("localhost", 6650)
      ).useNow.exitCode
}
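
To make the data flow of the example easier to follow without a broker, here is a small sketch that models it with plain Scala collections. It assumes the string payloads travel as UTF-8 bytes and arrive in publish order. PayloadRoundTrip, encode and decode are hypothetical names used for illustration only, and nothing here calls ZIO Pulsar.

```scala
import java.nio.charset.StandardCharsets

object PayloadRoundTrip {
  // The same payloads the producer stream emits: "Message 0" through "Message 100".
  val messages: Vector[String] = (0 to 100).map(i => s"Message $i").toVector

  // Stand-ins for the wire format: a string payload as UTF-8 bytes and back.
  def encode(s: String): Array[Byte] = s.getBytes(StandardCharsets.UTF_8)
  def decode(bytes: Array[Byte]): String = new String(bytes, StandardCharsets.UTF_8)

  // Mirrors receiveStream.take(10) followed by decoding e.getData as UTF-8.
  val received: Vector[String] = messages.map(encode).take(10).map(decode)

  def main(args: Array[String]): Unit = {
    println(s"produced ${messages.size} messages, consumed ${received.size}")
    received.foreach(println)
  }
}
```

The range 0 to 100 is inclusive, so the producer publishes 101 messages while the consumer stops after the first 10.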

Resources

  • ZIO World - ZIO PULSAR by Jakub Czuchnowski (March 2020) — A new library that offers a native, first-class ZIO experience for Pulsar, the Kafka competitor gaining traction for some use cases.