ZIO Kinesis
ZIO Kinesis is a ZIO-based AWS Kinesis client for Scala.
Introduction
ZIO Kinesis is an interface to Amazon Kinesis Data Streams for consuming and producing data. This library is built on top of ZIO AWS, a library of automatically generated ZIO wrappers around AWS SDK methods.
Installation
In order to use this library, we need to add the following line in our build.sbt
file:
libraryDependencies += "nl.vroste" %% "zio-kinesis" % "0.20.0"
Example
This is an example of consuming a stream from Amazon Kinesis:
import nl.vroste.zio.kinesis.client.serde.Serde
import nl.vroste.zio.kinesis.client.zionative.Consumer
import zio.clock.Clock
import zio.console.{Console, putStrLn}
import zio.duration._
import zio.logging.Logging
import zio.{ExitCode, URIO, _}
object ZIOKinesisConsumerExample extends zio.App {
val loggingLayer: ZLayer[Any, Nothing, Logging] =
(Console.live ++ Clock.live) >>>
Logging.console() >>>
Logging.withRootLoggerName(getClass.getName)
override def run(args: List[String]): URIO[zio.ZEnv, ExitCode] =
Consumer
.consumeWith(
streamName = "my-stream",
applicationName = "my-application",
deserializer = Serde.asciiString,
workerIdentifier = "worker1",
checkpointBatchSize = 1000L,
checkpointDuration = 5.minutes
)(record => putStrLn(s"Processing record $record"))
.provideCustom(Consumer.defaultEnvironment ++ loggingLayer)
.exitCode
}