Skip to main content
Version: 2.x

How to Interop with Cats Effect?

Introduction​

interop-cats has instances for the Cats, Cats MTL and Cats Effect libraries, which allow you to use ZIO with any libraries that rely on these, like Doobie, Http4s, FS2 or Circe

Depends on which version of Cats Effect we are using, we should pick the right version of zio-interop-cats. In this tutorial, whenever we're working with Cats Effect 2.x, we are using:

libraryDependencies += "dev.zio" %% "zio-interop-cats" % "2.5.1.0"

And whenever we are using Cats Effect 3.x instances, we are using:

libraryDependencies += "dev.zio" %% "zio-interop-cats" % "3.1.1.0"

Most of the interop functionality resides in the following package:

import zio.interop.catz._

Cats Effect Instances​

ZIO integrates with Typelevel libraries by providing instances of Cats Effect type classes. These type classes are used by fs2, doobie, http4s, and a variety of other libraries in the functional Scala ecosystem.

Due to the limitations of the Cats Effect, ZIO cannot provide instances for arbitrary error types. Instead, we can obtain instances only for effects whose error type extends Throwable.

For convenience, ZIO includes the Task and RIO type aliases, which fix the error type to Throwable, and may be useful for interop with Cats Effect:

import zio.{ZIO, Task, RIO}
type Task[+A]    = ZIO[Any, Throwable, A]
type RIO[-R, +A] = ZIO[ R, Throwable, A]

Providing Runtime Manually​

To use Cats Effect instances for these types, we should have an implicit Runtime[R] in scope for the environment type of our effects. The following code snippet creates an implicit Runtime for all the modules built into ZIO:

import cats.implicits._
import zio.interop.catz._

object ZioCatsEffectInterop extends scala.App {
def catsEffectApp[F[_]: cats.effect.Sync]: F[Unit] =
cats.effect.Sync[F].delay(
println("Hello from Cats Effect World!")
)

implicit val runtime: zio.Runtime[Any] = zio.Runtime.default

val zioApp: zio.Task[Unit] = catsEffectApp[zio.Task]
runtime.unsafeRun(zioApp.exitCode)
}

If we are working with Cats Effect 3.x, the catsEffectApp[Task] will be expanded as if we called the following code explicitly:

import ZioCatsEffectInterop.catsEffectApp
object ZioCatsEffectInterop extends scala.App {
val runtime: zio.Runtime[Any] = zio.Runtime.default

val zioApp: zio.Task[Unit] = catsEffectApp[zio.Task](
zio.interop.catz.asyncRuntimeInstance(runtime)
)

runtime.unsafeRun(zioApp.exitCode)
}

And if we are working with Cats Effect 2.x, it will be expanded as if we called following code explicitly:

object ZioCatsEffectInterop extends scala.App {
val runtime: zio.Runtime[Any] = zio.Runtime.default

val zioApp = catsEffectApp[zio.Task](zio.interop.catz.taskConcurrentInstance)
runtime.unsafeRun(zioApp.exitCode)
}

If we are using RIO for a custom environment R, then we will have to create our own Runtime[R], and ensure that implicit wherever we need Cats Effect instances.

Using CatsApp Runtime​

As a convenience, our application can extend CatsApp, which automatically brings an implicit Runtime[Any] into our scope:

import zio.interop.catz._
import cats.implicits._

object ZioCatsEffectInteropWithCatsApp extends CatsApp {
def catsEffectApp[F[_]: cats.effect.Sync]: F[Unit] =
cats.effect.Sync[F].delay(println("Hello from Cats Effect World!"))

override def run(args: List[String]): zio.URIO[Any, zio.ExitCode] =
catsEffectApp[zio.Task].exitCode
}

This example works properly in both Cats Effect 2.x and 3.x versions.

Cats Effect 2.x​

Timer Instance​

In order to get a cats.effect.Timer[zio.Task] instance, we need an extra import (zio.interop.catz.implicits._):

import java.util.concurrent.TimeUnit

import cats.implicits._
import zio.interop.catz._
import zio.interop.catz.implicits._ // Provides `zio.Task instance` for `cats.effect.Time` type class
import zio.{ ExitCode, Task, URIO }

import scala.concurrent.duration.DurationInt

object ZioCatsEffectTimerInterop extends zio.interop.catz.CatsApp {
override def run(args: List[String]): zio.URIO[Any, zio.ExitCode] =
catsEffectTimerApp[zio.Task].exitCode

def catsEffectTimerApp[F[_]: cats.effect.Clock: cats.effect.Timer: cats.effect.Sync]: F[Unit] = for {
t2 <- cats.effect.Clock[F].monotonic(TimeUnit.SECONDS)
_ <- cats.effect.Timer[F].sleep(2.seconds)
t1 <- cats.effect.Clock[F].monotonic(TimeUnit.SECONDS)
_ <- cats.effect.Sync[F].delay(println(t1 - t2))
} yield ()
}

The reason a cats.effect.Timer[zio.Task] instance is not provided by the default interop import is that it makes testing programs that require timing capabilities very difficult. The extra import (wherever needed) makes reasoning about timing-related effects much easier.

If we're using RIO for a custom environment then our environment must use the Clock service, e.g. R <: Clock to get a timer.

Converting Resource to ZManaged​

We have an extension method defined on Resource called Resource#toManaged which converts Resource to ZManaged. For example, assume we have the following File API:

case class File[F[_]: cats.effect.Sync]() {
import cats.syntax.apply._
def read: F[String] =
cats.effect.Sync[F].delay(println("Reading file.")) *>
cats.effect.Sync[F].pure("Hello, World!")
def close: F[Unit] =
cats.effect.Sync[F].delay(println("Closing file."))
}

object File {
import cats.syntax.apply._
def open[F[_]: cats.effect.Sync](name: String): F[File[F]] =
cats.effect.Sync[F].delay(println(s"opening $name file")) *>
cats.effect.Sync[F].delay(File())
}

And, also assume we have fileResource defined as follows:

def fileResource[F[_]: cats.effect.Sync](name: String): cats.effect.Resource[F, File[F]] =
cats.effect.Resource.make(File.open[F](name))(_.close)

Let's convert that to ZManaged:

val resource: zio.ZManaged[Any, Throwable, File[zio.Task]] =
fileResource[zio.Task]("log.txt").toManaged

Here is a complete working example:

import zio.interop.catz._

object CatsEffectResourceInterop extends CatsApp {
def fileResource[F[_]: cats.effect.Sync](name: String): cats.effect.Resource[F, File[F]] =
cats.effect.Resource.make(File.open[F](name))(_.close)

def myApp: zio.ZIO[Any, Throwable, Unit] = for {
c <- fileResource[zio.Task]("log.txt").toManaged.use(_.read)
_ <- zio.Console.printLine(s"file content: $c")
} yield ()

override def run(args: List[String]): zio.URIO[Any, zio.ExitCode] =
myApp.exitCode
}

Converting ZManaged to Resource​

We have an extension method on ZManaged called ZManaged#toResource which converts a ZIO managed resource to Cats Effect resource:

final class ZManagedSyntax[R, E, A](private val managed: ZManaged[R, E, A]) {
def toResource[F[_]](implicit
F: Async[F],
ev: Effect[ZIO[R, E, *]]
): Resource[F, A] = ???
}

Let's try an example:

import zio.interop.catz._

object ZManagedToResource extends cats.effect.IOApp {
implicit val zioRuntime: zio.Runtime[Any] = zio.Runtime.default

val resource: cats.effect.Resource[cats.effect.IO, java.io.InputStream] =
zio.ZManaged
.fromAutoCloseable(
zio.ZIO.effect(
java.nio.file.Files.newInputStream(
java.nio.file.Paths.get("file.txt")
)
)
)
.toResource[cats.effect.IO]

val effect: cats.effect.IO[Unit] =
resource
.use { is =>
cats.effect.IO.delay(is.readAllBytes())
}
.flatMap(bytes =>
cats.effect.IO.delay(
println(s"file length: ${bytes.length}")
)
)

override def run(args: List[String]): cats.effect.IO[cats.effect.ExitCode] =
effect.as(cats.effect.ExitCode.Success)
}

Converting Resource to Scoped​

We have an extension method defined on Resource called Resource#toScoped which converts Resource to ZIO with Scope.

For example, assume we have the following File API:

case class File[F[_]: cats.effect.Sync]() {
import cats.syntax.apply._
def read: F[String] =
cats.effect.Sync[F].delay(println("Reading file.")) *>
cats.effect.Sync[F].pure("Hello, World!")
def close: F[Unit] =
cats.effect.Sync[F].delay(println("Closing file."))
}

object File {
import cats.syntax.apply._
def open[F[_]: cats.effect.Sync](name: String): F[File[F]] =
cats.effect.Sync[F].delay(println(s"opening $name file")) *>
cats.effect.Sync[F].delay(File())
}

And, also assume we have fileResource defined as follows:

def fileResource[F[_]: cats.effect.Sync](name: String): cats.effect.Resource[F, File[F]] =
cats.effect.Resource.make(File.open[F](name))(_.close)

Let's convert that to scoped ZIO:

val scoped: ZIO[Scope, Throwable, File[zio.Task]] =
fileResource[zio.Task]("log.txt").toScoped

Here is a complete working example:

import zio.interop.catz._

object CatsEffectResourceInterop extends CatsApp {
def fileResource[F[_]: cats.effect.Sync](name: String): cats.effect.Resource[F, File[F]] =
cats.effect.Resource.make(File.open[F](name))(_.close)

def myApp: zio.ZIO[Scope, Throwable, Unit] = for {
c <- fileResource[zio.Task]("log.txt").toScoped
_ <- zio.Console.printLine(s"file content: $c")
} yield ()

override def run(args: List[String]): zio.URIO[Scope, zio.ExitCode] =
myApp.exitCode
}

Converting Scoped to Resource​

We have an extension method defined on Resource called Resource#scoped which creates a Resource from ZIO with Scope.

Let's try an example:

import zio.interop.catz._

object ZManagedToResource extends cats.effect.IOApp {
implicit val zioRuntime: zio.Runtime[Any] = zio.Runtime.default

val resource: cats.effect.Resource[cats.effect.IO, java.io.InputStream] = {
val scopedZIO: ZIO[Any with Scope, Throwable, InputStream]= ZIO
.fromAutoCloseable(
zio.ZIO.attempt(
java.nio.file.Files.newInputStream(
java.nio.file.Paths.get("crawl.log")
)
)
)

Resource.scoped[IO, Any](scopedZIO)
}

val effect: cats.effect.IO[Unit] =
resource
.use { is =>
cats.effect.IO.delay(is.readAllBytes())
}
.flatMap(bytes =>
cats.effect.IO.delay(
println(s"file length: ${bytes.length}")
)
)

override def run(args: List[String]): cats.effect.IO[cats.effect.ExitCode] =
effect.as(cats.effect.ExitCode.Success)
}

Cats Effect 3.x​

Type class Instances for ZIO's Task​

ZIO integrates with Cats Effect 3.x as well as 2.x. The interop-cats module provides Concurrent, Temporal and Async for zio.Task.

An example of ZIO interoperability with CE3:

import cats.implicits._
import zio.interop.catz._
import scala.concurrent.duration.DurationInt

object ZioCatsEffectInterop extends zio.interop.catz.CatsApp {

def catsEffectTimerApp[F[_]: cats.effect.Async]: F[Unit] = for {
t2 <- cats.effect.Clock[F].monotonic
_ <- cats.effect.Temporal[F].sleep(2.seconds)
t1 <- cats.effect.Clock[F].monotonic
_ <- cats.effect.Sync[F].delay(println(t1 - t2))
} yield ()

override def run(args: List[String]): zio.URIO[Any, zio.ExitCode] = {
catsEffectTimerApp[zio.Task].exitCode
}
}

Cats Core​

There is another package in interop-cats module called zio.interop.catz.core._ which helps us to interop with core data types. This package contains instances of zio.Chunk data type for Cats Core module like cats.Order, cats.Hash, cats.Traverse, and so forth.

In the following example, we are going to use zio.Chunk in a Cats Effect application:

import cats.implicits._
import zio.interop.catz.core._

object ZioInteropWithCatsCore extends cats.effect.IOApp {
val chunk = zio.Chunk("1", "2", "3", "4", "5")

def parseInt(s: String): Option[Int] =
Either.catchOnly[NumberFormatException](s.toInt).toOption

val parseAll = cats.Traverse[zio.Chunk].traverse(chunk)(parseInt)

override def run(args: List[String]): cats.effect.IO[cats.effect.ExitCode] =
cats.effect.IO.println(parseAll).as(cats.effect.ExitCode.Success)
}

This package also contains utilities to support zio.NonEmptyChunk interoperability with Cats Core module.

FS2 Streams​

The interop-cats module contains extension methods to convert FS2 Stream to ZStream and vice versa. These methods support both FS2 series, 2.x and 3.x:

From FS2 Stream to ZStream​

By importing zio.stream.interop.fs2z._ into our application, the fs2.Stream#toZStream extension method converts a fs2.Stream to ZStream:

import zio.stream.ZStream
import zio.stream.interop.fs2z._
val zstream: ZStream[Any, Throwable, Int] = fs2.Stream.range(1, 10).toZStream()

From ZStream to FS2 Stream​

Also, the ZStream#toFs2Stream converts a ZIO Stream into FS2 Stream:

import zio.stream.ZStream
import zio.Chunk
import zio.stream.interop.fs2z._
val fs2stream = ZStream.fromChunks(Chunk(1, 2, 3, 4)).toFs2Stream

Using Queue with Cats Effect​

The interop-cats library has an import zio.interop.Queue package to lift creation of Queue effect from UIO[Queue[A]] to F[Queue[F, A]] which enables us to run Queue under Cats Effect runtime. It supports all variants of Queue like bounded, unbounded, sliding and dropping.

def bounded[F[+_], A](capacity: Int)(implicit R: Runtime[ZEnv], F: LiftIO[F]): F[Queue[F, A]]

Cats Effect 2.x​

In the following example, we are going to lift the Queue creation effect to Cats IO effect. If we are integrating with 2.x Cats Effect library, this snippet works properly:

import zio.interop.Queue
import cats.effect.IO

implicit val runtime = Runtime.default
def liftedToIO: IO[List[Int]] = for {
q <- Queue.bounded[IO, Int](100)
_ <- q.offer(1)
_ <- q.offer(2)
r <- q.takeAll
} yield (r)

Cats Effect 3.x​

To run Queue with Cats Effect 3.x we also need to provide an instance of Dispatcher to our contextual environment:

import zio.interop.Queue

object ZioQueueInteropWithCats extends scala.App {

implicit val ceRuntime: cats.effect.unsafe.IORuntime =
cats.effect.unsafe.IORuntime.global

implicit val zioRuntime: zio.Runtime[Any] =
zio.Runtime.default

implicit val ec: scala.concurrent.ExecutionContextExecutor =
scala.concurrent.ExecutionContext.global

implicit val dispatcher: cats.effect.std.Dispatcher[cats.effect.IO] =
cats.effect.std
.Dispatcher[cats.effect.IO]
.allocated
.unsafeRunSync()
._1

def liftedToIO: cats.effect.IO[List[Int]] = for {
q <- Queue.bounded[cats.effect.IO, Int](100)
_ <- q.offer(1)
_ <- q.offer(2)
r <- q.takeAll
} yield (r)

val catsApp = liftedToIO
.flatMap { e =>
cats.effect.IO.println(s"List of elements retrieved from Queue: $e")
}
.as(cats.effect.ExitCode.Success)

catsApp.unsafeRunSync()
}

Using STM with Cats Effect​

The zio.interop.stm provides a wrapper data type on STM[Throwable, A] which enables us to run STM with the Cats Effect library.

Currently, the interop-cats support TRef, TPromise, TQueue and TSemaphore data types.

Let's try a working example using STM and TRef:

import cats.effect.IO
import cats.effect.unsafe.IORuntime
import zio.interop.stm.{STM, TRef}

implicit val zioRuntime: zio.Runtime[Any] = zio.Runtime.default
implicit val catsRuntime: IORuntime = IORuntime.global

def transferMoney(
from: TRef[IO, Long],
to: TRef[IO, Long],
amount: Long
): STM[IO, Long] =
for {
senderBal <- from.get
_ <-
if (senderBal < amount)
STM.fail[IO](new Exception("Not enough money"))
else
STM.unit[IO]
_ <- from.update(existing => existing - amount)
_ <- to.update(existing => existing + amount)
recvBal <- to.get
} yield recvBal

val program: IO[Long] = for {
sndAcc <- STM.atomically[cats.effect.IO, TRef[IO, Long]](
TRef.make[IO, Long](1000)
)
rcvAcc <- STM.atomically[cats.effect.IO, TRef[IO, Long]](
TRef.make[IO, Long](200)
)
recvAmt <- STM.atomically(transferMoney(sndAcc, rcvAcc, 500L))
} yield recvAmt

program
.flatMap(amount =>
IO.println(s"Balance of second account after transaction: $amount")
)
.unsafeRunSync()

Examples​

Cats Effect and Type-Level libraries are older than the ZIO ecosystem. So there are very nice libraries like doobie and http4s, that a ZIO user would like to use in its application.

We have provided some full working example of using these important libraries:

Using ZIO with Doobie​

The following example shows how to use ZIO with Doobie (a library for JDBC access) and FS2 (a streaming library), which both rely on Cats Effect instances:

// This snippet works with both CE2 and CE3
import doobie._
import doobie.implicits._
import fs2.Stream
import zio.Task
import zio.interop.catz._

implicit val zioRuntime: zio.Runtime[Any] = zio.Runtime.default

case class User(id: String, name: String, age: Int)

def xa: Transactor[Task] =
Transactor.fromDriverManager[Task](
"org.h2.Driver",
"jdbc:h2:mem:users;DB_CLOSE_DELAY=-1"
)

def createTable: doobie.ConnectionIO[Int] =
sql"""|CREATE TABLE IF NOT EXISTS USERS(
|id INT SERIAL UNIQUE,
|name VARCHAR NOT NULL UNIQUE,
|age SMALLINT
|)""".stripMargin.update.run

def dropTable: doobie.ConnectionIO[Int] =
sql"""DROP TABLE IF EXISTS USERS""".update.run

def insert(name: String, age: Int): doobie.ConnectionIO[Int] =
sql"insert into users (name, age) values ($name, $age)".update.run

def loadUsers: Stream[doobie.ConnectionIO, User] =
sql"""SELECT * FROM users""".query[User].stream

val doobieApp: Stream[doobie.ConnectionIO, User] = for {
_ <- fs2.Stream.eval(dropTable)
_ <- fs2.Stream.eval(createTable)
_ <- fs2.Stream.eval(insert("Olivia", 21))
_ <- fs2.Stream.eval(insert("Oliver", 30))
u <- loadUsers
} yield u

val run: Stream[Task, User] = doobieApp.transact(xa)

val allUsers: List[User] =
zioRuntime.unsafeRun(run.compile.toList)

Sounds good, but how can we specify a specialized transactor than the default one? Creating a customized transactor in CE2 differs from CE3.

Let's try doing that in each of which:

Customized Transactor (CE2)​

ZIO provides a specific blocking thread pool for blocking operations. The doobie-hikari module helps us create a transactor with two separated executors, one for blocking operations, and the other one for non-blocking operations. So we shouldn't run blocking JDBC operations or perform awaiting connections to the database on the main thread pool.

So let's fix this issue in the previous example. In the following snippet we are going to create a ZMHikari of Hikari transactor. In this example we are using 0.13.4 version of doobie which supports CE2:

import zio.ZManaged
import zio.{ Runtime, Task, ZIO, ZManaged }
import doobie.hikari.HikariTransactor
import cats.effect.Blocker
import zio.interop.catz._

def transactor: ZManaged[Any, Throwable, HikariTransactor[Task]] =
for {
rt <- ZIO.runtime[Any].toManaged
be <- ZIO.blockingExecutor.toManaged // our blocking EC
xa <- HikariTransactor
.newHikariTransactor[Task](
"org.h2.Driver", // driver classname
"jdbc:h2:mem:test;DB_CLOSE_DELAY=-1", // connect URL
"sa", // username
"", // password
be.asExecutionContext, // await connection here
Blocker.liftExecutionContext(be.asExecutionContext) // execute JDBC operations here
)
.toManagedZIO
} yield xa

Now we can transact our doobieApp with this transactor and convert that to the ZIO effect:

val zioApp: ZIO[Any, Throwable, List[User]] =
transactor.use(xa => doobieApp.transact(xa).compile.toList)

Customized Transactor (CE3)​

In Cats Effect 3.x, the cats.effect.Blocker has been removed. So the transactor constructor doesn't require us a blocking executor; it happens under the hood using the Sync[F].blocking operation.

To create a Transactor in CE3, we need to create an instance of Dispatcher for zio.Task. The following example is based on Doobie's 1.0.0-M5 version which supports CE3:

import doobie.hikari.HikariTransactor
import zio.interop.catz._
import zio.{Task, ZIO, ZManaged}

implicit val zioRuntime: zio.Runtime[Any] =
zio.Runtime.default

implicit val dispatcher: cats.effect.std.Dispatcher[zio.Task] =
zioRuntime
.unsafeRun(
cats.effect.std
.Dispatcher[zio.Task]
.allocated
)
._1

def transactor: ZManaged[Any, Throwable, HikariTransactor[Task]] =
for {
rt <- ZIO.runtime[Any].toManaged
xa <-
HikariTransactor
.newHikariTransactor[Task](
"org.h2.Driver", // driver classname
"jdbc:h2:mem:test;DB_CLOSE_DELAY=-1", // connect URL
"sa", // username
"", // password
rt.runtimeConfig.executor.asExecutionContext // await connection here
)
.toManaged
} yield xa

Now we can transact our doobieApp with this transactor and convert that to the ZIO effect:

val zioApp: ZIO[Any, Throwable, List[User]] =
transactor.use(xa => doobieApp.transact(xa).compile.toList)

Http4s​

Here is the full working example of using http4s in ZIO App:

Cats Effect 2.x​

The following example is based on http4s's 0.21.24 version which supports CE2:

import cats.effect.{ConcurrentEffect, Sync, Timer}
import cats.implicits._
import fs2.Stream
import org.http4s.HttpRoutes
import org.http4s.client.blaze.BlazeClientBuilder
import org.http4s.dsl.Http4sDsl
import org.http4s.server.blaze.BlazeServerBuilder
import zio.interop.catz._
import zio.interop.catz.implicits._
import zio.{Task, URIO}

import scala.concurrent.ExecutionContext.global

object ZioHttp4sInterop extends CatsApp {
def run(args: List[String]): URIO[Any, zio.ExitCode] =
stream[Task].compile.drain.exitCode

def stream[F[_]: ConcurrentEffect: Timer]: Stream[F, Nothing] = {
import org.http4s.implicits._
val httpApp = helloWorldRoute[F].orNotFound
for {
_ <- BlazeClientBuilder[F](global).stream
exitCode <- BlazeServerBuilder[F](global)
.bindHttp(8080, "0.0.0.0")
.withHttpApp(httpApp)
.serve
} yield exitCode
}.drain

def helloWorldRoute[F[_]: Sync]: HttpRoutes[F] = {
val dsl = new Http4sDsl[F]{}
import dsl._
HttpRoutes.strict[F] {
case GET -> Root =>
Ok("Hello, World!")
}
}
}

Cats Effect 3.x​

The following example is based on http4s's 0.23.0-RC1 version which supports CE3:

import cats.Applicative
import cats.effect.Async
import fs2.Stream
import org.http4s.HttpRoutes
import org.http4s.blaze.client.BlazeClientBuilder
import org.http4s.blaze.server.BlazeServerBuilder
import org.http4s.dsl.Http4sDsl
import zio.interop.catz._
import zio.{Task, URIO}

import scala.concurrent.ExecutionContext.global

object ZioHttp4sInterop extends zio.interop.catz.CatsApp {
def stream[F[_]: Async]: Stream[F, Nothing] = {
import org.http4s.implicits._
val httpApp = helloWorldRoute[F].orNotFound
for {
_ <- BlazeClientBuilder[F](global).stream
exitCode <- BlazeServerBuilder[F](global)
.bindHttp(8080, "0.0.0.0")
.withHttpApp(httpApp)
.serve
} yield exitCode
}.drain

def helloWorldRoute[F[_]: Applicative]: HttpRoutes[F] = {
val dsl = new Http4sDsl[F] {}
import dsl._
HttpRoutes.strict[F] { case GET -> Root =>
Ok("Hello, World!")
}
}

def run(args: List[String]): URIO[Any, zio.ExitCode] =
stream[Task].compile.drain.exitCode
}