Version: 2.x

Supervisor

A Supervisor[A] is allowed to supervise the launching and termination of fibers, producing some visible value of type A from the supervision.

Creation

track

The track constructor creates a new supervisor that tracks children in a set. It takes a boolean parameter, weak, which indicates whether the children are tracked in a weak set or in an ordinary (strong) set.

val supervisor = Supervisor.track(true)
// supervisor: zio.package.UIO[Supervisor[zio.Chunk[zio.Fiber.Runtime[Any, Any]]]] = Sync(
// trace = "repl.MdocSession.MdocApp.supervisor(supervisor.md:14)",
// eval = zio.Supervisor$$$Lambda$19138/0x00007fd94afdcb08@7eff26c1
// )

With the help of this supervisor, we can periodically report the status of the fibers in our program.
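As a minimal sketch of reading a tracking supervisor once (rather than periodically), the snippet below forks a few children inside a supervised region and then counts the fibers the supervisor currently holds. The names TrackSketch and countTracked are hypothetical and exist only for illustration; the children are `ZIO.never` so that they are still alive when the count is read.

```scala
import zio._

object TrackSketch extends ZIOAppDefault {

  // Hypothetical helper: forks `n` children that never complete inside a
  // supervised region, then returns how many fibers the supervisor tracks.
  def countTracked(n: Int): UIO[Int] =
    for {
      supervisor <- Supervisor.track(weak = false)
      _          <- ZIO.foreachDiscard(1 to n)(_ => ZIO.never.fork).supervised(supervisor)
      fibers     <- supervisor.value
    } yield fibers.size

  def run =
    countTracked(3).flatMap(n => Console.printLine(s"tracked fibers: $n"))
}
```

The forked children are interrupted automatically when the fiber that forked them exits, so the sketch does not need explicit cleanup.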

fibersIn

The fibersIn constructor creates a new supervisor that tracks fibers in a sorted set we provide, starting from the fibers it initially contains.

In the following example we are creating a new supervisor from an initial set of fibers:

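import java.util.concurrent.atomic.AtomicReference
import scala.collection.immutable.SortedSet

// `fibers` is assumed to be an existing collection of Fiber.Runtime[Any, Any] values
def fiberListSupervisor = for {
  ref <- ZIO.succeed(new AtomicReference(SortedSet.from(fibers)))
  s   <- Supervisor.fibersIn(ref)
} yield s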
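The snippet above relies on a `fibers` collection defined elsewhere. A self-contained variant, sketched here under the assumption that we start from an empty set, might look like the following. The names FibersInSketch and countTracked are hypothetical, and the sketch assumes that an implicit Ordering for Fiber.Runtime is in scope (which the SortedSet usage above also requires).

```scala
import java.util.concurrent.atomic.AtomicReference
import scala.collection.immutable.SortedSet
import zio._

object FibersInSketch extends ZIOAppDefault {

  // Hypothetical helper: builds a supervisor over an initially empty sorted set,
  // forks `n` never-ending children under it, and returns the tracked count.
  def countTracked(n: Int): UIO[Int] =
    for {
      ref        <- ZIO.succeed(new AtomicReference(SortedSet.empty[Fiber.Runtime[Any, Any]]))
      supervisor <- Supervisor.fibersIn(ref)
      _          <- ZIO.foreachDiscard(1 to n)(_ => ZIO.never.fork).supervised(supervisor)
      fibers     <- supervisor.value
    } yield fibers.size

  def run =
    countTracked(2).flatMap(n => Console.printLine(s"fibers in set: $n"))
}
```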

Supervising

Whenever we need to supervise a ZIO effect, we can call the ZIO#supervised method. It takes a supervisor and returns another effect. The behavior of the child fibers forked by that effect is reported to the provided supervisor:

val supervised = supervisor.flatMap(s => fib(20).supervised(s))

Now we can access information about the child fibers through the supervisor's value.
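To illustrate what that access can look like, here is a small sketch that reads the supervisor's value and renders the id and status of each tracked fiber. The names InspectSketch and describe are hypothetical; the sketch assumes a supervisor created with Supervisor.track, whose value is a Chunk of Fiber.Runtime.

```scala
import zio._

object InspectSketch extends ZIOAppDefault {

  // Hypothetical helper: one line per tracked fiber, showing its id and status.
  def describe(supervisor: Supervisor[Chunk[Fiber.Runtime[Any, Any]]]): UIO[Chunk[String]] =
    supervisor.value.flatMap { fibers =>
      ZIO.foreach(fibers)(fiber => fiber.status.map(status => s"${fiber.id} -> $status"))
    }

  def run =
    for {
      supervisor <- Supervisor.track(weak = false)
      // Fork one child inside the supervised region so there is something to report.
      _          <- ZIO.sleep(1.second).fork.supervised(supervisor)
      lines      <- describe(supervisor)
      _          <- ZIO.foreachDiscard(lines)(Console.printLine(_))
    } yield ()
}
```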

Example

In the following example, we periodically monitor the number of fibers throughout the life cycle of our application:

import zio._
import zio.Fiber.Status

object SupervisorExample extends ZIOAppDefault {

  def run = for {
    supervisor <- Supervisor.track(true)
    fiber <- fib(20).supervised(supervisor).fork
    policy = Schedule
      .spaced(500.milliseconds)
      .whileInputZIO[Any, Unit](_ => fiber.status.map(_ != Status.Done))
    logger <- monitorFibers(supervisor)
      .repeat(policy).fork
    _ <- logger.join
    result <- fiber.join
    _ <- Console.printLine(s"fibonacci result: $result")
  } yield ()

  def monitorFibers(supervisor: Supervisor[Chunk[Fiber.Runtime[Any, Any]]]) = for {
    length <- supervisor.value.map(_.length)
    _ <- Console.printLine(s"number of fibers: $length")
  } yield ()

  def fib(n: Int): ZIO[Any, Nothing, Int] =
    if (n <= 1) {
      ZIO.succeed(1)
    } else {
      for {
        _ <- ZIO.sleep(500.milliseconds)
        fiber1 <- fib(n - 2).fork
        fiber2 <- fib(n - 1).fork
        v2 <- fiber2.join
        v1 <- fiber1.join
      } yield v1 + v2
    }

}