Skip to main content
Version: ZIO 2.x

Queue

Queue is a lightweight in-memory queue built on ZIO with composable and transparent back-pressure. It is fully asynchronous (no locks or blocking), purely-functional and type-safe.

A Queue[A] contains values of type A and has two basic operations: offer, which places an A in the Queue, and take which removes and returns the oldest value in the Queue.

import zio._

val res: UIO[Int] = for {
queue <- Queue.bounded[Int](100)
_ <- queue.offer(1)
v1 <- queue.take
} yield v1

Creating a queue

A Queue can be bounded (with a limited capacity) or unbounded.

There are several strategies to process new values when the queue is full:

  • The default bounded queue is back-pressured: when full, any offering fiber will be suspended until the queue is able to add the item
  • A dropping queue will drop new items when the queue is full
  • A sliding queue will drop old items when the queue is full

To create a back-pressured bounded queue:

val boundedQueue: UIO[Queue[Int]] = Queue.bounded[Int](100)

To create a dropping queue:

val droppingQueue: UIO[Queue[Int]] = Queue.dropping[Int](100)

To create a sliding queue:

val slidingQueue: UIO[Queue[Int]] = Queue.sliding[Int](100)

To create an unbounded queue:

val unboundedQueue: UIO[Queue[Int]] = Queue.unbounded[Int]

Adding items to a queue

The simplest way to add a value to the queue is offer:

val res1: UIO[Unit] = for {
queue <- Queue.bounded[Int](100)
_ <- queue.offer(1)
} yield ()

When using a back-pressured queue, offer might suspend if the queue is full: you can use fork to wait in a different fiber.

val res2: UIO[Unit] = for {
queue <- Queue.bounded[Int](1)
_ <- queue.offer(1)
f <- queue.offer(1).fork // will be suspended because the queue is full
_ <- queue.take
_ <- f.join
} yield ()

It is also possible to add multiple values at once with offerAll:

val res3: UIO[Unit] = for {
queue <- Queue.bounded[Int](100)
items = Range.inclusive(1, 10).toList
_ <- queue.offerAll(items)
} yield ()

Consuming Items from a Queue

The take operation removes the oldest item from the queue and returns it. If the queue is empty, this will suspend, and resume only when an item has been added to the queue. As with offer, you can use fork to wait for the value in a different fiber.

val oldestItem: UIO[String] = for {
queue <- Queue.bounded[String](100)
f <- queue.take.fork // will be suspended because the queue is empty
_ <- queue.offer("something")
v <- f.join
} yield v

You can consume the first item with poll. If the queue is empty you will get None, otherwise the top item will be returned wrapped in Some.

val polled: UIO[Option[Int]] = for {
queue <- Queue.bounded[Int](100)
_ <- queue.offer(10)
_ <- queue.offer(20)
head <- queue.poll
} yield head

You can consume multiple items at once with takeUpTo. If the queue doesn't have enough items to return, it will return all the items without waiting for more offers.

val taken: UIO[Chunk[Int]] = for {
queue <- Queue.bounded[Int](100)
_ <- queue.offer(10)
_ <- queue.offer(20)
chunk <- queue.takeUpTo(5)
} yield chunk

Similarly, you can get all items at once with takeAll. It also returns without waiting (an empty collection if the queue is empty).

val all: UIO[Chunk[Int]] = for {
queue <- Queue.bounded[Int](100)
_ <- queue.offer(10)
_ <- queue.offer(20)
chunk <- queue.takeAll
} yield chunk

Shutting Down a Queue

It is possible with shutdown to interrupt all the fibers that are suspended on offer* or take*. It will also empty the queue and make all future calls to offer* and take* terminate immediately.

val takeFromShutdownQueue: UIO[Unit] = for {
queue <- Queue.bounded[Int](3)
f <- queue.take.fork
_ <- queue.shutdown // will interrupt f
_ <- f.join // Will terminate
} yield ()

You can use awaitShutdown to execute an effect when the queue is shut down. This will wait until the queue is shut down. If the queue is already shut down, it will resume right away.

val awaitShutdown: UIO[Unit] = for {
queue <- Queue.bounded[Int](3)
p <- Promise.make[Nothing, Boolean]
f <- queue.awaitShutdown.fork
_ <- queue.shutdown
_ <- f.join
} yield ()

Additional Resources