Skip to main content
Version: 2.0.x

Defining flows

Overview

ZIO Flow is based on defining executable workflows as values of the type ZFlow[R, E, A]. This type is similar to ZIO[R, E, A] in that it represents a program as a value that can fail with the type E or succeed with the type A. The most important difference is that a ZFlow value is serializable, which means that it can be sent over the network for a remote executor.

When working with ZFlow programs, another core concept of ZIO Flow is remote values. There is a separate section about working with remote values and remote functions. In this section we will focus on how to define flows and we can assume that remote values and functions work similarly to regular Scala values and functions.

Flow control

The basic building blocks of defining a ZFlow workflow are similar to ZIO. We can use succeed or fail to create a flow that finishes with a result immediately:

import zio.{ZNothing, durationInt}
import zio.flow._

val flow1 = ZFlow.succeed(100)
// flow1: ZFlow[Any, ZNothing, Int] = Return(
// value = Literal(value = Primitive(value = 100, standardType = int))
// )
val flow2 = ZFlow.fail("Failed")
// flow2: ZFlow[Any, String, ZNothing] = Fail(
// error = Literal(value = Primitive(value = "Failed", standardType = string))
// )

Note that the above code does not run anything; it just defines workflows as serializable values of the type ZFlow that can be later sent to an executor. In the future code examples we will not show the evaluated ZFlow values as they can be very verbose.

The usual map, flatMap, as, and variants of zip are all available on ZFlow so we can use for comprehensions to define them:

val flow3 = 
for {
a <- ZFlow.succeed(100)
b <- ZFlow.succeed(200)
} yield a + b

To handle failures we have catchAll, orElse, foldFlow and ensuring:

val flow4 = ZFlow.fail("Failed").ensuring(ZFlow.log("Flow 4 finished"))
val flow5 = ZFlow.fail("Failed").orElse(ZFlow.succeed(1))
val flow6 =
ZFlow
.fail("Failed")
.catchAll { (failure: Remote[String]) =>
ZFlow.log(rs"Failed with $failure") *> ZFlow.succeed(1)
}

Logging

The simplest way to interact with the outside world from a ZIO Flow program is to log a message:

val flow7 = ZFlow.log("Hello world")

The logged message goes to the log output of the executor tagged by the running flow's identifier.

Variables

Persistent variables are named mutable slots belonging to a specific flow. They can be used to communicate between different, possibly parallel steps of a flow, and to expose some observable flow state for debugging purposes. They are similar to the Ref data type in ZIO.

val flow8 = 
for {
var1 <- ZFlow.newVar("var1", 100)
_ <- var1.update(_ + 1)
result <- var1.get
} yield result

Iteration and recursion

There are multiple ways to repeat the execution of a flow.

The simplest one is replicate that repeats the flow a given number of times:

val flow9 = ZFlow.log("hello").replicate(10)

A more complex way is using iterate which allows you to define a step function and a condition and repeats the flow until the condition becomes true. The initial value is the flow's result itself:

val flow10 = ZFlow.log("hello").as(1).iterate((x: Remote[Int]) => x + 1)(_ === 10) 

The result of the flow will be the final value of the iterated variable, in this case 10. Note the use of the triple equal operator (===) instead of the usual double (==). This is a limitation of the remote types, covered in the next section.

Another two operators on flows are recurseSimple and recurse. Why can't we just write recursive Scala functions as usual? Let's take the following example:

def recursiveFlow1(n: Remote[Int]): ZFlow[Any, Nothing, Int] =
ZFlow.ifThenElse(n === 10)(
ifTrue = ZFlow.succeed(10),
ifFalse = recursiveFlow1(n + 1).map(_ + 1)
)
val flow11Bad = recursiveFlow1(0)

Note that we cannot use Scala if () {} else {} expressions because they are not serializable. The ifThenElse method defined on Remote[Boolean] is an alternative of that, explained in the next section. Even though the above code compiles, it is not serializable! ZIO Flow has no way to detect that the call to recursiveFlow1 is a recursion point and it would end up in a stack overflow.

The same example can be safely implemented by using the recurseSimple operator:

val flow11Good = 
ZFlow.recurseSimple[Any, ZNothing, Int](0) { case (value, rec) =>
ZFlow.ifThenElse(value === 10)(
ifTrue = ZFlow.succeed(value),
ifFalse = rec(value + 1)
)
}

recurse is a slightly more complicated variant that allows the recursion body to have a different result type than the initial value. This usually requires specifying some type parameters by hand, so in simple cases recurseSimple leads to cleaner code.

ZIO Flow also has foreach and foreachPar to execute a flow for each element of a list:

val flow12 = ZFlow.foreach(List(1, 2, 3)) { x => ZFlow.log(rs"${x.toString}") }
val flow13 = ZFlow.foreachPar(List(1, 2, 3)) { x => ZFlow.log(rs"${x.toString}") }

Fibers

ZIO Flow programs can fork fibers for parallel execution, similar to how ZIO programs does it. The fork operator returns a value of ExecutingFlow[E, A] which can be used to await or interrupt the execution of the child fiber.

val flow14 =
for {
fiber1 <- ZFlow.sleep(5.minutes).fork
fiber2 <- ZFlow.sleep(1.hour).fork
_ <- fiber2.interrupt
_ <- fiber2.await
} yield ()

It is often useful to timeout after a given duration, for example when waiting for a fiber to finish. This can be done using the timeout operator:

val flow15 = ZFlow.sleep(1.hour).timeout(10.seconds)

The external world

There are other ways for ZIO Flow programs to communicate with the external world than just logging messages. The main categories are:

  • Getting an input for the execution of the flow
  • Accessing some built-in services such as time and random generator
  • Accessing external services

Input

ZIO Flow programs has a third type parameter, R, which was not used in the above examples. This represents the input of the flow. When sending a flow to an executor, if the R type is not Any, you also have to provide an input value of type R as a parameter of the execution.

To access this value from the flow, we can use the input operator:

val flow16 = ZFlow.input[Int]
// flow16: ZFlow[Int, ZNothing, Int] = Input()

Note that the result type is now ZFlow[Int, ZNothing, Int]. The flow will read the input, and use it as its result.

When composing flows, you may want to eliminate the input requirement of a sub-flow. This can be done using the provide operator:

val flow17 = ZFlow.input[Int].provide(100)
// flow17: ZFlow[Any, ZNothing, Int] = Provide(
// value = Literal(value = Primitive(value = 100, standardType = int)),
// flow = Input()
// )

The result type is now ZFlow[Any, ZNothing, Int]. This flow has no input requirement, and will always return 100.

Time

Getting the current time is an important operation for ZIO Flow workflows because it allows time based schedules, generating timestamps or expiration times, etc.

To get the current time as a remote value of Instant, use:

val flow18 = ZFlow.now

Random

The flow may need to generate random values like random numbers, UUIDs etc. The ZFlow type has two random related operators:

val flow19 = ZFlow.random.map(double => rs"Random double: ${double.toString}")
val flow20 = ZFlow.randomUUID.map(uuid => rs"Random UUID: ${uuid.toString}")

There are higher level random functions exposed on zio.flow.Random, reflecting the ZIO Random service's functionalities:

val flow21 = Random.nextIntBetween(10, 100)
val flow22 = Random.nextString(16)

Activities

The primary way to communicate with the external world is by using activities. An activity is defined by the type Activity and it is a description of how to call an external service and how it should behave when it is used in transactions. The activities section explains in detail how to define and use activities.

Scheduling

We have seen that ZIO Flow programs can access the current time, and ways to repeat the execution of a part of the flow. Often we want to either delay the execution of something or repeat it at a given interval.

The two most simple operators are sleep and waitTill. Sleep will delay the execution of the flow for a given duration:

val flow23 = ZFlow.sleep(10.seconds)

The waitTill operator will delay the execution of the flow until a given instant:

val flow24 = ZFlow.waitTill(Instant.parse("2022-12-12T10:00:00Z"))

ZIO Flow also defines a data type for describing more complex schedules. This type is ZFlowSchedule and it can be used as a parameter to the schedule and repeat opeators on ZFlow:

  • schedule will execute the flow once, according to the given schedule
  • repeat will execute the flow immeditely and then repeat it according to the given schedule

The following example logs a message every second:

val flow25 = ZFlow.log("hello").repeat(ZFlowSchedule.fixed(1.second))

Other than fixed there are some other schedule contructors for defining hourly, daily or monthly schedules:

val flow26 = ZFlow.log("hello").repeat(ZFlowSchedule.everyHourAt(11, 0))
val flow27 = ZFlow.log("hello").repeat(ZFlowSchedule.everyDayAt(15, 30, 0))
val flow28 = ZFlow.log("hello").repeat(ZFlowSchedule.everyMonthAt(1, 12, 0, 0))

It is possible to define more complex schedules by using the or (or |) operator. The following example will log a message every hour twice:

val flow29 = ZFlow.log("hello").repeat(
ZFlowSchedule.everyHourAt(11, 0) | ZFlowSchedule.everyHourAt(44, 30)
)

To limit the number of repetions of a schedule, use maxCount:

val flow30 = ZFlow.log("hello").repeat(
ZFlowSchedule.fixed(1.second).maxCount(10)
)

Transactions

ZIO Flow programs can modify variables and perform activities in a transactional way. When transactions are used in a single fiber, it guarantees that in case of failure all the undoable activities performed within the transactions are rolled back.

To learn more about how activities can be rolled back check the activities page.

When using transactions in parallel running ZFlow fibers, the transactions are also tracking the usage of remote variables. If two transactional fibers are modifying the same variable, one of them will retry to make sure the whole transaction remains consistent.

The following simple example shows how the traditional problem of parallel modification of a mutable variable can be solved using ZIO Flow transactions:

val flow31 =
for {
var1 <- ZFlow.newVar[Int]("var1", 10)
now <- ZFlow.now
fib1 <- ZFlow.transaction { _ =>
for {
v1 <- var1.get
_ <- var1.set(v1 + 1)
} yield ()
}.fork
fib2 <- ZFlow.transaction { _ =>
for {
v1 <- var1.get
_ <- var1.set(v1 + 1)
} yield ()
}.fork
_ <- fib1.await
_ <- fib2.await
result <- var1.get
} yield result

This will always return 12 because in case of conflict one of the transactions will be retried.

It is also possible to conditionally retry a transaction. By passing a remote boolean expression, the transaction will evaluate the condition and in case it is false it will undo everything the transaction did and retry it. It also suspends the execution of the retried transactions until any of the variables used in the transaction up to the evaluation of the condition changes. This way it avoids repeated evaluation and failure of the flow.

The following example will retry the transaction two times:

val flow32 =
for {
variable <- ZFlow.newVar("var", 0)
fiber <- ZFlow.transaction { tx =>
for {
value <- variable.get
_ <- ZFlow.log("Transaction executed")
_ <- tx.retryUntil(value === 2)
} yield value
}.fork
_ <- ZFlow.sleep(1.second)
_ <- variable.set(1)
_ <- ZFlow.sleep(1.second)
_ <- variable.set(2)
_ <- ZFlow.log("Waiting for the transaction fiber to stop")
result <- fiber.await
} yield result