DynamoDB Connector
Setup​
libraryDependencies += "dev.zio" %% "zio-connect-dynamodb" % "0.4.4"
How to use it?
All available DynamoDBConnector combinators and operations are available in the package object zio.connect.dynamodb._
you will need to import that to get started.
Additionally, you must also configure and provide the underlying DynamoDB
layer provided by zio-aws
you can read more about how to configure it here
If you have default credentials in the system environment typically at ~/.aws/credentials
or as env variables
the following configuration will likely work.
import zio._
import zio.aws.netty.NettyHttpClient
import zio.aws.core.config.AwsConfig
import zio.aws.core.httpclient.HttpClient
import zio.connect.dynamodb._
lazy val httpClient: ZLayer[Any, Throwable, HttpClient] = NettyHttpClient.default
lazy val awsConfig: ZLayer[Any, Throwable, AwsConfig] = httpClient >>> AwsConfig.default
Almost everything in this api requires the existence of a table, we utilize the models provided by zio-aws
to create tables, requests, responses, and all other DynamoDB related types. These are typically modeled as
new-types
from zio-prelude or case classes.
Here's a create table request:
import zio.aws.dynamodb.model._
import zio.aws.dynamodb.model.primitives._
def createTableRequest(tableName: TableName): CreateTableRequest =
CreateTableRequest(
tableName = tableName,
attributeDefinitions = List(
AttributeDefinition(
KeySchemaAttributeName("id"),
ScalarAttributeType.S
)
),
keySchema = List(
KeySchemaElement(KeySchemaAttributeName("id"), KeyType.HASH)
),
provisionedThroughput = Some(
ProvisionedThroughput(
readCapacityUnits = PositiveLongObject(16L),
writeCapacityUnits = PositiveLongObject(16L)
)
),
tableClass = TableClass.STANDARD
)
DynamoDB is "schemaless" in the sense that put data of different shapes in different rows, but you must define
a schema for the keys that the table depends on, these are called "partition keys" and "sort keys" or "hash" and "range" keys.
Only a partition/hash key is required, and you should only define attributes for the fields which constitute your
keySchema
. A valid table definition must also declare a provisionedThroughput
, but there are many other
options available for creating tables, you can read more about tables here.
Once you have a table definition you can create a table using the createTable
combinator:
val tableName = TableName("my-table")
val createTableAction: ZIO[DynamoDBConnector, AwsError, CreateTableResponse] = createTable(createTableRequest(tableName)) >>> createTable
Dynamo tables can take a moment to be created, so you'll want to have some kind of retry mechanism when performing subsequent operations on the table. To illustrate, let's put an item into the table:
val putItemAction: ZIO[DynamoDBConnector, AwsError, PutItemResponse] =
ZStream(PutItemRequest(tableName, Map("id" -> AttributeValue(s = StringAttributeValue("my-id"))))) >>> putItem
val putItemWithRetry: ZIO[DynamoDBConnector, AwsError, PutItemResponse] =
putItemAction.retryWhile {
case GenericAwsError(_: ResourceNotFoundException) => true
case _ => false
}
Let's say we want to get an item from the table, we can use the getItem
combinator, note we need to
provide the full key here to use it:
val key = Map("id" -> AttributeValue(s = StringAttributeValue("my-id")))
val getItemAction: ZIO[DynamoDBConnector, AwsError, GetItemResponse] =
getItem(GetItemRequest(tableName, key)) >>> getItem
Currently, entries from the table are returned as a Map[AttributeName, AttributeValue]
this may change in the future.
And we can also delete an item from the table:
val deleteItemAction: ZIO[DynamoDBConnector, AwsError, DeleteItemResponse] =
deleteItem(DeleteItemRequest(tableName, key)) >>> deleteItem
In order to run a program involving the DynamoDBConnector you need to provide a live DynamoDB
from zio-aws
along with config
and the live connector layer:
override def run: ZIO[ZIOAppArgs with Scope, Any, Any] =
program.provide(awsConfig, DynamoDb.live, dynamoDBConnectorLiveLayer)
dynamoDBConnectorLiveLayer
is a ZLayer
that provides the LiveDynamoDBConnector
Operators​
The following operations are available:
batchGetItem
​
Accepts a stream of BatchGetItemRequest
and returns a Chunk
of BatchGetItemResponse
, if
one of the tables in the request is not available, the entire request will fail.
The response object will contain and unprocessedKeys
field which can be used to retry the request.
val item1 = Map(AttributeName("id") -> AttributeValue(s = StringAttributeValue("key1")))
val item2 = Map(AttributeName("id") -> AttributeValue(s = StringAttributeValue("key2")))
val keysAndAttributes = KeysAndAttributes(List(item1))
val keysAndAttributes2 = KeysAndAttributes(List(item2))
val batchGetItemRequest =
BatchGetItemRequest(Map(tableName -> keysAndAttributes, tableName2 -> keysAndAttributes2))
val batchGetItemAction: ZIO[DynamoDBConnector, AwsError, Chunk[BatchGetItemResponse]] =
batchGetItem(ZStream(batchGetItemRequest)) >>> batchGetItem
batchWriteItem
​
Accepts a stream of BatchWriteItemRequest
and returns a Chunk
of BatchWriteItemResponse
, if
one of the tables in the request is not available, the entire request will fail. This is used to
simultaneously put and delete items on multiple tables, you cannot write and delete an item
with the same key in the same table in a single request. The response object does
have an unprocessedKeys
field which can be used to retry the remaining requests.
val tableName = TableName("batchWriteItem1")
val item1 = Map(AttributeName("id") -> AttributeValue(s = StringAttributeValue("key1")))
val item2 = Map(AttributeName("id") -> AttributeValue(s = StringAttributeValue("key2")))
val writeRequests = List(
WriteRequest(putRequest = PutRequest(item1)),
WriteRequest(putRequest = PutRequest(item2))
)
val batchWriteItemRequest = BatchWriteItemRequest(
Map(tableName -> writeRequests)
)
val batchWriteItemAction: ZIO[DynamoDBConnector, AwsError, Chunk[BatchWriteItemResponse]] =
batchWriteItem(ZStream(batchWriteItemRequest)) >>> batchWriteItem
createTable
​
Accepts a stream of CreateTableRequest
and returns Unit
. Will fail with a ResourceInUseException
if the table already exists.
def createTableRequest(tableName: TableName): CreateTableRequest =
CreateTableRequest(
tableName = tableName,
attributeDefinitions = List(
AttributeDefinition(
KeySchemaAttributeName("id"),
ScalarAttributeType.S
)
),
keySchema = List(
KeySchemaElement(KeySchemaAttributeName("id"), KeyType.HASH)
),
provisionedThroughput = Some(
ProvisionedThroughput(
readCapacityUnits = PositiveLongObject(16L),
writeCapacityUnits = PositiveLongObject(16L)
)
),
tableClass = TableClass.STANDARD
)
val createTableAction: ZIO[DynamoDBConnector, AwsError, Unit] =
ZStream(createTableRequest(tableName)) >>> createTable
deleteItem
​
Accepts a stream of DeleteItemRequest
and returns Unit
. Will fail with a
ResourceNotFoundException
if the table does not exist, does
not fail if the item does not exist.
val item1 = Map(AttributeName("id") -> AttributeValue(s = StringAttributeValue("key1")))
val deleteItemAction: ZIO[DynamoDBConnector, AwsError, Unit] = ZStream(DeleteItemRequest(tableName, item1)) >>> deleteItem
deleteTable
​
Accepts a stream of DeleteTableRequest
and returns Unit
. Will fail
with ResourceNotFoundException
if the table does not exist.
val deleteTableAction: ZIO[DynamoDBConnector, AwsError, Unit] = ZStream(DeleteTableRequest(tableName)) >>> deleteTable
describeTable
​
Accepts a stream of DescribeTableRequest
and returns a Chunk
of DescribeTableResponse
. Will fail
if the table does not exist.
val describeTableAction: ZIO[DynamoDBConnector, AwsError, Chunk[DescribeTableResponse]] = ZStream(DescribeTableRequest(tableName)) >>> describeTable
listTables
​
Takes a ListTableRequest
and return a Stream of TableName
, can also provide a limit to the number of tables returned.
val listTablesAction: ZIO[DynamoDBConnector, AwsError, TableName] = listTables(ListTablesRequest()).runCollect
getItem
​
Accepts a stream of GetItemRequest
and returns a Chunk
of GetItemResponse
.
Will fail with a ResourceNotFoundException
if the table does not exist.
val item1 = Map(AttributeName("id") -> AttributeValue(s = StringAttributeValue("key1")))
val getItemAction: ZIO[DynamoDBConnector, AwsError, Chunk[GetItemResponse]] = ZStream(GetItemRequest(tableName, item1)) >>> getItem
putItem
​
Accepts a stream of PutItemRequest
and returns Unit
. Will fail with a ResourceNotFoundException
if the table does not exist.
val item1 = Map(AttributeName("id") -> AttributeValue(s = StringAttributeValue("key1")))
val putItemAction: ZIO[DynamoDBConnector, AwsError, Unit] = ZStream(PutItemRequest(tableName, item1)) >>> putItem
query
​
Accepts a stream of QueryRequest
and returns a Chunk
of Map[AttributeName, AttributeValue]
.
Will fail with a ResourceNotFoundException
if the table does not exist.
val tableName = TableName("query1")
val keyExpression = KeyExpression("id = :id")
val expressionAttributeValues = Map(
ExpressionAttributeValueVariable(":id") -> AttributeValue(s = StringAttributeValue("key1"))
)
val queryRequest = QueryRequest(
tableName,
keyConditionExpression = keyExpression,
expressionAttributeValues = expressionAttributeValues
)
val queryAction: ZIO[DynamoDBConnector, AwsError, Chunk[Map[AttributeName, AttributeValue]]] =
ZStream(queryRequest) >>> query
scan
​
Similar to query, but if you don't know the key, you can use scan to return some items in a table or filter
by some non-key condition. Accepts a stream of ScanRequest
returns a Chunk
of Map[AttributeName, AttributeValue]
.
Fails if the table does not exist.
val tableName = TableName("scan1")
val scanRequest = ScanRequest(tableName)
val scanAction: ZIO[DynamoDBConnector, AwsError, Chunk[Map[AttributeName, AttributeValue]]] =
ZStream(scanRequest) >>> scan
tableExists
​
Given a TableName
, returns a Boolean
indicating if the table exists.
val tableExistsAction: ZIO[DynamoDBConnector, AwsError, Boolean] = ZStream(tableName) >>> tableExists
updateItem
​
Accepts a stream of UpdateItemRequest
and returns Unit
. Will fail with a ResourceNotFoundException
if the table does not exist.
val tableName = TableName("updateItem1")
val item1 = Map(AttributeName("id") -> AttributeValue(s = StringAttributeValue("key1")))
val updateItemRequest = UpdateItemRequest(
tableName,
item1,
Map(
AttributeName("authorized") -> AttributeValueUpdate(
AttributeValue(bool = BooleanAttributeValue(true)),
AttributeAction.PUT
)
)
)
val updateItemAction: ZIO[DynamoDBConnector, AwsError, Unit] =
ZStream(updateItemRequest) >>> updateItem
updateTable
​
Accepts a stream of UpdateTableRequest
and returns Unit
. Will fail with a ResourceNotFoundException
if the table does not exist.
val tableName = TableName("updateTable1")
val updateTableRequest = UpdateTableRequest(
tableName,
provisionedThroughput = Some(
ProvisionedThroughput(
readCapacityUnits = PositiveLongObject(16L),
writeCapacityUnits = PositiveLongObject(16L)
)
)
)
val updateTableAction: ZIO[DynamoDBConnector, AwsError, Unit] =
ZStream(updateTableRequest) >>> updateTable