Skip to main content
Version: 2.x

Couchbase Connector

Setup​

libraryDependencies += "dev.zio" %% "zio-connect-couchbase" % "0.4.4"

How to use it?​

All available CouchbaseConnector combinators and operations are available in the package object zio.connect.couchbase, you only need to import zio.connect.couchbase._ to get started.

The couchbase connector presumes you already have a couchbase cluster to connect to, and uses the official java client under the hood. You can provide a cluster connection in the usual way and wrap it in a ZLayer, typically something like this:

import com.couchbase.client.java.Cluster
import zio._

val cluster = ZLayer.scoped(
ZIO.acquireRelease(
ZIO.attempt(
Cluster
.connect("127.0.0.1", "admin", "admin22")
)
)(c => ZIO.attempt(c.disconnect()).orDie)
)

The connector provides a number of operations that can be used to interact with the cluster, most of the operations require a QueryObject or ContentQueryObject which are case classes. The couchbase primitives that are used: BucketName, CollectionName, ScopeName and DocumentId are all defined as zio-prelude newtypes of String

import zio.connect.couchbase.CouchbaseConnector._

val bucket = BucketName("gamesim-sample")
val collection = CollectionName("_default")
val scope = ScopeName("_default")
val newKey = DocumentKey("zio-connect-doc")

val queryObject = QueryObject(bucket, scope, collection, newKey)

Now let's do something, and by do, we mean let's describe an action, like checking to see that a document exists by key/id:

val checkExists: ZIO[CouchbaseConnector, CouchbaseException, Boolean] = ZStream(queryObject) >>> exists

Some important points to note here, the exists operation is a ZSink that expects elements of type QueryObject as its streamed input. You can access the underlying Throwable from the CouchbaseException by using the reason property

Here's what inserting a document looks like:

val key                = DocumentKey(UUID.randomUUID().toString)
val content = Chunk[Byte](1, 2, 3)
val contentQueryObject = ContentQueryObject(bucketName, scopeName, collectionName, key, content)

val insertAction: ZIO[CouchbaseConnector, CouchbaseException, Unit] = ZStream(contentQueryObject) >>> insert

insert is a ZSink that expects elements of type ContentQueryObject, which is a query object with an additional content property, as its streamed input and returns Unit as its output.

To get a document by key/id:

val getAction: ZIO[CouchbaseConnector, CouchbaseException, Chunk[Byte]] = get(queryObject).runCollect

get is a ZStream that takes a query object as an argument and returns a ZStream of Chunk[Byte] as its output. There are other ways to process the chunk of bytes which get returned, but of course this depends on your domain.

In order to run a program involving the couchbase connector, you need to provide the CouchbaseConnector layer, and the cluster connection layer we created earlier:

def run = getAction.provide(couchbaseConnectorLiveLayer, cluster)

couchbaseConnectorLiveLayer is a ZLayer that provides the LiveCouchbaseConnector service, and is defined in the CouchbaseConnector companion object.

Operators​

The following operators are available:

exists​

Checks if a document exists by key/id, if the bucket, collection or scope do not exist you will get an AuthenticationFailureException complaining about privileges.

val existsAction: ZIO[CouchbaseConnector, CouchbaseException, Boolean] = ZStream(queryObject) >>> exists

get​

Gets a document by key/id, if the bucket, collection or scope do not exist you will get an AuthenticationFailureException. If the document does not exist you will get a DocumentNotFoundException.

val getAction: ZIO[CouchbaseConnector, CouchbaseException, Chunk[Byte]] = get(queryObject).runCollect

insert​

Inserts a document by key/id, if the bucket, collection or scope do not exist you will get an AuthenticationFailureException. If the document already exists you will get a DocumentExistsException.

val insertAction: ZIO[CouchbaseConnector, CouchbaseException, Unit] = ZStream(contentQueryObject) >>> insert

remove​

Removes a document by key/id, if the bucket, collection or scope do not exist you will get an AuthenticationFailureException. If the document does not exist you will get a DocumentNotFoundException.

val removeAction: ZIO[CouchbaseConnector, CouchbaseException, Unit] = ZStream(queryObject) >>> remove

replace​

Replaces a document by key/id, if the bucket, collection or scope do not exist you will get an AuthenticationFailureException. If the document does not exist you will get a DocumentNotFoundException.

val replaceAction: ZIO[CouchbaseConnector, CouchbaseException, Unit] = ZStream(contentQueryObject) >>> replace

upsert​

Updates or inserts a document by key/id, if the bucket, collection or scope do not exist you will get an AuthenticationFailureException.

val upsertAction: ZIO[CouchbaseConnector, CouchbaseException, Unit] = ZStream(contentQueryObject) >>> upsert