ZIO Kinesis
ZIO Kinesis is a ZIO-based AWS Kinesis client for Scala.
Introduction
ZIO Kinesis is an interface to Amazon Kinesis Data Streams for consuming and producing data. This library is built on top of ZIO AWS, a library of automatically generated ZIO wrappers around AWS SDK methods.
Installation
To use this library, we need to add the following line to our build.sbt file:
libraryDependencies += "nl.vroste" %% "zio-kinesis" % "0.20.0"
Example
This is an example of consuming a stream from Amazon Kinesis:
import nl.vroste.zio.kinesis.client.serde.Serde
import nl.vroste.zio.kinesis.client.zionative.Consumer
import zio.clock.Clock
import zio.console.{Console, putStrLn}
import zio.duration._
import zio.logging.Logging
import zio.{ExitCode, URIO, _}

object ZIOKinesisConsumerExample extends zio.App {

  // Console logger; the consumer needs a Logging service in its environment
  val loggingLayer: ZLayer[Any, Nothing, Logging] =
    (Console.live ++ Clock.live) >>>
      Logging.console() >>>
      Logging.withRootLoggerName(getClass.getName)

  override def run(args: List[String]): URIO[zio.ZEnv, ExitCode] =
    Consumer
      .consumeWith(
        streamName = "my-stream",
        applicationName = "my-application",
        deserializer = Serde.asciiString,
        workerIdentifier = "worker1",
        checkpointBatchSize = 1000L,
        checkpointDuration = 5.minutes
      )(record => putStrLn(s"Processing record $record")) // runs for each consumed record
      .provideCustomLayer(Consumer.defaultEnvironment ++ loggingLayer)
      .exitCode
}
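The checkpointBatchSize and checkpointDuration arguments in the example control how often the consumer records its progress. The following is a minimal standalone sketch of that rule, assuming a checkpoint is due once either limit is reached, whichever comes first. CheckpointPolicy and shouldCheckpoint are hypothetical names used only for illustration; they are not part of ZIO Kinesis, and the sketch uses only the Scala standard library.

```scala
import scala.concurrent.duration._

// Hypothetical model for illustration only; not a ZIO Kinesis type.
// Assumption: a checkpoint is due when either limit has been reached.
final case class CheckpointPolicy(batchSize: Long, maxInterval: FiniteDuration) {

  // recordsSinceLast: records processed since the previous checkpoint
  // elapsedSinceLast: time passed since the previous checkpoint
  def shouldCheckpoint(recordsSinceLast: Long, elapsedSinceLast: FiniteDuration): Boolean =
    recordsSinceLast >= batchSize || elapsedSinceLast >= maxInterval
}

object CheckpointPolicyDemo extends App {
  // Same values as in the consumer example above
  val policy = CheckpointPolicy(batchSize = 1000L, maxInterval = 5.minutes)

  println(policy.shouldCheckpoint(1000L, 10.seconds)) // batch limit reached
  println(policy.shouldCheckpoint(12L, 5.minutes))    // time limit reached
  println(policy.shouldCheckpoint(12L, 10.seconds))   // neither limit reached
}
```

Under this reading, a smaller batch size or a shorter duration means more frequent checkpoints, so fewer records would be reprocessed after a restart, at the cost of more writes to the checkpoint store.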