Skip to main content
Version: 2.x

ZIO AMQP

ZIO AMQP is a ZIO-based AMQP client for Scala.

Introduction

ZIO AMQP is a ZIO-based wrapper around the RabbitMQ client. It provides a streaming interface to AMQP queues and helps to prevent us from shooting ourselves in the foot with thread-safety issues.

Installation

In order to use this library, we need to add the following line in our build.sbt file:

libraryDependencies += "nl.vroste" %% "zio-amqp" % "0.3.0"

Example

First, let's create an instance of RabbitMQ:

docker run -d --name some-rabbit -p 5672:5672 -p 5673:5673 -p 15672:15672 rabbitmq:3-management

Then we need to create my_exchange and my_queue and bind the queue to the exchange via the RabbitMQ management dashboard (localhost:15672).

Now we can run the example below:

import nl.vroste.zio.amqp._
import nl.vroste.zio.amqp.model._
import zio._

import java.net.URI

object ZIOAMQPExample extends ZIOAppDefault {

val channel: ZIO[Scope, Throwable, Channel] = for {
connection <- Amqp.connect(URI.create("amqp://localhost:5672"))
channel <- Amqp.createChannel(connection)
} yield channel

val myApp: ZIO[Any, Throwable, Unit] =
ZIO.scoped {
for {
channel <- channel
producer: ZIO[Any, Throwable, Long] =
Random.nextUUID
.flatMap(uuid => channel.publish(ExchangeName("my_exchange"), uuid.toString.getBytes).unit)
.schedule(Schedule.spaced(1.seconds))

consumer: ZIO[Any, Throwable, Unit] =
channel
.consume(queue = QueueName("my_queue"), consumerTag = ConsumerTag("my_consumer"))
.mapZIO { record =>
val deliveryTag = record.getEnvelope.getDeliveryTag
Console.printLine(s"Received $deliveryTag: ${new String(record.getBody)}") *>
channel.ack(DeliveryTag(deliveryTag))
}
.runDrain
p <- producer.fork
c <- consumer.fork
_ <- p.zip(c).join
} yield ()
}

override def run: ZIO[Environment with ZIOAppArgs with Scope, Any, Any] = myApp
}