Version: 2.x

How to Interop with Reactive Streams?

Check out the interop-reactive-streams module for interoperation support.

Reactive Streams Publisher and Subscriber

ZIO integrates with Reactive Streams by providing conversions from zio.stream.Stream to org.reactivestreams.Publisher and from zio.stream.Sink to org.reactivestreams.Subscriber, and vice versa. Import zio.interop.reactivestreams._ to make the conversions available.

Examples

First, let's get a few imports out of the way.

import org.reactivestreams.example.unicast._
import zio._
import zio.interop.reactivestreams._
import zio.stream._

val runtime = Runtime.default

We use the following Publisher and Subscriber for the examples:

val publisher = new RangePublisher(3, 10)
val subscriber = new SyncSubscriber[Int] {
  override protected def whenNext(v: Int): Boolean = {
    print(s"$v, ")
    true
  }
}
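To make the protocol behind these two values concrete, here is a minimal sketch that drives the same RangePublisher with a hand-written Subscriber and no ZIO involved. CollectingSubscriber is a hypothetical helper written for this illustration; it is not part of Reactive Streams or of the interop module. The sketch assumes the reactive-streams and reactive-streams-examples artifacts are on the classpath.

```scala
import org.reactivestreams.{Subscriber, Subscription}
import org.reactivestreams.example.unicast.RangePublisher
import scala.collection.mutable.ListBuffer

// Hypothetical helper for illustration only: stores every signal it receives.
final class CollectingSubscriber extends Subscriber[Integer] {
  val received: ListBuffer[Int] = ListBuffer.empty[Int]
  @volatile var failure: Option[Throwable] = None
  @volatile var completed: Boolean = false

  // Signal unbounded demand as soon as the subscription is established.
  override def onSubscribe(s: Subscription): Unit = s.request(Long.MaxValue)
  override def onNext(v: Integer): Unit = received += v.intValue
  override def onError(t: Throwable): Unit = failure = Some(t)
  override def onComplete(): Unit = completed = true
}

val collector = new CollectingSubscriber
// RangePublisher(start, count): 10 consecutive integers beginning at 3.
new RangePublisher(3, 10).subscribe(collector)
```

Because RangePublisher emits synchronously on the requesting thread, collector.received should hold the values 3 through 12 once subscribe returns, the same elements that Stream.range(3, 13) produces in the later examples.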

Publisher to Stream

A Publisher used as a Stream buffers up to qSize elements. If possible, qSize should be a power of two for best performance. The default is 16.

val streamFromPublisher = publisher.toStream(qSize = 16)
runtime.unsafeRun(
  streamFromPublisher.run(Sink.collectAll[Integer])
)
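If the buffer size comes from configuration rather than a literal, the power-of-two advice can be applied mechanically. The sketch below uses a hypothetical helper, nextPowerOfTwo, which is not part of the interop module; it rounds a positive size up to the nearest power of two.

```scala
// Hypothetical helper, not provided by zio-interop-reactivestreams.
// Rounds a positive Int up to the nearest power of two (capped at 2^30).
def nextPowerOfTwo(n: Int): Int = {
  require(n > 0 && n <= (1 << 30), s"n must be between 1 and 2^30, got $n")
  val highest = Integer.highestOneBit(n) // largest power of two <= n
  if (highest == n) n else highest << 1
}

val requestedSize = 100
val qSize = nextPowerOfTwo(requestedSize) // 128
```

The resulting value can then be passed as the qSize argument of the conversion shown above.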

Subscriber to Sink

When running a Stream to a Subscriber, a side channel is needed for signalling failures. For this reason, toSink returns a tuple of a Promise and a Sink. If the Stream fails, you must fail the Promise with that error so the failure reaches the Subscriber. The type parameter on toSink is the error type of the Stream.

val asSink = subscriber.toSink[Throwable]
val failingStream = Stream.range(3, 13) ++ Stream.fail(new RuntimeException("boom!"))
runtime.unsafeRun(
  asSink.flatMap { case (errorP, sink) =>
    // Forward a stream failure to the Subscriber by failing the Promise.
    failingStream.run(sink).catchAll(errorP.fail)
  }
)

Stream to Publisher

val stream = Stream.range(3, 13)
runtime.unsafeRun(
  stream.toPublisher.flatMap { publisher =>
    UIO(publisher.subscribe(subscriber))
  }
)

Sink to Subscriber

toSubscriber returns a Subscriber and an IO which completes with the result of running the Sink or the error if the Publisher fails. A Sink used as a Subscriber buffers up to qSize elements. If possible, qSize should be a power of two for best performance. The default is 16.

val sink = Sink.collectAll[Integer]
runtime.unsafeRun(
  sink.toSubscriber(qSize = 16).flatMap { case (subscriber, result) =>
    UIO(publisher.subscribe(subscriber)) *> result
  }
)