Queue
Queue is a lightweight in-memory queue built on ZIO with composable and transparent back-pressure. It is fully asynchronous (no locks or blocking), purely functional, and type-safe.
A Queue[A] contains values of type A and has two basic operations: offer, which places an A in the Queue, and take, which removes and returns the oldest value in the Queue.
import zio._

val res: UIO[Int] = for {
  queue <- Queue.bounded[Int](100)
  _     <- queue.offer(1)
  v1    <- queue.take
} yield v1
Creating a queue
A Queue can be bounded (with a limited capacity) or unbounded.
There are several strategies to process new values when the queue is full:
- The default bounded queue is back-pressured: when full, any offering fiber will be suspended until the queue is able to add the item;
- A dropping queue will drop new items when the queue is full;
- A sliding queue will drop old items when the queue is full.
To create a back-pressured bounded queue:
val boundedQueue: UIO[Queue[Int]] = Queue.bounded[Int](100)
To create a dropping queue:
val droppingQueue: UIO[Queue[Int]] = Queue.dropping[Int](100)
To create a sliding queue:
val slidingQueue: UIO[Queue[Int]] = Queue.sliding[Int](100)
To create an unbounded queue:
val unboundedQueue: UIO[Queue[Int]] = Queue.unbounded[Int]
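The difference between the dropping and sliding strategies is easiest to see by overflowing a small queue. The following is a minimal sketch, assuming ZIO 1.x, where offer returns a Boolean indicating whether the item was accepted and takeAll returns a List; the names droppingResult and slidingResult are illustrative only.

```scala
import zio._

// A capacity of 2 keeps the example small; three offers overflow the queue.
val droppingResult: UIO[(Boolean, List[Int])] = for {
  queue    <- Queue.dropping[Int](2)
  _        <- queue.offer(1)
  _        <- queue.offer(2)
  accepted <- queue.offer(3) // queue is full: the new item (3) is dropped
  items    <- queue.takeAll
} yield (accepted, items)    // (false, List(1, 2))

val slidingResult: UIO[List[Int]] = for {
  queue <- Queue.sliding[Int](2)
  _     <- queue.offer(1)
  _     <- queue.offer(2)
  _     <- queue.offer(3) // queue is full: the oldest item (1) is dropped
  items <- queue.takeAll
} yield items             // List(2, 3)
```

Neither strategy suspends the offering fiber, which makes them a reasonable fit when losing data is preferable to slowing down producers.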
Adding items to a queue
The simplest way to add a value to the queue is offer:
val res1: UIO[Unit] = for {
  queue <- Queue.bounded[Int](100)
  _     <- queue.offer(1)
} yield ()
When using a back-pressured queue, offer will suspend if the queue is full. You can use fork to wait in a different fiber.
val res2: UIO[Unit] = for {
  queue <- Queue.bounded[Int](1)
  _     <- queue.offer(1)
  f     <- queue.offer(1).fork // will be suspended because the queue is full
  _     <- queue.take          // frees a slot, so the suspended offer can complete
  _     <- f.join
} yield ()
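Back-pressure is what lets a fast producer and a slower consumer share a small queue without losing items. The sketch below assumes ZIO 1.x (ZIO.foreach_ and ZIO.foreach as found there); the name backPressured and the sizes are illustrative only.

```scala
import zio._

val backPressured: UIO[List[Int]] = for {
  queue    <- Queue.bounded[Int](2)
  // The producer suspends on offer whenever two items are already waiting.
  producer <- ZIO.foreach_(1 to 10)(i => queue.offer(i)).fork
  // Each take frees a slot, which lets the suspended producer resume.
  items    <- ZIO.foreach(List.range(0, 10))(_ => queue.take)
  _        <- producer.join
} yield items // all ten items, in the order they were offered
```

Although the queue never holds more than two items, every value is delivered, because the producer waits instead of dropping.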
It is also possible to add multiple values at once with offerAll:
val res3: UIO[Unit] = for {
  queue <- Queue.bounded[Int](100)
  items  = Range.inclusive(1, 10).toList
  _     <- queue.offerAll(items)
} yield ()
Consuming items from a queue
The take operation removes the oldest item from the queue and returns it. If the queue is empty, this will suspend, and resume only when an item has been added to the queue. As with offer, you can use fork to wait for the value in a different fiber.
val oldestItem: UIO[String] = for {
  queue <- Queue.bounded[String](100)
  f     <- queue.take.fork // will be suspended because the queue is empty
  _     <- queue.offer("something")
  v     <- f.join
} yield v
You can consume the first item with poll, which never suspends. If the queue is empty you will get None; otherwise the oldest item will be removed and returned wrapped in Some.
val polled: UIO[Option[Int]] = for {
  queue <- Queue.bounded[Int](100)
  _     <- queue.offer(10)
  _     <- queue.offer(20)
  head  <- queue.poll // Some(10): the oldest item
} yield head
You can consume multiple items at once with takeUpTo. If the queue holds fewer items than requested, it will return all the items it has without waiting for more offers.
val taken: UIO[List[Int]] = for {
  queue <- Queue.bounded[Int](100)
  _     <- queue.offer(10)
  _     <- queue.offer(20)
  list  <- queue.takeUpTo(5) // only two items are available
} yield list
Similarly, you can get all items at once with takeAll. It also returns without waiting (an empty list if the queue is empty).
val all: UIO[List[Int]] = for {
  queue <- Queue.bounded[Int](100)
  _     <- queue.offer(10)
  _     <- queue.offer(20)
  list  <- queue.takeAll
} yield list
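Unlike take, the operations poll, takeUpTo and takeAll never suspend. A minimal sketch of what each returns on an empty queue, assuming the ZIO 1.x return types used above (the name fromEmptyQueue is illustrative only):

```scala
import zio._

val fromEmptyQueue: UIO[(Option[Int], List[Int], List[Int])] = for {
  queue  <- Queue.bounded[Int](100)
  polled <- queue.poll        // None: nothing to return
  upTo   <- queue.takeUpTo(5) // empty list, returned immediately
  all    <- queue.takeAll     // empty list, returned immediately
} yield (polled, upTo, all)
```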
Shutting down a queue
Calling shutdown interrupts all the fibers that are suspended on offer* or take*. It also empties the queue and makes all future calls to offer* and take* terminate immediately.
val takeFromShutdownQueue: UIO[Unit] = for {
  queue <- Queue.bounded[Int](3)
  f     <- queue.take.fork
  _     <- queue.shutdown // will interrupt f
  _     <- f.join         // will terminate
} yield ()
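To observe the effect of shutdown without being terminated yourself, you can inspect the fiber's Exit value instead of joining it. This is a sketch assuming the ZIO 1.x API (Fiber#await, Exit#interrupted and Queue#isShutdown); the name shutdownObserved is illustrative only.

```scala
import zio._

val shutdownObserved: UIO[(Boolean, Boolean)] = for {
  queue  <- Queue.bounded[Int](3)
  f      <- queue.take.fork  // suspends: the queue is empty
  _      <- queue.shutdown   // interrupts the suspended take
  exit   <- f.await          // await yields the Exit rather than propagating it
  isDown <- queue.isShutdown
} yield (exit.interrupted, isDown) // (true, true)
```

Using await here means the calling fiber keeps running and can react to the interruption, for example by cleaning up or logging.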
You can use awaitShutdown to execute an effect when the queue is shut down. It suspends until the queue is shut down; if the queue is already shut down, it resumes right away.
val awaitShutdown: UIO[Unit] = for {
  queue <- Queue.bounded[Int](3)
  f     <- queue.awaitShutdown.fork // suspends until the queue is shut down
  _     <- queue.shutdown
  _     <- f.join
} yield ()
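To actually run an effect on shutdown, sequence it after awaitShutdown. The sketch below uses a Ref as a stand-in for any follow-up effect; the names ranOnShutdown and flag are illustrative only.

```scala
import zio._

val ranOnShutdown: UIO[Boolean] = for {
  queue <- Queue.bounded[Int](3)
  flag  <- Ref.make(false)
  // The forked fiber waits for shutdown, then runs the follow-up effect.
  f     <- (queue.awaitShutdown *> flag.set(true)).fork
  _     <- queue.shutdown
  _     <- f.join
  ran   <- flag.get
} yield ran // true
```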
Transforming queues
A Queue[A] is in fact a type alias for ZQueue[Any, Any, Nothing, Nothing, A, A].
The signature for the expanded version is:
trait ZQueue[RA, RB, EA, EB, A, B]
Which is to say:
- The queue may be offered values of type A. The enqueueing operations require an environment of type RA and may fail with errors of type EA;
- The queue will yield values of type B. The dequeueing operations require an environment of type RB and may fail with errors of type EB.
Note how the basic Queue[A] cannot fail or require any environment for any of its operations.
With separate type parameters for input and output, there are rich composition opportunities for queues:
ZQueue#map
The output of the queue may be mapped:
val mapped: UIO[String] =
  for {
    queue  <- Queue.bounded[Int](3)
    mapped  = queue.map(_.toString)
    _      <- mapped.offer(1)
    s      <- mapped.take // "1"
  } yield s
ZQueue#mapM
We may also use an effectful function to map the output. For example, we could annotate each element with the timestamp at which it was dequeued:
import java.util.concurrent.TimeUnit
import zio.clock._

val currentTimeMillis = currentTime(TimeUnit.MILLISECONDS)

val annotatedOut: UIO[ZQueue[Any, Clock, Nothing, Nothing, String, (Long, String)]] =
  for {
    queue <- Queue.bounded[String](3)
    mapped = queue.mapM { el =>
      currentTimeMillis.map((_, el))
    }
  } yield mapped
ZQueue#contramapM
Similarly to mapM, we can also apply an effectful function to elements as they are enqueued. This queue will annotate the elements with their enqueue timestamp:
val annotatedIn: UIO[ZQueue[Clock, Any, Nothing, Nothing, String, (Long, String)]] =
  for {
    queue <- Queue.bounded[(Long, String)](3)
    mapped = queue.contramapM { el: String =>
      currentTimeMillis.map((_, el))
    }
  } yield mapped
This queue accepts and yields the same element types as the previous one (String in, (Long, String) out), but the timestamp is attached to the elements when they are enqueued rather than when they are dequeued. This is reflected in the environment types: Clock is now required for enqueueing instead of dequeueing.
To complete this example, we could combine this queue with mapM to compute the time that the elements stayed in the queue:
import zio.duration._

val timeQueued: UIO[ZQueue[Clock, Clock, Nothing, Nothing, String, (Duration, String)]] =
  for {
    queue <- Queue.bounded[(Long, String)](3)
    enqueueTimestamps = queue.contramapM { el: String =>
      currentTimeMillis.map((_, el))
    }
    durations = enqueueTimestamps.mapM { case (enqueueTs, el) =>
      currentTimeMillis
        .map(dequeueTs => ((dequeueTs - enqueueTs).millis, el))
    }
  } yield durations
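A queue transformed this way is used like any other, except that offer and take now require a Clock. The following self-contained sketch rebuilds the same transformation and pushes one element through it, assuming ZIO 1.x (zio.clock, zio.duration and ZIO.sleep); the name measured and the 50 ms pause are illustrative only.

```scala
import java.util.concurrent.TimeUnit
import zio._
import zio.clock._
import zio.duration._

val measured: URIO[Clock, (Duration, String)] = for {
  queue  <- Queue.bounded[(Long, String)](3)
  timed   = queue
              // Attach the enqueue timestamp on the way in...
              .contramapM { (el: String) =>
                currentTime(TimeUnit.MILLISECONDS).map((_, el))
              }
              // ...and turn it into a duration on the way out.
              .mapM { case (enqueueTs, el) =>
                currentTime(TimeUnit.MILLISECONDS)
                  .map(dequeueTs => ((dequeueTs - enqueueTs).millis, el))
              }
  _      <- timed.offer("hello")
  _      <- ZIO.sleep(50.millis) // let the element sit in the queue for a while
  result <- timed.take
} yield result
```

The returned duration should be roughly the length of the pause, although the exact value depends on scheduling and on the system clock.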
ZQueue#bothWith
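We may also compose two queues into a single queue. Offers to the composite queue are broadcast to both underlying queues, and each take dequeues one element from each queue and combines the two with the supplied function: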
val fromComposedQueues: UIO[(Int, String)] =
  for {
    q1       <- Queue.bounded[Int](3)
    q2       <- Queue.bounded[Int](3)
    q2Mapped  = q2.map(_.toString)
    both      = q1.bothWith(q2Mapped)((_, _))
    _        <- both.offer(1) // 1 is offered to both q1 and q2
    iAndS    <- both.take     // (1, "1")
    (i, s)    = iAndS
  } yield (i, s)