Introduction to ZIO Interop Reactive Streams
This library provides an interoperability layer between ZIO and reactive streams.
Introduction
ZIO integrates with Reactive Streams by providing conversions from zio.stream.Stream
to org.reactivestreams.Publisher
and from zio.stream.Sink
to org.reactivestreams.Subscriber
and vice versa. Simply import import zio.interop.reactivestreams._
to make the conversions available.
Installation
In order to use this library, we need to add the following line in our build.sbt
file:
libraryDependencies += "dev.zio" %% "zio-interop-reactive-streams" % "2.0.2"
Examples
First, let's get a few imports out of the way.
import org.reactivestreams.example.unicast._
import zio._
import zio.interop.reactivestreams._
import zio.stream._
We use the following Publisher
and Subscriber
for the examples:
val publisher = new RangePublisher(3, 10)
val subscriber = new SyncSubscriber[Int] {
override protected def whenNext(v: Int): Boolean = {
print(s"$v, ")
true
}
}
Publisher to Stream
A Publisher
used as a Stream
buffers up to qSize
elements. If possible, qSize
should be
a power of two for best performance. The default is 16.
val streamFromPublisher = publisher.toZIOStream(qSize = 16)
streamFromPublisher.run(Sink.collectAll[Integer])
Subscriber to Sink
When running a Stream
to a Subscriber
, a side channel is needed for signalling failures.
For this reason toZIOSink
returns a tuple of a callback and a Sink
. The callback must be used to signal Stream
failure. The type parameter on toZIOSink
is the error type of the Stream.
val asSink = subscriber.toZIOSink[Throwable]
val failingStream = ZStream.range(3, 13) ++ ZStream.fail(new RuntimeException("boom!"))
ZIO.scoped {
asSink.flatMap { case (signalError, sink) => // FIXME
failingStream.run(sink).catchAll(signalError)
}
}
Stream to Publisher
val stream = Stream.range(3, 13)
stream.toPublisher.flatMap { publisher =>
UIO(publisher.subscribe(subscriber))
}
Sink to Subscriber
toSubscriber
returns a Subscriber
and an IO
which completes with the result of running the Sink
or the error if the Publisher
fails.
A Sink
used as a Subscriber
buffers up to qSize
elements. If possible, qSize
should be a power of two for best performance. The default is 16.
val sink = Sink.collectAll[Integer]
ZIO.scoped {
sink.toSubscriber(qSize = 16).flatMap { case (subscriber, result) =>
UIO(publisher.subscribe(subscriber)) *> result
}
}