Skip to main content
Version: 2.x

Channel Interruption

We can interrupt a channel using the ZChannel.interruptWhen operator. It takes a ZIO effect that will be evaluated, if it finishes before the channel is closed, it will interrupt the channel, and the terminal value of the returned channel will be the success value of the effect:

import zio._
import zio.stream._

def randomNumbers: ZChannel[Any, Any, Any, Any, Nothing, Int, Nothing] =
ZChannel
.fromZIO(Random.nextIntBounded(100))
.flatMap(ZChannel.write) *>
ZChannel.fromZIO(ZIO.sleep(1.second)) *> randomNumbers

randomNumbers.interruptWhen(ZIO.sleep(3.seconds).as("Done!")).runCollect.debug
// One output: (Chunk(84,57,70),Done!)

Another version of interruptWhen takes a Promise as an argument. It will interrupt the channel when the promise is fulfilled:

import zio.stream._

for {
p <- Promise.make[Nothing, Unit]
f <- randomNumbers
.interruptWhen(p)
.mapOutZIO(e => Console.printLine(e))
.runDrain
.fork
_ <- p.succeed(()).delay(5.seconds)
_ <- f.join
} yield ()

// Output:
// 74
// 60
// 52
// 52
// 79