Resourceful Streams
Most of the constructors of ZStream have a special variant that lifts a scoped resource into a stream (e.g. ZStream.fromReaderScoped). Streams created with these constructors are resource-safe: the resource is acquired before the stream starts, and it is released once the stream is done with it.
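As a minimal sketch of such a scoped variant, the snippet below wraps an in-memory StringReader in a scope with ZIO.fromAutoCloseable and passes it to ZStream.fromReaderScoped. The object name ScopedReaderExample and the sample text are made up for illustration, and the signatures are assumed to match ZIO 2.x.

```scala
import zio._
import zio.stream._
import java.io.{IOException, Reader, StringReader}

// Hypothetical example object; not part of ZIO itself.
object ScopedReaderExample extends ZIOAppDefault {

  // A scoped reader: it is closed automatically when its scope closes.
  val reader: ZIO[Scope, IOException, Reader] =
    ZIO.fromAutoCloseable(ZIO.attemptBlockingIO(new StringReader("hello")))

  // The stream owns the scope, so the reader is closed when the stream ends.
  val chars: ZStream[Any, IOException, Char] =
    ZStream.fromReaderScoped(reader)

  def run =
    chars.runCollect.flatMap(cs => Console.printLine(cs.mkString))
}
```

The caller never closes the reader explicitly; the lifetime of the resource is tied to the lifetime of the stream.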
ZIO Stream also has acquireReleaseWith and finalizer constructors, which are similar to ZIO.acquireRelease. They allow us to run cleanup or finalization logic before the stream ends:
Acquire Release
We can provide acquire and release actions to ZStream.acquireReleaseWith to create a resourceful stream:
object ZStream {
  def acquireReleaseWith[R, E, A](
    acquire: ZIO[R, E, A]
  )(
    release: A => URIO[R, Any]
  ): ZStream[R, E, A] = ???
}
Let's see an example of using acquire and release when reading a file. By providing acquire and release actions to ZStream.acquireReleaseWith, we get a scoped stream of BufferedSource. Because this stream is scoped, we can convert the BufferedSource into a stream of its lines and then run it without worrying about resource leaks:
import zio._
import zio.Console._
import zio.stream._
import scala.io.Source

val lines: ZStream[Any, Throwable, String] =
  ZStream
    .acquireReleaseWith(
      // Acquire: open the file
      ZIO.attempt(Source.fromFile("file.txt")) <* printLine("The file was opened.")
    )(x => ZIO.succeed(x.close()) <* printLine("The file was closed.").orDie) // Release: close the file
    .flatMap { is =>
      ZStream.fromIterator(is.getLines())
    }
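To make the ordering of acquire, use, and release visible without touching the file system, here is a small sketch that records events in a Ref instead of printing. The names AcquireReleaseOrder, stream, and events, as well as the "resource" placeholder value, are hypothetical and exist only for this illustration.

```scala
import zio._
import zio.stream._

// Hypothetical example object; the "resource" is just a placeholder string.
object AcquireReleaseOrder extends ZIOAppDefault {

  def stream(log: Ref[Chunk[String]]): ZStream[Any, Nothing, Int] =
    ZStream
      .acquireReleaseWith(
        log.update(_ :+ "acquire").as("resource")  // acquire action
      )(_ => log.update(_ :+ "release"))           // release action
      .flatMap(_ => ZStream(1, 2, 3).tap(i => log.update(_ :+ s"emit $i")))

  // Runs the stream to completion and returns the recorded events.
  val events: UIO[Chunk[String]] =
    for {
      log <- Ref.make(Chunk.empty[String])
      _   <- stream(log).runDrain
      es  <- log.get
    } yield es

  // Prints: acquire, emit 1, emit 2, emit 3, release
  def run =
    events.flatMap(es => Console.printLine(es.mkString(", ")))
}
```

The release action is recorded only after every element of the inner stream has been emitted, which is the same guarantee the file example relies on.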
Finalization
We can also create a stream that never fails and attach a finalizer to it; the finalizer is executed before the stream ends:
object ZStream {
  def finalizer[R](
    finalizer: URIO[R, Any]
  ): ZStream[R, Nothing, Any] = ???
}
It is useful when we need to add a finalizer to an existing stream. Assume we need to clean up a temporary directory after our streaming application ends:
import zio._
import zio.Console._
import zio.stream._
import java.io.IOException
import java.nio.file.{Path, Paths}

def application: ZStream[Any, IOException, Unit] = ZStream.fromZIO(printLine("Application Logic."))
// Placeholder: prints a message instead of actually deleting the directory
def deleteDir(dir: Path): ZIO[Any, IOException, Unit] = printLine("Deleting file.")

val myApp: ZStream[Any, IOException, Any] =
  application ++ ZStream.finalizer(
    (deleteDir(Paths.get("tmp")) *>
      printLine("Temporary directory was deleted.")).orDie
  )
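The same pattern can be checked in isolation with a sketch that records events in a Ref rather than printing them. The names FinalizerOrder and events and the event labels are assumptions made for this illustration only.

```scala
import zio._
import zio.stream._

// Hypothetical example object showing when an appended finalizer runs.
object FinalizerOrder extends ZIOAppDefault {

  val events: UIO[Chunk[String]] =
    for {
      log <- Ref.make(Chunk.empty[String])
      // Stand-in for the application stream
      app  = ZStream.fromZIO(log.update(_ :+ "application logic"))
      // Append a finalizer stream, as in the temporary-directory example
      _   <- (app ++ ZStream.finalizer(log.update(_ :+ "cleanup"))).runDrain
      es  <- log.get
    } yield es

  // Prints: application logic, cleanup
  def run =
    events.flatMap(es => Console.printLine(es.mkString(", ")))
}
```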
Ensuring
We might want to run some code after the stream's finalization has executed. To do so, we can use the ZStream#ensuring operator:
ZStream
  .finalizer(printLine("Finalizing the stream").orDie)
  .ensuring(
    printLine("Doing some other work after the stream's finalization").orDie
  )
// Output:
// Finalizing the stream
// Doing some other work after the stream's finalization
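A related point that can help when reasoning about ensuring is that the attached effect also runs when the stream fails. The sketch below illustrates this under the assumption of ZIO 2.x semantics; EnsuringOnFailure, result, and the "boom" error are hypothetical names chosen for the example.

```scala
import zio._
import zio.stream._

// Hypothetical example object: `ensuring` on a stream that fails midway.
object EnsuringOnFailure extends ZIOAppDefault {

  val result: UIO[(Either[String, Unit], Chunk[String])] =
    for {
      log    <- Ref.make(Chunk.empty[String])
      // Emits 1 and 2, then fails with "boom"
      failing = (ZStream(1, 2) ++ ZStream.fail("boom"))
                  .tap(i => log.update(_ :+ s"emit $i"))
                  .ensuring(log.update(_ :+ "ensuring"))
      outcome <- failing.runDrain.either
      events  <- log.get
    } yield (outcome, events)

  def run =
    result.flatMap { case (outcome, events) =>
      Console.printLine(s"$outcome: ${events.mkString(", ")}")
    }
}
```

Here the run ends with Left("boom"), yet the recorded events still end with "ensuring", so the operator is suitable for cleanup that must happen regardless of how the stream terminates.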