import zio._
import zio.http.ChannelEvent.{ChannelRead, ExceptionCaught, UserEvent, UserEventTriggered}
import zio.http._
import zio.http.model.Method
import zio.http.socket._
/**
 * Example ZIO HTTP application that exposes one plain HTTP route and one
 * WebSocket endpoint. The WebSocket behaviour is assembled from several small
 * `Http` values that each react to a subset of channel events.
 */
object WebSocketAdvanced extends ZIOAppDefault {

  /**
   * Selects text frames out of the stream of channel events and yields the
   * originating channel together with the text payload. Events of any other
   * shape do not match this partial function.
   */
  val messageFilter: Http[Any, Nothing, WebSocketChannelEvent, (Channel[WebSocketFrame], String)] =
    Http.collect[WebSocketChannelEvent] {
      case ChannelEvent(ch, ChannelRead(WebSocketFrame.Text(text))) => (ch, text)
    }

  /**
   * Reacts to the text messages selected by [[messageFilter]]:
   *  - `"end"` closes the channel;
   *  - `"foo"` is answered with `"bar"` and `"bar"` with `"foo"`;
   *  - any other text is written back repeatedly and flushed once afterwards.
   */
  val messageSocket: Http[Any, Throwable, WebSocketChannelEvent, Unit] =
    messageFilter >>> Handler.fromFunctionZIO[(WebSocketChannel, String)] { case (channel, payload) =>
      payload match {
        case "end" => channel.close()
        case "foo" => channel.writeAndFlush(WebSocketFrame.text("bar"))
        case "bar" => channel.writeAndFlush(WebSocketFrame.text("foo"))
        case _     =>
          // Writes are issued without flushing; a single flush follows the last write.
          // NOTE(review): `repeatN(10)` repeats in addition to the first run, so the
          // frame is written 11 times in total — confirm that this count is intended.
          val echo = channel.write(WebSocketFrame.text(payload))
          echo.repeatN(10) *> channel.flush
      }
    }

  /**
   * Reacts to channel-level events: logs channel errors, logs received close
   * frames, and sends a greeting once the handshake has completed.
   */
  val channelSocket: Http[Any, Throwable, WebSocketChannelEvent, Unit] =
    Http.collectZIO[WebSocketChannelEvent] {
      case ChannelEvent(_, ExceptionCaught(error)) =>
        Console.printLine(s"Channel error!: ${error.getMessage}")

      case ChannelEvent(_, ChannelRead(WebSocketFrame.Close(status, reason))) =>
        Console.printLine(s"Closing channel with status: $status and reason: $reason")

      case ChannelEvent(channel, UserEventTriggered(UserEvent.HandshakeComplete)) =>
        channel.writeAndFlush(WebSocketFrame.text("Greetings!"))
    }

  /** Message handling first, with channel-event handling as the alternative. */
  val httpSocket: Http[Any, Throwable, WebSocketChannelEvent, Unit] =
    messageSocket ++ channelSocket

  /** Default socket protocol settings with the sub-protocol set to `Some("json")`. */
  val protocol = SocketProtocol.default.withSubProtocol(Some("json"))

  /** Default socket decoder settings with extensions allowed. */
  val decoder = SocketDecoder.default.withExtensions(allowed = true)

  /** The socket application built from [[httpSocket]], using [[decoder]] and [[protocol]]. */
  val socketApp: SocketApp[Any] =
    httpSocket.toSocketApp.withDecoder(decoder).withProtocol(protocol)

  /**
   * HTTP routes:
   *  - `GET /subscriptions` responds with the response produced by [[socketApp]];
   *  - `GET /greet/{name}` responds with a plain-text greeting for `name`.
   */
  val app: Http[Any, Nothing, Request, Response] =
    Http.collectZIO[Request] {
      case Method.GET -> !! / "subscriptions" => socketApp.toResponse
      case Method.GET -> !! / "greet" / name  => ZIO.succeed(Response.text(s"Greetings $name!"))
    }

  /** Serves [[app]] using the `Server.default` layer. */
  override val run = Server.serve(app).provide(Server.default)
}