Version: 1.0.18

Schedule

A Schedule[Env, In, Out] is an immutable value that describes a recurring effectful schedule. It runs in some environment Env, consumes values of type In (errors in the case of retry, or values in the case of repeat), and produces values of type Out. At every step, based on the input value and its internal state, it decides whether to halt or to continue after some delay d.

Schedules are defined as a possibly infinite set of intervals spread out over time. Each interval defines a window in which recurrence is possible.

When schedules are used to repeat or retry effects, the starting boundary of each interval produced by a schedule is used as the moment when the effect will be executed again.

A variety of other operators exist for transforming and combining schedules, and the companion object for Schedule contains all common types of schedules, both for retrying and for repetition.

Repeat and Retry

Schedules allow us to define and compose flexible recurrence schedules, which can be used to repeat actions, or retry actions in the event of errors.

Repetition and retrying are two similar concepts in the domain of scheduling. They share the same idea; the difference is that repetition reacts to successes, while retrying reacts to failures.

Repeat

For repetition, ZIO has the ZIO#repeat function, which takes a schedule as a repetition policy and returns a new effect that repeats the original effect according to that policy.

Repeat policies are used in the following functions:

  • ZIO#repeat: Repeats an effect until the schedule is done.
  • ZIO#repeatOrElse: Repeats an effect until the schedule is done, with a fallback for errors.

Note:

Scheduled recurrences are in addition to the first execution, so that io.repeat(Schedule.once) yields an effect that executes io, and then if that succeeds, executes io an additional time.

Let's see how we can create a repeated effect by using the ZIO#repeat function:

val action:      ZIO[R, E, A] = ???
val policy: Schedule[R1, A, B] = ???

val repeated = action repeat policy
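
As a concrete illustration, here is a minimal sketch, assuming ZIO 1.x and `zio.App`; the object and value names are made up for this example. It repeats a console action with `Schedule.recurs(2)`. Because scheduled recurrences come in addition to the first execution, the action should run three times in total.

```scala
import zio._
import zio.console._

// Hypothetical example object; the names are illustrative only.
object RepeatExample extends zio.App {
  val action: URIO[Console, Unit] = putStrLn("tick").orDie

  // Recur twice after the initial execution.
  val policy: Schedule[Any, Any, Long] = Schedule.recurs(2)

  def run(args: List[String]): URIO[ZEnv, ExitCode] =
    action.repeat(policy).exitCode
}
```

The repeated effect succeeds with the last output of the schedule (here a `Long` count), not with the value of the action itself.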

There is another version of repeat that lets us provide a fallback strategy in case of errors. If something goes wrong, we can handle it with the ZIO#repeatOrElse function, which takes an orElse callback that runs if a repetition fails:

val action:       ZIO[R, E, A] = ???
val policy: Schedule[R1, A, B] = ???

val orElse: (E, Option[B]) => ZIO[R1, E2, B] = ???

val repeated = action repeatOrElse (policy, orElse)
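
The following sketch shows one way the fallback might be used, assuming ZIO 1.x. The counter-based `action` is a hypothetical effect invented for this example: it succeeds twice and then fails, which triggers the `orElse` callback.

```scala
import zio._
import zio.console._

object RepeatOrElseExample extends zio.App {
  // Hypothetical action: succeeds twice, then fails on its third execution.
  def action(counter: Ref[Int]): IO[String, Int] =
    counter.updateAndGet(_ + 1).flatMap { n =>
      if (n < 3) ZIO.succeed(n) else ZIO.fail(s"execution $n failed")
    }

  def run(args: List[String]): URIO[ZEnv, ExitCode] =
    (for {
      counter <- Ref.make(0)
      result  <- action(counter).repeatOrElse(
                   Schedule.recurs(5),
                   // Runs only if an execution fails; `last` holds the latest
                   // schedule output, if there is one.
                   (e: String, last: Option[Long]) =>
                     putStrLn(s"repetition failed: $e (last output: $last)").orDie.as(-1L)
                 )
      _       <- putStrLn(s"result: $result").orDie
    } yield ()).exitCode
}
```

Note that the fallback must produce the same type as the schedule's output, because that is what the repeated effect returns on success.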

Retry

For retrying, ZIO has the ZIO#retry function, which takes a schedule as a retry policy and returns a new effect that retries the original effect after each failure, according to that policy.

Retry policies are used in the following functions:

  • ZIO#retry: Retries an effect until it succeeds.
  • ZIO#retryOrElse: Retries an effect until it succeeds, with a fallback for errors.

Let's see how we can create a retried effect by using the ZIO#retry function:

val action:       ZIO[R, E, A] = ???
val policy: Schedule[R1, E, S] = ???

val retried = action retry policy
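
To make this concrete, here is a minimal sketch assuming ZIO 1.x. The `flaky` effect is hypothetical: it fails on its first two attempts and succeeds on the third, so a policy that allows up to five retries is enough for it to succeed.

```scala
import zio._
import zio.console._

object RetryExample extends zio.App {
  // Hypothetical effect: fails twice, then succeeds on the third attempt.
  def flaky(attempts: Ref[Int]): IO[String, String] =
    attempts.updateAndGet(_ + 1).flatMap { n =>
      if (n < 3) ZIO.fail(s"attempt $n failed")
      else ZIO.succeed(s"succeeded on attempt $n")
    }

  def run(args: List[String]): URIO[ZEnv, ExitCode] =
    (for {
      attempts <- Ref.make(0)
      result   <- flaky(attempts).retry(Schedule.recurs(5))
      _        <- putStrLn(result).orDie
    } yield ()).exitCode
}
```

Unlike repeat, the retried effect succeeds with the value of the original effect; the schedule's output is discarded.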

There is another version of retry that lets us provide a fallback strategy in case of errors. If the effect still fails once the policy is exhausted, we can handle that with the ZIO#retryOrElse function, which takes an orElse callback that runs in that case:

val action:       ZIO[R, E, A] = ???
val policy: Schedule[R1, E, S] = ???

val orElse: (E, S) => ZIO[R1, E1, A1] = ???

val retried = action retryOrElse (policy, orElse)
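
A small sketch of the fallback in action, assuming ZIO 1.x. The `alwaysFails` effect and the fallback value `0` are hypothetical choices for this example: after the initial attempt and two retries, the policy is exhausted and the callback supplies a default result.

```scala
import zio._
import zio.console._

object RetryOrElseExample extends zio.App {
  // Hypothetical effect that never succeeds.
  val alwaysFails: IO[String, Int] = ZIO.fail("service unavailable")

  def run(args: List[String]): URIO[ZEnv, ExitCode] =
    alwaysFails
      .retryOrElse(
        Schedule.recurs(2),
        // Called once the policy is exhausted, with the last error
        // and the schedule's final output.
        (e: String, out: Long) =>
          putStrLn(s"giving up: $e (schedule output: $out)").orDie.as(0)
      )
      .flatMap(n => putStrLn(s"result: $n").orDie)
      .exitCode
}
```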

Base Schedules

stop

A schedule that does not recur; it stops immediately and returns a single Unit element:

val stop = Schedule.stop

once

A schedule that recurs one time and returns a single Unit element:

val once = Schedule.once

forever

A schedule that always recurs and produces the number of recurrences so far at each run:

val forever = Schedule.forever

recurs

A schedule that only recurs the specified number of times:

val recurs = Schedule.recurs(5)

spaced

A schedule that recurs continuously, each repetition spaced the specified duration from the last run:

val spaced = Schedule.spaced(10.milliseconds)

fixed

A schedule that recurs on a fixed interval. Returns the number of repetitions of the schedule so far:

val fixed = Schedule.fixed(10.seconds)

exponential

A schedule that recurs using exponential backoff:

val exponential = Schedule.exponential(10.milliseconds)

fibonacci

A schedule that always recurs, increasing delays by summing the preceding two delays (similar to the Fibonacci sequence). Returns the current duration between recurrences:

val fibonacci = Schedule.fibonacci(10.milliseconds)
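
To make the two backoff shapes concrete, this plain-Scala sketch computes the first few delays by hand. It does not call ZIO at all. It assumes that `exponential` uses its default growth factor of 2.0 and that `fibonacci` starts from two delays equal to the given duration.

```scala
object BackoffDelays extends App {
  val baseMillis = 10L

  // Exponential backoff: base * 2^n for recurrence n = 0, 1, 2, ...
  val exponentialDelays: List[Long] =
    (0 until 5).map(n => baseMillis * (1L << n)).toList

  // Fibonacci backoff: each delay is the sum of the two preceding delays.
  val fibonacciDelays: List[Long] =
    Iterator
      .iterate((baseMillis, baseMillis)) { case (a, b) => (b, a + b) }
      .map(_._1)
      .take(5)
      .toList

  println(exponentialDelays) // List(10, 20, 40, 80, 160)
  println(fibonacciDelays)   // List(10, 10, 20, 30, 50)
}
```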

identity

A schedule that always decides to continue. It recurs forever, without any delay. The identity schedule consumes an input and emits the same value as its output (Schedule[Any, A, A]):

val identity = Schedule.identity[Int]

unfold

A schedule that always recurs without delay, starting from the specified initial state and computing each next output with the specified function:

val unfold = Schedule.unfold(0)(_ + 1)

succeed

Returns a schedule that repeats one time, producing the specified constant value:

val constant = Schedule.succeed(5)

fromFunction

A schedule that always recurs, mapping input values through the specified function:

val inc = Schedule.fromFunction[Int, Int](_ + 1)

Schedule Combinators

Schedules define stateful, possibly effectful, recurring schedules of events, and compose in a variety of ways. Combinators allow us to take schedules and combine them to get other schedules. If we have combinators with just the right properties, then in theory we should be able to solve an infinite number of problems with only a few combinators and a few base schedules.

Composition

Schedules compose in the following primary ways:

  • Union. This performs the union of the intervals of two schedules.
  • Intersection. This performs the intersection of the intervals of two schedules.
  • Sequence. This concatenates the intervals of one schedule onto another.

Union

Combines two schedules through union, by recurring if either schedule wants to recur, using the minimum of the two delays between recurrences.

                   s1                 s2                 s1 || s2
Type               Schedule[R, A, B]  Schedule[R, A, C]  Schedule[R, A, (B, C)]
Continue: Boolean  b1                 b2                 b1 || b2
Delay: Duration    d1                 d2                 d1.min(d2)
Emit: (A, B)       a                  b                  (a, b)

We can combine two schedules through union with the || operator:

val expCapped = Schedule.exponential(100.milliseconds) || Schedule.spaced(1.second)
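
A worked calculation shows why this union behaves like a capped backoff: each delay is the minimum of the exponential delay and the constant 1 second spacing. The sketch below is plain Scala, not ZIO, and assumes the default exponential factor of 2.0.

```scala
object CappedBackoff extends App {
  val baseMillis   = 100L
  val spacedMillis = 1000L

  // Union picks the smaller of the two delays at every step.
  val delays: List[Long] =
    (0 until 6).map(n => math.min(baseMillis * (1L << n), spacedMillis)).toList

  println(delays) // List(100, 200, 400, 800, 1000, 1000)
}
```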

Intersection

Combines two schedules through the intersection, by recurring only if both schedules want to recur, using the maximum of the two delays between recurrences.

                   s1                 s2                 s1 && s2
Type               Schedule[R, A, B]  Schedule[R, A, C]  Schedule[R, A, (B, C)]
Continue: Boolean  b1                 b2                 b1 && b2
Delay: Duration    d1                 d2                 d1.max(d2)
Emit: (A, B)       a                  b                  (a, b)

We can intersect two schedules with the && operator:

val expUpTo10 = Schedule.exponential(1.second) && Schedule.recurs(10)
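
The sketch below, assuming ZIO 1.x, spells out the type of this intersection and uses it as a retry policy. The `flaky` effect is a hypothetical placeholder; the point is that the combined schedule emits a tuple of both outputs and stops as soon as either side stops.

```scala
import zio._
import zio.clock.Clock
import zio.duration._

object IntersectionExample {
  // The intersection emits both outputs as a tuple:
  // the current exponential delay and the recurrence count.
  val expUpTo10: Schedule[Any, Any, (Duration, Long)] =
    Schedule.exponential(1.second) && Schedule.recurs(10)

  // Hypothetical failing effect, used only to show the policy in a retry.
  val flaky: IO[String, Unit] = ZIO.fail("boom")

  // Retries with exponential backoff, at most 10 times.
  val retried: ZIO[Clock, String, Unit] = flaky.retry(expUpTo10)
}
```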

Sequence

Combines two schedules sequentially, by following the first policy until it ends, and then following the second policy.

                 s1                 s2                 s1 andThen s2
Type             Schedule[R, A, B]  Schedule[R, A, C]  Schedule[R, A, C]
Delay: Duration  d1                 d2                 d1 + d2
Emit: B          a                  b                  b

We can sequence two schedules by using andThen:

val sequential = Schedule.recurs(10) andThen Schedule.spaced(1.second)

Piping

Combine two schedules by piping the output of the first schedule to the input of the other. Effects described by the first schedule will always be executed before the effects described by the second schedule.

                 s1                 s2                 s1 >>> s2
Type             Schedule[R, A, B]  Schedule[R, B, C]  Schedule[R, A, C]
Delay: Duration  d1                 d2                 d1 + d2
Emit: B          a                  b                  b

We can pipe two schedules by using the >>> operator:

val totalElapsed = Schedule.spaced(1.second) <* Schedule.recurs(5) >>> Schedule.elapsed

Jittering

jittered is a combinator that takes one schedule and returns another schedule of the same type, except that each delay is randomly adjusted:

Function  Input Type                Output Type
jittered                            Schedule[Env with Random, In, Out]
jittered  min: Double, max: Double  Schedule[Env with Random, In, Out]

We can jitter any schedule by calling jittered on it:

val jitteredExp = Schedule.exponential(10.milliseconds).jittered

When a resource is out of service due to overload or contention, retrying with backoff alone may not help. If all failed API calls back off to the same point in time, they cause another overload or contention. Jitter adds some randomness to the delays of the schedule, which helps us avoid accidentally synchronizing retries and taking the service down again.
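
As an illustration, the sketch below (assuming ZIO 1.x) builds a jittered retry policy with explicit bounds. The bounds 0.5 and 1.5 are arbitrary values chosen for this example; our understanding is that each delay is scaled by a random factor between min and max, but check the scaladoc of your ZIO version for the exact formula.

```scala
import zio._
import zio.duration._
import zio.random.Random

object JitterExample {
  // Exponential backoff whose delays are randomly scaled, limited to 5 retries.
  // Jittering adds a Random requirement to the schedule's environment.
  val policy: Schedule[Random, Any, (Duration, Long)] =
    Schedule.exponential(100.milliseconds).jittered(0.5, 1.5) && Schedule.recurs(5)
}
```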

Collecting

collectAll is a combinator that, when called on a schedule, produces a new schedule that collects the outputs of the original schedule into a chunk.

Function    Input Type              Output Type
collectAll  Schedule[Env, In, Out]  Schedule[Env, In, Chunk[Out]]

In the following example, we collect the outputs of all recurrences of the schedule into a Chunk, so at the end it contains Chunk(0, 1, 2, 3, 4):

val collect = Schedule.recurs(5).collectAll

Filtering

We can filter the inputs or outputs of a schedule with whileInput and whileOutput. ZIO schedules also have effectful versions of these two functions, whileInputM and whileOutputM.

Function      Input Type                  Output Type
whileInput    In1 => Boolean              Schedule[Env, In1, Out]
whileOutput   Out => Boolean              Schedule[Env, In, Out]
whileInputM   In1 => URIO[Env1, Boolean]  Schedule[Env1, In1, Out]
whileOutputM  Out => URIO[Env1, Boolean]  Schedule[Env1, In, Out]

In the following example, we collect all emitted outputs until the output reaches 5, so it returns Chunk(0, 1, 2, 3, 4):

val res = Schedule.unfold(0)(_ + 1).whileOutput(_ < 5).collectAll

Mapping

There are two versions for mapping schedules, map and its effectful version mapM.

Function  Input Type                  Output Type
map       f: Out => Out2              Schedule[Env, In, Out2]
mapM      f: Out => URIO[Env1, Out2]  Schedule[Env1, In, Out2]
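
A brief sketch of both variants, assuming ZIO 1.x; the value names and the log message are made up for this example. `map` transforms each output with a pure function, while `mapM` runs an effect for each output.

```scala
import zio._
import zio.console._

object MappingExample {
  // Pure mapping: turn the recurrence count into a label.
  val labelled: Schedule[Any, Any, String] =
    Schedule.recurs(5).map(n => s"recurrence #$n")

  // Effectful mapping: log each output and pass it through unchanged.
  val logged: Schedule[Console, Any, Long] =
    Schedule.recurs(5).mapM(n => putStrLn(s"output: $n").orDie.as(n))
}
```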

Left/Right Ap

Sometimes when we intersect two schedules with the && operator, we just need to ignore the left or the right output.

  • *> ignores the left output
  • <* ignores the right output
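
For instance, the following sketch (assuming ZIO 1.x, with illustrative value names) intersects two schedules and keeps only one side's output, which shows up in the resulting type.

```scala
import zio._
import zio.duration._

object ApExample {
  // *> keeps only the right output (the recurrence count).
  val keepRight: Schedule[Any, Any, Long] =
    Schedule.spaced(1.second) *> Schedule.recurs(5)

  // <* keeps only the left output (the current exponential delay).
  val keepLeft: Schedule[Any, Any, Duration] =
    Schedule.exponential(1.second) <* Schedule.recurs(5)
}
```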

Modifying

The delayed combinator modifies the delays of a schedule by applying a function to each of them:

val boosted = Schedule.spaced(1.second).delayed(_ => 100.milliseconds)

Tapping

Whenever we need to effectfully process each schedule input/output, we can use tapInput and tapOutput.

We can use these two functions for logging purposes:

val tappedSchedule = Schedule.count.whileOutput(_ < 5).tapOutput(o => putStrLn(s"retrying $o").orDie)
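
tapInput works the same way on the input side. In a retry, the schedule's inputs are the errors of the effect, so the sketch below (assuming ZIO 1.x, with a hypothetical always-failing effect) logs every error the policy sees.

```scala
import zio._
import zio.console._

object TapInputExample extends zio.App {
  // Log each error fed into the schedule before deciding whether to retry.
  val policy: Schedule[Console, String, Long] =
    Schedule.recurs(2).tapInput((e: String) => putStrLn(s"failed with: $e").orDie)

  // Hypothetical effect that always fails.
  val failing: IO[String, Unit] = ZIO.fail("boom")

  def run(args: List[String]): URIO[ZEnv, ExitCode] =
    failing.retry(policy).exitCode
}
```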

Examples

Stops retrying after a specified amount of time has elapsed:

val expMaxElapsed = (Schedule.exponential(10.milliseconds) >>> Schedule.elapsed).whileOutput(_ < 30.seconds)

Retry only when a specific exception occurs:

import scala.concurrent.TimeoutException

val whileTimeout = Schedule.exponential(10.milliseconds) && Schedule.recurWhile[Throwable] {
  case _: TimeoutException => true
  case _                   => false
}