Queue
Queue is a lightweight in-memory queue built on ZIO with composable and transparent back-pressure. It is fully asynchronous (no locks or blocking), purely functional, and type-safe.
A Queue[A] contains values of type A and has two basic operations: offer, which places an A in the Queue, and take, which removes and returns the oldest value in the Queue.
import zio._

val res: UIO[Int] = for {
  queue <- Queue.bounded[Int](100)
  _     <- queue.offer(1)
  v1    <- queue.take
} yield v1
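To make the "oldest value first" behaviour concrete, here is a minimal sketch, assuming ZIO 2.x. The name fifoOrder is made up for illustration. It offers three values and takes them back in the order they were added.

```scala
import zio._

// Hypothetical example: values leave the queue in the order they were offered.
val fifoOrder: UIO[(Int, Int, Int)] = for {
  queue <- Queue.bounded[Int](4)
  _     <- queue.offer(1)
  _     <- queue.offer(2)
  _     <- queue.offer(3)
  a     <- queue.take // 1, the oldest value
  b     <- queue.take // 2
  c     <- queue.take // 3
} yield (a, b, c)
```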
Creating a Queue
A Queue can be bounded (with a limited capacity) or unbounded.
There are several strategies to process new values when the queue is full:
- The default bounded queue is back-pressured: when full, any offering fiber will be suspended until the queue is able to add the item.
- A dropping queue will drop new items when the queue is full.
- A sliding queue will drop old items when the queue is full.
To create a back-pressured bounded queue:
val boundedQueue: UIO[Queue[Int]] = Queue.bounded[Int](100)
To create a dropping queue:
val droppingQueue: UIO[Queue[Int]] = Queue.dropping[Int](100)
To create a sliding queue:
val slidingQueue: UIO[Queue[Int]] = Queue.sliding[Int](100)
To create an unbounded queue:
val unboundedQueue: UIO[Queue[Int]] = Queue.unbounded[Int]
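The difference between the dropping and sliding strategies is easiest to see by overfilling a small queue. The sketch below assumes ZIO 2.x; fillAndDrain, dropped and slid are hypothetical names introduced only for this illustration.

```scala
import zio._

// Hypothetical helper: offer 1, 2, 3 to a queue of capacity 2, then drain it.
def fillAndDrain(makeQueue: UIO[Queue[Int]]): UIO[Chunk[Int]] =
  for {
    queue <- makeQueue
    _     <- queue.offer(1)
    _     <- queue.offer(2)
    _     <- queue.offer(3) // the queue is already full at this point
    items <- queue.takeAll
  } yield items

// Dropping keeps the items that were already queued: Chunk(1, 2)
val dropped: UIO[Chunk[Int]] = fillAndDrain(Queue.dropping[Int](2))

// Sliding evicts the oldest item to make room: Chunk(2, 3)
val slid: UIO[Chunk[Int]] = fillAndDrain(Queue.sliding[Int](2))
```

Do not pass a back-pressured Queue.bounded[Int](2) to this helper: the third offer would suspend forever, because nothing takes from the queue in the meantime.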
Adding Items to a Queue
The simplest way to add a value to the queue is offer:
val res1: UIO[Unit] = for {
  queue <- Queue.bounded[Int](100)
  _     <- queue.offer(1)
} yield ()
When using a back-pressured queue, offer suspends if the queue is full. You can use fork to run the offer in a different fiber, so that the current fiber is not suspended.
val res2: UIO[Unit] = for {
  queue <- Queue.bounded[Int](1)
  _     <- queue.offer(1)
  f     <- queue.offer(1).fork // will be suspended because the queue is full
  _     <- queue.take          // frees a slot, so the forked offer can complete
  _     <- f.join
} yield ()
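As a rough way to observe the suspension described above, the following sketch (assuming ZIO 2.x; backPressure is a hypothetical name) checks with Fiber#poll that the forked offer has not completed while the queue is full, then frees a slot and collects what the forked fiber added.

```scala
import zio._

val backPressure: UIO[(Boolean, Int, Chunk[Int])] = for {
  queue   <- Queue.bounded[Int](1)
  _       <- queue.offer(1)
  f       <- queue.offer(2).fork       // cannot complete while the queue is full
  pending <- f.poll.map(_.isEmpty)     // true: the forked offer has not finished
  first   <- queue.take                // 1; this frees the only slot
  _       <- f.join                    // the forked offer can now complete
  rest    <- queue.takeAll             // Chunk(2), added by the forked fiber
} yield (pending, first, rest)
```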
It is also possible to add multiple values at once with offerAll:
val res3: UIO[Unit] = for {
  queue <- Queue.bounded[Int](100)
  items  = Range.inclusive(1, 10).toList
  _     <- queue.offerAll(items)
} yield ()
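The examples above discard the result of offer. As a small sketch, assuming ZIO 2.x where offer returns a UIO[Boolean], the result can be used to tell whether a dropping queue accepted the value. The name offerResults is hypothetical.

```scala
import zio._

val offerResults: UIO[(Boolean, Boolean)] = for {
  queue    <- Queue.dropping[Int](1)
  accepted <- queue.offer(1) // true: the queue had room
  rejected <- queue.offer(2) // false: the queue was full, so 2 was dropped
} yield (accepted, rejected)
```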
Consuming Items from a Queue
The take operation removes the oldest item from the queue and returns it. If the queue is empty, this will suspend, and resume only when an item has been added to the queue. As with offer, you can use fork to wait for the value in a different fiber.
val oldestItem: UIO[String] = for {
  queue <- Queue.bounded[String](100)
  f     <- queue.take.fork // will be suspended because the queue is empty
  _     <- queue.offer("something")
  v     <- f.join
} yield v
You can consume the first item with poll. If the queue is empty you will get None; otherwise the oldest item is removed and returned wrapped in Some.
val polled: UIO[Option[Int]] = for {
  queue <- Queue.bounded[Int](100)
  _     <- queue.offer(10)
  _     <- queue.offer(20)
  head  <- queue.poll
} yield head
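The following sketch, assuming ZIO 2.x (pollBehaviour is a made-up name), contrasts poll on an empty queue with poll on a non-empty one, and uses size to show that poll removes the item it returns.

```scala
import zio._

val pollBehaviour: UIO[(Option[Int], Option[Int], Int)] = for {
  queue     <- Queue.bounded[Int](4)
  onEmpty   <- queue.poll // None: nothing to take, and no suspension
  _         <- queue.offer(10)
  _         <- queue.offer(20)
  head      <- queue.poll // Some(10), the oldest item
  remaining <- queue.size // 1: only 20 is left in the queue
} yield (onEmpty, head, remaining)
```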
You can consume multiple items at once with takeUpTo. If the queue holds fewer items than requested, it returns all of them without waiting for more offers.
val taken: UIO[Chunk[Int]] = for {
  queue <- Queue.bounded[Int](100)
  _     <- queue.offer(10)
  _     <- queue.offer(20)
  chunk <- queue.takeUpTo(5)
} yield chunk
Similarly, you can get all items at once with takeAll. It also returns without waiting (an empty collection if the queue is empty).
val all: UIO[Chunk[Int]] = for {
  queue <- Queue.bounded[Int](100)
  _     <- queue.offer(10)
  _     <- queue.offer(20)
  chunk <- queue.takeAll
} yield chunk
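To see how takeUpTo and takeAll interact, here is a short sketch under the assumption of ZIO 2.x, where both return a Chunk. The name batches is hypothetical.

```scala
import zio._

val batches: UIO[(Chunk[Int], Chunk[Int], Chunk[Int])] = for {
  queue   <- Queue.bounded[Int](8)
  _       <- queue.offerAll(List(1, 2, 3, 4, 5))
  three   <- queue.takeUpTo(3) // Chunk(1, 2, 3): the three oldest items
  rest    <- queue.takeAll     // Chunk(4, 5): everything that is left
  drained <- queue.takeAll     // empty Chunk: returns immediately, no waiting
} yield (three, rest, drained)
```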
Shutting Down a Queue
You can use shutdown to interrupt all the fibers that are suspended on offer* or take*. It also empties the queue and makes all future calls to offer* and take* terminate immediately.
val takeFromShutdownQueue: UIO[Unit] = for {
  queue <- Queue.bounded[Int](3)
  f     <- queue.take.fork
  _     <- queue.shutdown // will interrupt f
  _     <- f.join         // terminates, because f has been interrupted
} yield ()
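The effect of shutdown on later calls can be observed without interrupting the main fiber by forking the call and inspecting its Exit. This is a minimal sketch assuming ZIO 2.x; afterShutdown is a hypothetical name.

```scala
import zio._

val afterShutdown: UIO[(Boolean, Boolean)] = for {
  queue  <- Queue.bounded[Int](2)
  _      <- queue.shutdown
  isDown <- queue.isShutdown    // true
  f      <- queue.offer(1).fork // offering to a shut-down queue
  exit   <- f.await             // await returns the Exit instead of propagating it
} yield (isDown, exit.isInterrupted)
```

Using await rather than join keeps the interruption of the forked fiber from propagating to the fiber that runs this for comprehension.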
You can use awaitShutdown to execute an effect when the queue is shut down. This will wait until the queue is shut down. If the queue is already shut down, it will resume right away.
val awaitShutdown: UIO[Unit] = for {
  queue <- Queue.bounded[Int](3)
  f     <- queue.awaitShutdown.fork // waits until the queue is shut down
  _     <- queue.shutdown
  _     <- f.join                   // resumes once the queue has been shut down
} yield ()
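To actually run an effect once the queue is shut down, you can sequence it after awaitShutdown. The sketch below assumes ZIO 2.x and uses a Ref as a stand-in for any follow-up effect; runOnShutdown and notified are hypothetical names.

```scala
import zio._

val runOnShutdown: UIO[Boolean] = for {
  queue    <- Queue.bounded[Int](3)
  notified <- Ref.make(false)
  // The forked fiber waits for the shutdown, then records that it happened.
  f        <- (queue.awaitShutdown *> notified.set(true)).fork
  _        <- queue.shutdown
  _        <- f.join
  result   <- notified.get // true: the follow-up effect ran after shutdown
} yield result
```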