Skip to main content
Version: 2.0.x

Backends

This page lists all the provided backend implementations for storing persistent data in the ZIO Flow executors. Backend implementations need to implement two interfaces:

  • KeyValueStore
  • IndexedStore

Custom backend implementations are possible by implementing these traits, and a shared test suite is published to validate the custom implementations (more information about this can be found in the testing section.

RocksDB

The RocksDb backend is implemented in the following module:

libraryDependencies += "dev.zio" %% "zio-flow-rocksdb" % "1.0.0-RC4"

RocksDb databases are stored in local files. You can use the same file for the key-value store and the indexed store, but you don't have to.

Cassandra

To use Cassandra as a ZIO Flow backend, add the following dependency:

libraryDependencies += "dev.zio" %% "zio-flow-cassandra" % "1.0.0-RC4"

Supported Versions

The Cassandra module supports Cassandra 3.x, Cassandra 4.x and ScyllaDB 4.x. Specifically, we test against Cassandra 3.11, Cassandra 4.1, and ScyllaDB 4.5. See CassandraKeyValueStoreSpec.scala in the test suite for more details.

Database Setup

The Cassandra module requires two tables (column family) for persistence.

To create these tables, run the following CQL statements:

for key-value store:

CREATE TABLE _zflow_key_value_store (
zflow_kv_namespace VARCHAR,
zflow_kv_key BLOB,
zflow_kv_timestamp BIGINT,
zflow_kv_value BLOB,
PRIMARY KEY (zflow_kv_namespace, zflow_kv_key, zflow_kv_timestamp)
);

for indexed store:

CREATE TABLE _zflow_idx_store (
zflow_idx_topic VARCHAR,
zflow_idx_index BIGINT,
zflow_idx_value BLOB,
PRIMARY KEY (zflow_idx_topic, zflow_idx_index)
);

You should add table options to this statement in a production environment for tuning. Please consult the official documentations of the database product of your choosing.

Performance/Scaling Considerations:

As you can see from the CQL above, the primary key is composed of the three columns, zflow_kv_namespace , zflow_kv_key and zflow_kv_timestamp. In particular, zflow_kv_namespace is the partition key and zflow_kv_key and zflow_kv_timestamp are the clustering keys. Assuming the default partitioner (Murmur3Partitioner) is used, data will be partitioned by the hash values of the zflow_kv_namespace column. If one small set of namespace values are the majority for all possible values, that will create data skew and can have a big impact to your cluster down the road. Some consideration is needed when deciding the type of values zflow_kv_namespace should store.

DynamoDB

To use AWS DynamoDb as a key-value store or indexed store implementation add the following dependency:

libraryDependencies += "dev.zio" %% "zio-flow-dynamodb" % "1.0.0-RC4"

Metrics

The DynamoDb backend does not publish any metrics by default, but you can use zio-aws's built-in metrics aspect to enable AWS operation level metrics:

DynamoDb.live @@ zio.aws.core.aspects.callDuration(
prefix = "zioflow",
boundaries = Histogram.Boundaries.exponential(0.01, 2, 14)
)

Database Setup

The DynamoDB module requires two tables for persistence (one for key-value store, one for indexed store). Here's a Python script to create the table:

import boto3

dynamodb = boto3.resource('dynamodb')

dynamodb.create_table (
TableName = '_zflow_key_value_store',
AttributeDefinitions = [
{
'AttributeName': 'zflow_kv_key',
'AttributeType': 'B'
},
{
'AttributeName': 'zflow_kv_timestamp',
'AttributeType': 'N'
}
],
KeySchema = [
{
'AttributeName': 'zflow_kv_key',
'KeyType': 'HASH'
},
{
'AttributeName': 'zflow_kv_timestamp',
'KeyType': 'RANGE'
}
],
GlobalSecondaryIndexes = [
{
'IndexName': 'namespace_index',
'KeySchema': [
{
'AttributeName': 'zflow_kv_namespace',
'KeyType': 'HASH'
}
],
'Projection': {
'NonKeyAttributes': ['zflow_kv_value'],
'ProjectionType': 'INCLUDE'
},
'ProvisionedThroughput' = {
'ReadCapacityUnits': 1,
'WriteCapacityUnits': 1
}
}
],
ProvisionedThroughput = {
'ReadCapacityUnits': 1,
'WriteCapacityUnits': 1
}
)

dynamodb.create_table (
TableName = '_zflow_indexed_store',
AttributeDefinitions = [
{
'AttributeName': 'zflow_idx_topic',
'AttributeType': 'S'
},
{
'AttributeName': 'zflow_idx_index',
'AttributeType': 'N'
}
],
KeySchema = [
{
'AttributeName': 'zflow_idx_topic',
'KeyType': 'HASH'
},
{
'AttributeName': 'zflow_idx_index',
'KeyType': 'RANGE'
}
],
ProvisionedThroughput = {
'ReadCapacityUnits': 1,
'WriteCapacityUnits': 1
}
)

Of course, you can use your favourite AWS tool to create the table (e.g. DynamoDB Console) or to automate the creation ( e.g. via CloudFormation). You should customize the ProvisionedThroughput settings when the table is provisioned.

Performance/Scaling Considerations:

As you can see from the script above, the primary key is composed of the two columns, zflow_kv_key and zflow_kv_timestamp. In particular, zflow_kv_key is the partition key and zflow_kv_timestamp is the sort key. Internally, zio-flow will store the namespace as part of the zflow_kv_key value as well, but it will also store it in the zflow_kv_namespace attribute for easy access to all values within a namespace. This requires a secondary index to be set up on the zflow_kv_namespace attribute.

In-memory

ZIO Flow also provides a default in-memory implementation for both the key-value store and the indexed store. These are useful for running ZIO Flow programs in tests, but they are not safe to use in production.

The following layers create the in-memory implementations of the stores:

import zio.flow.runtime._

KeyValueStore.inMemory
// res0: zio.ZLayer[Any, Nothing, KeyValueStore] = Suspend(
// self = zio.ZLayer$ScopedEnvironmentPartiallyApplied$$$Lambda$13761/0x000000080335e2c8@141d6bd8
// )
IndexedStore.inMemory
// res1: zio.ZLayer[Any, Nothing, IndexedStore] = Suspend(
// self = zio.ZLayer$$$Lambda$13752/0x0000000803352838@3db1a527
// )