How to Interop with Reactive Streams?
Checkout the interop-reactive-streams module for inter-operation support.
Reactive Streams Producer and Subscriber​
ZIO integrates with Reactive Streams by providing conversions from zio.stream.Stream to org.reactivestreams.Publisher
and from zio.stream.Sink to org.reactivestreams.Subscriber and vice versa. Simply import import zio.interop.reactivestreams._ to make the
conversions available.
Examples​
First, let's get a few imports out of the way.
import org.reactivestreams.example.unicast._
import zio._
import zio.interop.reactivestreams._
import zio.stream._
val runtime = Runtime.default
We use the following Publisher and Subscriber for the examples:
val publisher = new RangePublisher(3, 10)
val subscriber = new SyncSubscriber[Int] {
override protected def whenNext(v: Int): Boolean = {
print(s"$v, ")
true
}
}
Publisher to Stream​
A Publisher used as a Stream buffers up to qSize elements. If possible, qSize should be
a power of two for best performance. The default is 16.
val streamFromPublisher = publisher.toStream(qSize = 16)
runtime.unsafeRun(
streamFromPublisher.run(Sink.collectAll[Integer])
)
Subscriber to Sink​
When running a Stream to a Subscriber, a side channel is needed for signalling failures.
For this reason toSink returns a tuple of Promise and Sink. The Promise must be failed
on Stream failure. The type parameter on toSink is the error type of the Stream.
val asSink = subscriber.toSink[Throwable]
val failingStream = Stream.range(3, 13) ++ Stream.fail(new RuntimeException("boom!"))
runtime.unsafeRun(
asSink.flatMap { case (errorP, sink) =>
failingStream.run(sink).catchAll(errorP.fail)
}
)
Stream to Publisher​
val stream = Stream.range(3, 13)
runtime.unsafeRun(
stream.toPublisher.flatMap { publisher =>
UIO(publisher.subscribe(subscriber))
}
)
Sink to Subscriber​
toSubscriber returns a Subscriber and an IO which completes with the result of running the
Sink or the error if the Publisher fails.
A Sink used as a Subscriber buffers up to qSize elements. If possible, qSize should be
a power of two for best performance. The default is 16.
val sink = Sink.collectAll[Integer]
runtime.unsafeRun(
sink.toSubscriber(qSize = 16).flatMap { case (subscriber, result) =>
UIO(publisher.subscribe(subscriber)) *> result
}
)