Skip to main content
Version: 2.0.x

ZIO Kinesis

ZIO Kinesis is a ZIO-based AWS Kinesis client for Scala.

Introduction

ZIO Kinesis is an interface to Amazon Kinesis Data Streams for consuming and producing data. This library is built on top of ZIO AWS, a library of automatically generated ZIO wrappers around AWS SDK methods.

Installation

In order to use this library, we need to add the following line in our build.sbt file:

libraryDependencies += "nl.vroste" %% "zio-kinesis" % "0.20.0"

Example

This is an example of consuming a stream from Amazon Kinesis:

import nl.vroste.zio.kinesis.client.serde.Serde
import nl.vroste.zio.kinesis.client.zionative.Consumer
import zio.clock.Clock
import zio.console.{Console, putStrLn}
import zio.duration._
import zio.logging.Logging
import zio.{ExitCode, URIO, _}

object ZIOKinesisConsumerExample extends zio.App {
val loggingLayer: ZLayer[Any, Nothing, Logging] =
(Console.live ++ Clock.live) >>>
Logging.console() >>>
Logging.withRootLoggerName(getClass.getName)

override def run(args: List[String]): URIO[zio.ZEnv, ExitCode] =
Consumer
.consumeWith(
streamName = "my-stream",
applicationName = "my-application",
deserializer = Serde.asciiString,
workerIdentifier = "worker1",
checkpointBatchSize = 1000L,
checkpointDuration = 5.minutes
)(record => putStrLn(s"Processing record $record"))
.provideCustom(Consumer.defaultEnvironment ++ loggingLayer)
.exitCode
}