Skip to main content
Version: 1.0.18

Hub

A Hub[A] is an asynchronous message hub. Publishers can publish messages of type A to the hub and subscribers can subscribe to receive messages of type A from the hub.

Unlike a Queue, where each value offered to the queue can be taken by one taker, each value published to a hub can be received by all subscribers. Whereas a Queue represents the optimal solution to the problem of how to distribute values, a Hub represents the optimal solution to the problem of how to broadcast them.

The fundamental operators on a Hub are publish and subscribe:

import zio._

trait Hub[A] {
def publish(a: A): UIO[Boolean]
def subscribe: ZManaged[Any, Nothing, Dequeue[A]]
}

The publish operator returns a ZIO effect that publishes a message of type A to the hub and succeeds with a value describing whether the message was successfully published to the hub.

The subscribe operator returns a ZManaged effect where the acquire action of the ZManaged subscribes to the hub and the release action unsubscribes from the hub. Within the context of the ZManaged we have access to a Dequeue, which is a Queue that can only be dequeued from, that allows us to take messages published to the hub.

For example, we can use a hub to broadcast a message to multiple subscribers like this:

Hub.bounded[String](2).flatMap { hub =>
hub.subscribe.zip(hub.subscribe).use { case (left, right) =>
for {
_ <- hub.publish("Hello from a hub!")
_ <- left.take.flatMap(console.putStrLn(_))
_ <- right.take.flatMap(console.putStrLn(_))
} yield ()
}
}

A subscriber will only receive messages that are published to the hub while it is subscribed. So if we want to make sure that a particular message is received by a subscriber we must take care that the subscription has completed before publishing the message to the hub.

We can do this by publishing a message to the hub within the scope of the subscription as in the example above or by using other coordination mechanisms such as completing a Promise when the acquire action of the ZManaged has completed.

Of course, in many cases such as subscribing to receive real time data we may not care about this because we are happy to just pick up with the most recent messages after we have subscribed. But for testing and simple applications this can be an important point to keep in mind.

Constructing Hubs​

The most common way to create a hub is with the bounded constructor, which returns an effect that creates a new hub with the specified requested capacity.

def bounded[A](requestedCapacity: Int): UIO[Hub[A]] =
???

For maximum efficiency you should create hubs with capacities that are powers of two.

Just like a bounded queue, a bounded hub applies back pressure to publishers when it is at capacity, so publishers will semantically block on calls to publish if the hub is full.

The advantage of the back pressure strategy is that it guarantees that all subscribers will receive all messages published to the hub while they are subscribed. However, it does create the risk that a slow subscriber will slow down the rate at which messages are published and received by other subscribers.

If you do not want this you can create a hub with the dropping constructor.

def dropping[A](requestedCapacity: Int): UIO[Hub[A]] =
???

A dropping hub will simply drop values published to it if the hub is at capacity, returning false on calls to publish if the hub is full to signal that the value was not successfully published.

The advantage of the dropping strategy is that publishers can continue to publish new values so when there is space in the hub the newest values can be published to the hub. However, subscribers are no longer guaranteed to receive all values published to the hub and a slow subscriber can still prevent messages from being published to the hub and received by other subscribers.

You can also create a hub with the sliding constructor.

def sliding[A](requestedCapacity: Int): UIO[Hub[A]] =
???

A sliding hub will drop the oldest value if a new value is published to it and the hub is at capacity, so publishing will always succeed immediately.

The advantage of the sliding strategy is that a slow subscriber cannot slow down that rate at which messages are published to the hub or received by other subscribers. However, it creates the risk that slow subscribers may not receive all messages published to the hub.

Finally, you can create a hub with the unbounded constructor.

def unbounded[A]: UIO[Hub[A]] =
???

An unbounded hub is never at capacity so publishing to an unbounded hub always immediately succeeds.

The advantage of an unbounded hub is that it combines the guarantees that all subscribers will receive all messages published to the hub and that a slow subscriber will not slow down the rate at which messages are published and received by other subscribers. However, it does this at the cost of potentially growing without bound if messages are published to the hub more quickly than they are taken by the slowest subscriber.

In general you should prefer bounded, dropping, or sliding hubs for this reason. However, unbounded hubs can be useful in certain situations where you do not know exactly how many values will be published to the hub but are confident that it will not exceed a reasonable size or want to handle that concern at a higher level of your application.

Operators On Hubs​

In addition to publish and subscribe, many of the same operators that are available on queues are available on hubs.

We can publish multiple values to the hub using the publishAll operator.

trait Hub[A] {
def publishAll(as: Iterable[A]): UIO[Boolean]
}

We can check the capacity of the hub as well as the number of messages currently in the hub using the size and capacity operators.

trait Hub[A] {
def capacity: Int
def size: UIO[Int]
}

Note that capacity returns an Int because the capacity is set at hub creation and never changes. In contrast, size returns a ZIO effect that determines the current size of the hub since the number of messages in the hub can change over time.

We can also shut down the hub, check whether it has been shut down, or await its shut down. Shutting down a hub will shut down all the queues associated with subscriptions to the hub, properly propagating the shut down signal.

trait Hub[A] {
def awaitShutdown: UIO[Unit]
def isShutdown: UIO[Boolean]
def shutdown: UIO[Unit]
}

As you can see, the operators on Hub are identical to the ones on Queue with the exception of publish and subscribe replacing offer and take. So if you know how to use a Queue you already know how to use a Hub.

In fact, a Hub can be viewed as a Queue that can only be written to.

trait Hub[A] {
def toQueue[A]: Enqueue[A]
}

Here the Enqueue type represents a queue that can only be enqueued. Enqueing to the queue publishes a value to the hub, shutting down the queue shuts down the hub, and so on.

This can be extremely useful because it allows us to use a Hub anywhere we are currently using a Queue that we only write to.

For example, say we are using the into operator on ZStream to send all elements of a stream of financial transactions to a Queue for processing by a downstream consumer.

import zio.stream._

trait ZStream[-R, +E, +O] {
def into[R1 <: R, E1 >: E](
queue: ZEnqueue[R1, Nothing, Take[E1, O]]
): ZIO[R1, E1, Unit]
}

We would now like to have multiple downstream consumers process each of these transactions, for example to persist them and log them in addition to applying our business logic to them. With Hub this is easy because we can just use the toQueue operator to view any Hub as a Queue that can only be written to.

type Transaction = ???

val transactionStream: ZStream[Any, Nothing, Transaction] =
???

val hub: Hub[Transaction] =
???

transactionStream.into(hub.toQueue)

All of the elements from the transaction stream will now be published to the hub. We can now have multiple downstream consumers process elements from the financial transactions stream with the guarantee that all downstream consumers will see all transactions in the stream, changing the topology of our data flow from one-to-one to one-to-many with a single line change.

Polymorphic Hubs​

Like many of the other data structures in ZIO, a Hub is actually a type alias for a more polymorphic data structure called a ZHub.

trait ZHub[-RA, -RB, +EA, +EB, -A, B] {
def publish(a: A): ZIO[RA, EA, Boolean]
def subscribe: ZManaged[Any, Nothing, ZDequeue[RB, EB, B]]
}

type Hub[A] = ZHub[Any, Any, Nothing, Nothing, A, A]

A ZHub allows publishers to publish messages of type A to the hub and subscribers to subscribe to receive messages of type B from the hub. Publishing messages to the hub can require an environment of type RA and fail with an error of type EA and taking messages from the hub can require an environment of type RB and fail with an error of type EB.

Defining hubs polymorphically like this allows us to describe hubs that potentially transform their inputs or outputs in some way.

To create a polymorphic hub we begin with a normal hub as described above and then add logic to it for transforming its inputs or outputs.

We can transform the type of messages received from the hub using the map and mapM operators.

trait ZHub[-RA, -RB, +EA, +EB, -A, +B] {
def map[C](f: B => C): ZHub[RA, RB, EA, EB, A, C]
def mapM[RC <: RB, EC >: EB, C](f: B => ZIO[RC, EC, C]): ZHub[RA, RC, EA, EC, A, C]
}

The map operator allows us to transform the type of messages received from the hub with the specified function. Conceptually, every time a message is taken from the hub by a subscriber it will first be transformed with the function f before being received by the subscriber.

The mapM operator works the same way except it allows us to perform an effect each time a value is taken from the hub. We could use this for example to log each time a message is taken from the hub.

import zio.clock._

val hub: Hub[Int] = ???

val hubWithLogging: ZHub[Any, Clock with Console, Nothing, Nothing, Int, Int] =
hub.mapM { n =>
clock.currentDateTime.orDie.flatMap { currentDateTime =>
console.putStrLn(s"Took message $n from the hub at $currentDateTime").orDie
}.as(n)
}

Note that the specified function in map or mapM will be applied each time a message is taken from the hub by a subscriber. Thus, if there are n subscribers to the hub the function will be evaluated n times for each message published to the hub.

This can be useful if we want to, for example, observe the different times that different subscribers are taking messages from the hub as in the example above. However, it is less efficient if we want to apply a transformation once for each value published to the hub.

For this we can use the contramap and contramapM operators defined on ZHub.

trait ZHub[-RA, -RB, +EA, +EB, -A, +B] {
def contramap[C](
f: C => A
): ZHub[RA, RB, EA, EB, C, B]
def contramapM[RC <: RA, EC >: EA, C](
f: C => ZIO[RC, EC, A]
): ZHub[RC, RB, EC, EB, C, B]
}

The contramap operator allows us to transform each value published to the hub by applying the specified function. Conceptually it returns a new hub where every time we publish a value we first transform it with the specified function before publishing it to the original hub.

The contramapM operator works the same way except it allows us to perform an effect each time a message is published to the hub.

Using these operators, we could describe a hub that validates its inputs, allowing publishers to publish raw data and subscribers to receive validated data while signaling to publishers when data they attempt to publish is not valid.

import zio.clock._

val hub: Hub[Int] = ???

val hubWithLogging: ZHub[Any, Any, String, Nothing, String, Int] =
hub.contramapM { (s: String) =>
ZIO.effect(s.toInt).orElseFail(s"$s is not a valid message")
}

We can also transform inputs and outputs at the same time using the dimap or dimapM operators.

trait ZHub[-RA, -RB, +EA, +EB, -A, +B] {
def dimap[C, D](
f: C => A,
g: B => D
): ZHub[RA, RB, EA, EB, C, D]
def dimapM[RC <: RA, RD <: RB, EC >: EA, ED >: EB, C, D](
f: C => ZIO[RC, EC, A],
g: B => ZIO[RD, ED, D]
): ZHub[RC, RD, EC, ED, C, D]
}

These correspond to transforming the inputs and outputs of a hub at the same time using the specified functions. This is the same as transforming the outputs with map or mapM and the inputs with contramap or contramapM.

In addition to just transforming the inputs and outputs of a hub we can also filter the inputs or outputs of a hub.

trait ZHub[-RA, -RB, +EA, +EB, -A, +B] {
def filterInput[A1 <: A](
f: A1 => Boolean
): ZHub[RA, RB, EA, EB, A1, B]
def filterInputM[RA1 <: RA, EA1 >: EA, A1 <: A](
f: A1 => ZIO[RA1, EA1, Boolean]
): ZHub[RA1, RB, EA1, EB, A1, B]
def filterOutput(
f: B => Boolean
): ZHub[RA, RB, EA, EB, A, B]
def filterOutputM[RB1 <: RB, EB1 >: EB](
f: B => ZIO[RB1, EB1, Boolean]
): ZHub[RA, RB1, EA, EB1, A, B]
}

Filtering the inputs to a hub conceptually "throws away" messages that do not meet the filter predicate before they are published to the hub. The publish operator will return false to signal that such a message was not successfully published to the hub.

Similarly, filtering the outputs from a hub causes subscribers to ignore messages that do not meet the filter predicate, continuing to take messages from the hub until they find one that does meet the filter predicate.

We could, for example, create a hub that only handles tweets containing a particular term.

final case class Tweet(text: String)

val hub: Hub[Tweet] = ???

val zioHub: Hub[Tweet] =
hub.filterInput(_.text.contains("zio"))

In most cases the hubs we work with in practice will be monomorphic hubs and we will use the hub purely to broadcast values, performing any necessary effects before publishing values to the hub or after taking values from the hub. But it is nice to know that we have this kind of power if we need it.

Hubs And Streams​

Hubs play extremely well with streams.

We can create a ZStream from a subscription to a hub using the fromHub operator.

import zio.stream._

object ZStream {
def fromHub[R, E, O](hub: ZHub[Nothing, R, Any, E, Nothing, O]): ZStream[R, E, O] =
???
}

This will return a stream that subscribes to receive values from a hub and then emits every value published to the hub while the subscription is active. When the stream ends the subscriber will automatically be unsubscribed from the hub.

There is also a fromHubManaged operator that returns the stream in the context of a managed effect.

object ZStream {
def fromHubManaged[R, E, O](
hub: ZHub[Nothing, R, Any, E, Nothing, O]
): ZManaged[Any, Nothing, ZStream[R, E, O]] =
???
}

The managed effect here describes subscribing to receive messages from the hub while the stream describes taking messages from the hub. This can be useful when we need to ensure that a consumer has subscribed before a producer begins publishing values.

Here is an example of using it:

for {
promise <- Promise.make[Nothing, Unit]
hub <- Hub.bounded[String](2)
managed = ZStream.fromHubManaged(hub).tapM(_ => promise.succeed(()))
stream = ZStream.unwrapManaged(managed)
fiber <- stream.take(2).runCollect.fork
_ <- promise.await
_ <- hub.publish("Hello")
_ <- hub.publish("World")
_ <- fiber.join
} yield ()

Notice that in this case we used a Promise to ensure that the subscription had completed before publishing to the hub. The ZManaged in the return type of fromHubManaged made it easy for us to signal when the subscription had occurred by using tapM and completing the Promise.

Of course in many real applications we don't need this kind of sequencing and just want to subscribe to receive new messages. In this case we can use the fromHub operator to return a ZStream that will automatically handle subscribing and unsubscribing for us.

There is also a fromHubWithShutdown variant that shuts down the hub itself when the stream ends. This is useful when the stream represents your main application logic and you want to shut down other subscriptions to the hub when the stream ends.

Each of these constructors also has Chunk variants, fromChunkHub and fromChunkHubWithShutdown, that allow you to preserve the chunked structure of data when working with hubs and streams.

In addition to being able to create streams from subscriptions to hubs, there are a variety of ways to send values emitted by streams to hubs to build more complex data flow graphs.

The simplest of these is the toHub operator, which constructs a new hub and publishes each element emitted by the stream to that hub.

trait ZStream[-R, +E, +O] {
def toHub(
capacity: Int
): ZManaged[R, Nothing, ZHub[Nothing, Any, Any, Nothing, Nothing, Take[E, O]]]
}

The hub will be constructed with the bounded constructor using the specified capacity.

If you want to send values emitted by a stream to an existing hub or a hub created using one of the other hub constructors you can use the intoHub operator.

trait ZStream[-R, +E, +O] {
def intoHub[R1 <: R, E1 >: E](
hub: ZHub[R1, Nothing, Nothing, Any, Take[E1, O], Any]
): ZIO[R1, E1, Unit]
}

There is an intoHubManaged variant of this if you want to send values to the hub in the context of a ZManaged instead of a ZIO effect.

You can also create a sink that sends values to a hub.

object ZSink {
def fromHub[R, E, I](
hub: ZHub[R, Nothing, E, Any, I, Any]
): ZSink[R, E, I, Nothing, Unit] =
???
}

The sink will publish each value sent to the sink to the specified hub. Again there is a fromHubWithShutdown variant that will shut down the hub when the stream ends.

Finally, ZHub is used internally to provide a highly efficient implementation of the broadcast family of operators, including broadcast and broadcastDynamic.

trait ZStream[-R, +E, +O] {
def broadcast(
n: Int,
maximumLag: Int
): ZManaged[R, Nothing, List[ZStream[Any, E, O]]]
def broadcastDynamic(
maximumLag: Int
): ZManaged[R, Nothing, ZManaged[Any, Nothing, ZStream[Any, E, O]]]
}

The broadcast operator generates the specified number of new streams and broadcasts each value from the original stream to each of the new streams. The broadcastDynamic operator returns a new ZManaged value that you can use to dynamically subscribe and unsubscribe to receive values broadcast from the original stream.

You don't have to do anything with ZHub to take advantage of these operators other than enjoy their optimized implementation in terms of ZHub.

With broadcast and other ZStream operators that model distributing values to different streams and combining values from different streams it is straightforward to build complex data flow graphs, all while being as performant as possible.