Internals
This page describes some implementation details of ZIO Flow that are not necessary to know for regular use of the system.
The Remote type
The Remote data type is a core concept in ZIO Flow. The most important feature a Remote provides is that it can be evaluated. It is important to understand that evaluating a Remote[A] does not produce a value of type A!
The evaluateDynamic function produces a DynamicValue, a generic data type from zio-schema. This DynamicValue is isomorphic to the type A the remote value represents, so if we have a Schema[A] we can convert the DynamicValue back to a typed A. Remote itself contains a helper method called .eval that requires an implicit schema for A and therefore returns a typed value instead of a DynamicValue. This method is only used in special cases in the executor, though, because the executor does not always have a schema for the values it is working with.
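As a minimal sketch of this difference (reusing the remote arithmetic operator shown later on this page; effect and error types are omitted and the method names follow the prose above):

import zio.flow._

// A remote integer expression; nothing is computed at this point.
val r: Remote[Int] = Remote(1) + 1

// Evaluating r with evaluateDynamic yields a DynamicValue and needs no
// Schema[Int]; only eval, which requires the implicit Schema[Int], gives
// back a typed Int.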
Schemas and dynamic values
This "limitation" allows us to work with user-defined types in ZIO Flow programs without the need to inject these types into the server's classpath! Another reason is that we cannot serialize a Schema that defines transformations via Scala functions. If we just use the dynamic values on the server side we can run all the flow steps and only convert to the actual typed representation when necessary.
There are three main cases when converting to a typed value is required:
- When a flow finishes running, the user may want to get a typed result. This is OK because it happens on the "client side" (where the flow is defined, not on the executor). This is the process where our custom types are defined, so we have the necessary Schema to convert the dynamic result value to the expected one.
- When calling external services, for example using Operation.Http, the serialization of parameters such as the request body needs to know its Schema. This schema is serialized as part of the flow, because it is used on the server side. So it is not the same schema instance that the client side has, but it is still necessary because it may contain additional information required for the serialization codec to produce the expected format.
- When performing server-side operations that are "native". For example, numerical operations are implemented by calling the underlying Java implementation for those numeric operations. To do so, we need to convert the DynamicValue to the given numeric type to be able to call the native implementation.
Let's go through an example!
We define a case class with a schema:
import zio.durationInt
import zio.flow._
import zio.flow.operation.http._
import zio.schema._
final case class Example1(name: String, value: Int)
object Example1 {
implicit val schema = DeriveSchema.gen[Example1]
}
then we store this as a Remote value:
val remote1 = Remote(Example1("something", 1))
// remote1: Remote[Example1] = Literal(
// value = Record(
// id = Nominal(
// packageName = IndexedSeq("repl"),
// objectNames = IndexedSeq("MdocSession", "MdocApp"),
// typeName = "Example1"
// ),
// values = ListMap(
// "name" -> Primitive(value = "something", standardType = string),
// "value" -> Primitive(value = 1, standardType = int)
// )
// )
// )
This will first convert the Example1 value to a DynamicValue and then wrap it in a Remote.Literal constructor.
This particular remote constructor does not store anything other than the dynamic value. The schema of Example1 is not transferred to the executor. Evaluating it just returns the dynamic value itself. To call .eval and get back an Example1 we need to provide the schema, which we only have on the definition side.
Now let's assume that we have an activity that requires an Example1 value as its input:
val activity1: Activity[Example1, Unit] =
Activity(
"example activity",
"",
operation = Operation.Http(
host = "https://example.com",
API
.post("test")
.input[Example1]
.output[Unit]
),
check = Activity.checkNotSupported,
compensate = Activity.compensateNotSupported
)
We can pass remote1 to activity1 to perform the HTTP request:
val flow1 = activity1(remote1)
This translates to a ZFlow.RunActivity value that connects a Remote and an Activity. The activity, however, through the operation field, stores the input and output schema. This means that when we serialize flow1, we also serialize Example1.schema as part of it. Serializing a schema means converting it to a MetaSchema; on the server side we then deserialize the MetaSchema and produce a Schema from it. On the server side, however, we don't know anything about the Example1 Scala class at all! So the deserialized schema on the server side will be a GenericRecord, which stores its fields in a ListMap[String, _]. That's a representation isomorphic to the original case class, so the server can work with it.
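To picture what the server works with, the dynamic value shown above could be built by hand without referring to the Example1 class at all. This is only an illustration, assuming a zio-schema version where DynamicValue.Record carries a TypeId (the field names are taken from the printed output above):

import scala.collection.immutable.ListMap
import zio.schema.{DynamicValue, StandardType, TypeId}

// A hand-built DynamicValue equivalent to Example1("something", 1): just a
// record of primitive fields, with no reference to the Example1 class itself.
val serverSideView: DynamicValue =
  DynamicValue.Record(
    TypeId.parse("MdocSession.MdocApp.Example1"),
    ListMap(
      "name"  -> DynamicValue.Primitive("something", StandardType.StringType),
      "value" -> DynamicValue.Primitive(1, StandardType.IntType)
    )
  )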
Remote function application
Let's see how remote function application works.
First we define a remote function as a regular Scala function:
val f1 = (x: Remote[Int]) => x + 1
// f1: Remote[Int] => Remote[Int] = <function1>
This is not a serializable Remote value yet; it is just a Scala function. So we first have to convert it to a Remote.UnboundRemoteFunction:
val f2 = Remote.UnboundRemoteFunction.make(f1)
// f2: Remote.UnboundRemoteFunction[Int, Int] = UnboundRemoteFunction(
// input = Unbound(identifier = ce0fdfae-90df-4cf5-b49d-c165ff9c1b2b),
// result = Binary(
// left = Unbound(identifier = ce0fdfae-90df-4cf5-b49d-c165ff9c1b2b),
// right = Literal(value = Primitive(value = 1, standardType = int)),
// operator = Numeric(
// operator = Add,
// numeric = zio.flow.remote.numeric.Numeric$NumericInt$@6533bc8e
// )
// )
// )
This creates a Remote.Unbound representing the unbound input parameter of the function, and calls the function with it, injecting this "hole" into our expression tree.
Then we can bind the parameter of this function by calling .apply on f2:
val f3 = f2(100)
// f3: Remote[Int] = Bind(
// unbound = Unbound(identifier = ce0fdfae-90df-4cf5-b49d-c165ff9c1b2b),
// value = Literal(value = Primitive(value = 100, standardType = int)),
// inner = Binary(
// left = Unbound(identifier = ce0fdfae-90df-4cf5-b49d-c165ff9c1b2b),
// right = Literal(value = Primitive(value = 1, standardType = int)),
// operator = Numeric(
// operator = Add,
// numeric = zio.flow.remote.numeric.Numeric$NumericInt$@6533bc8e
// )
// )
// )
This way we get a Remote.Bind which stores the parameter value (a Remote.Literal holding 100) together with the inner expression to evaluate.
Let's see what happens when we evaluate this:
- First we evaluate the parameter value and get a DynamicValue back.
- Then we generate a fresh RemoteVariableName, a new ID that is guaranteed not to have been used before.
- We store the parameter value in a remote variable with this new ID. When this evaluation is part of a persistent flow execution, the remote context is not persisting the remote variable yet; it is just storing it in memory.
- We push a binding in the local context. The local context belongs to a single remote evaluation, while the remote context belongs to a whole persistent execution step. By pushing the local binding to the local context we are able to evaluate the Remote.Unbound nodes.
- We evaluate the Bind's inner remote, which is the UnboundRemoteFunction. This is a remote expression tree which has Remote.Unbound in it, which we can now evaluate to the evaluated input because it is stored in the local context.
- After that we remove the binding from the local context.
At this point we have a result of the applied function, but we are not done yet. It is possible that the result of the function is a remote that captures the original Unbound remote node, but the binding is only active during the evaluation of this particular Bind, so that would lead to an invalid continuation. To help imagine this situation, consider this very simple and unusual example:
val f4 = (x: Remote[Int]) => ZFlow.succeed(x)
// f4: Remote[Int] => ZFlow[Any, zio.package.ZNothing, Int] = <function1>
This could be a function passed to a flatMap in a ZIO Flow program.
When we evaluate the function we just get back a new ZFlow value that refers to a Remote.Unbound node (by using x in succeed). But the binding is only active during the actual function evaluation. So the solution ZIO Flow uses is that once we have evaluated a function, we check whether the result refers to the input Unbound identifier. All ZFlow and Remote nodes track their used variables, so this does not require traversing the whole continuation. If the result does not contain it, we are done. Otherwise we substitute the Unbound node in the result tree with the remote variable we generated. This way we have moved the local binding out to a persisted binding, as remote variables are persisted; now it can be accessed in the continuation safely, even if the executor restarts.
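To make the substitution step concrete, here is a toy expression model (not zio-flow's actual Remote type) showing how an unbound "hole" can be replaced by a reference to the generated remote variable:

// Toy expression tree, only for illustrating the substitution idea.
sealed trait Expr
final case class Unbound(id: String)          extends Expr
final case class Variable(name: String)       extends Expr
final case class Lit(value: Int)              extends Expr
final case class Add(left: Expr, right: Expr) extends Expr

// Replace every Unbound node carrying the given identifier with a Variable
// node pointing at the freshly generated remote variable.
def substitute(expr: Expr, id: String, variable: String): Expr =
  expr match {
    case Unbound(`id`) => Variable(variable)
    case Add(l, r)     => Add(substitute(l, id, variable), substitute(r, id, variable))
    case other         => other
  }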
What is implemented as a Remote and what not?
There are many Remote constructors for some primitive operations we support, but there is even more functionality implemented in other classes, such as BinaryOperators, RemoteConversions, etc. The primary distinction is that if something can be implemented by staying on the level of DynamicValues, it is a Remote constructor. If the calculation requires converting the dynamic values to some typed value first, it is implemented in one of the supporting classes such as the ones mentioned above.
Persistent variables and promises
From the ZIO Flow program's perspective a persistent variable has a name typed as RemoteVariableName, which is just a wrapper over String. The name used to store the variable in the key-value store is derived from this name but contains more information.
Each variable is defined in a given scope. In short, the scope identifies the flow/fiber/transaction the variable was defined in. When accessing a remote variable, scoping rules (described below) define how variables can be accessed from parent scopes.
The actual variable name used for storing the variable is described by ScopedRemoteVariableName, which associates a RemoteVariableScope with a RemoteVariableName. The key has to be invertible for garbage collection to be able to identify all the stored variables.
The persistent executor does not store the variables directly using the KeyValueStore interface, but uses a wrapper on top of it called RemoteVariableKeyValueStore. This wrapper is responsible for dealing with the scoping rules, publishing change events for watched variables, and handling timestamps.
Each variable has a timestamp. Setting a new value for a remote variable does not overwrite the old value, but instead writes a new value with a new timestamp. Timestamps are essential for transactions to be able to detect conflicts in accessing the variables. The timestamp values come from a virtual clock which is incremented at each flow step. Forked flows inherit the current parent timestamp. When a forked flow is joined, the parent fiber's virtual clock is advanced to the maximum of the parent and the child fiber's clocks.
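As a simplified model of these rules (not zio-flow's actual classes), the virtual clock behavior can be pictured like this:

// Each executed flow step ticks the clock; a fork starts from the parent's
// timestamp; joining advances the parent to the maximum of the two clocks.
final case class VirtualClock(timestamp: Long) {
  def advance: VirtualClock = VirtualClock(timestamp + 1)
  def fork: VirtualClock    = this
  def join(child: VirtualClock): VirtualClock =
    VirtualClock(math.max(timestamp, child.timestamp))
}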
Executor state management
The persistent executor persists its state and any new/changed persisted remote variables after each step. One step is the processing of one ZFlow instruction.
While processing the step the executor collects a sequence of StateChange values. At the end of each execution step the following major steps are performed:
- applying the StateChange changes to the executor's state (in memory)
- collecting the remote variables which were accessed (read or modified) during the step
- saving the modified persistent variables to the key-value store
- applying some more StateChange values to the executor's state, like recording the modified variables and advancing the virtual clock
- persisting the new executor state to the key-value store
Currently after each step we save the full executor state into the key-value store. This is not optimal, but because the executor works with StateChange values, it is designed so that in the future it can save only the changes into a journal instead.
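A rough sketch of this design (with made-up change types, not zio-flow's real StateChange cases) is that a step's collected changes are folded into the in-memory state, and the same list of changes could later be appended to a journal:

sealed trait StateChange
object StateChange {
  final case class AdvanceClock(by: Long)       extends StateChange
  final case class RecordModified(name: String) extends StateChange
}

final case class ExecutorState(clock: Long, modifiedVariables: Set[String]) {
  // Apply a single change to the in-memory state.
  def apply(change: StateChange): ExecutorState =
    change match {
      case StateChange.AdvanceClock(by)     => copy(clock = clock + by)
      case StateChange.RecordModified(name) => copy(modifiedVariables = modifiedVariables + name)
    }
}

// Applying all changes collected during a step is a left fold.
def applyAll(state: ExecutorState, changes: List[StateChange]): ExecutorState =
  changes.foldLeft(state)(_ apply _)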
Transactions
In transactions, every time a variable is accessed, its current timestamp gets recorded. When the transaction is committed, these timestamps are compared to the actual timestamps; if there is a difference, it means there is a conflict and the transaction has to be retried.
Retry can also be triggered by the retryUntil operator - it is implemented as a special kind of failure. In fact, within a transaction each user error E is wrapped in a TransactionalFailure[E] = UserError[E] | Retry type.
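Conceptually (names approximate; this is a sketch, not zio-flow's exact definition), this failure type looks like:

sealed trait TransactionalFailure[+E]
object TransactionalFailure {
  // A user-level error of type E raised inside the transaction.
  final case class UserError[E](error: E) extends TransactionalFailure[E]
  // The signal produced by retryUntil, handled by orTry or CommitTransaction.
  case object Retry extends TransactionalFailure[Nothing]
}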
Retry can be captured by the orTry operator, in which case it works exactly like handling an error with Fold.
If the retry is not handled by the time it reaches a CommitTransaction instruction in the stack, the transaction gets restarted.
Retrying or failing in a transaction also causes all the activities to get compensated by running their compensate flows in reverse order.
Scoping rules
Workflows
A top level workflow defines the top level scope by its unique flow identifier. This guarantees that:
- separate workflows cannot see each other's variables
- restarted workflows see the same set of variables as the previous run, as they share the flow id
Operation semantics on the top level of workflows (not in a forked fiber and not in an active transaction):
- New(name): creates a new remote variable in the KV store's variable namespace called "$flowid__$name"
- Get(name): reads "$flowid__$name"
- Set(name): writes "$flowid__$name"
Fibers
Forked workflows are like regular workflows but they are not individually submitted; instead, they are created by the executor via the Fork operator. Each workflow maintains a fork counter and generates new workflow ids based on that. So a forked workflow's flow identifier will be "$parentId_fork$parentForkCounter".
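The derivation of a forked flow's identifier can be sketched as a simple string operation (illustrative helper, not an actual zio-flow function):

// Produces "$parentId_fork$parentForkCounter" as described above.
def forkedFlowId(parentId: String, parentForkCounter: Int): String =
  s"${parentId}_fork$parentForkCounter"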
Desired semantics:
- Forked workflows should have read/write access to variables accessible to the parent workflow
- Variables created in a forked workflow should not be accessible to the parent and sibling workflows
- Parallel forked workflows should be able to create independent variables with the same name
Operation semantics in forked workflows:
- New(name): creates a new remote variable in the KV store's variable namespace prefixed by the active workflow identifier, "$flowid__$name" (which is "$parentId_fork$parentForkCounter__$name")
- Get(name): finds the variable's scope by first looking in the current fiber's scope (using "$flowid__$name"); if it does not exist there, it recursively tries to access the variable in the parent scope ("$parentid__$name") - see the sketch after this list
- Set(name): same lookup as for Get
- Get and Set must always select the same variable in an executor step
Transactions
In transactions we have to delay the effect of Set (while still seeing the written value in Get within the transaction) until the transaction is committed. This means that we need to store values for the same remote variable name per transaction alongside its original value, which means transactions define their own scope.
Desired semantics:
- Creating a new variable in a transaction should not behave differently than in a regular scope
  - transactional variable updates are only interesting if there are multiple fibers running transactions modifying the same variable. This means that even if there are "colliding" new variables in parallel transactions, their parent scopes will be different (because fibers also define scopes) so they can never collide.
- Within the transaction, Get and Set should work as usual, but the effect of Set should not be visible to other fibers, even if the changed variable is in a shared scope.
- When the transaction is committed, the changes are either applied to these shared variables, or the transaction gets reverted.
Flow state contains a transaction counter that can be used as a unique identifier for transaction scopes, similar to how fiber scopes are generated: "parentId_tx$transactionCounter".
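As with fibers, this can be sketched as a simple string derivation (illustrative helper only):

// Produces "parentId_tx$transactionCounter" as described above.
def transactionScopeId(parentId: String, transactionCounter: Int): String =
  s"${parentId}_tx$transactionCounter"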
Operation semantics in transaction scopes:
- New(name): creates a new remote variable in the parent scope
- Get(name): acts the same way as in forked workflows, but also records the accessed variable's version if necessary
- Set(name): always sets the value in the transaction scope ($parentid__$name)
Garbage collection
The garbage collector of the persistent executor runs periodically and performs the following steps:
First we get all the persisted remote variables from the key-value store. The encoding of scopes in the variable names is invertible so we can recover a set of scoped remote variable names by scanning the keys.
For each running flow we gather the known set of remote variables referenced by the remainder of that flow. For this we get each flow's state and collect the referenced remote variables from its current stack (variable usage is already tracked on the ZFlow and Remote level).
Because of how variable scoping works, the GC does not know in advance exactly which scoped remote variable a given remote variable name refers to - it is possible that a fiber refers to its parent fiber's remote variable, etc.
Transactions also generate scoped variables. To work around this, the garbage collector follows a pessimistic but safe logic: if a flow refers to variable A, it prevents the removal of A from that flow's scope and from all its parent flows' scopes. Variables with transactional scope are known from the list of all existing scoped remote variables, and we simply keep all of them belonging to the flow where the referenced name is coming from. So if flow X references variable A, and in the key-value store we have two variables with name A in scopes X/transaction1 and X/transaction2, we simply keep both.
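This pessimistic retention rule can be sketched as follows (a simplified model using plain strings instead of ScopedRemoteVariableName):

// Given a variable name referenced by a flow and the chain of scope
// identifiers from that flow up to the root, keep the corresponding key in
// every scope of the chain (transactional scopes of that flow are kept too).
def retainedKeys(name: String, scopeChain: List[String]): Set[String] =
  scopeChain.map(scopeId => s"${scopeId}__$name").toSet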