Apache Avro Codecs
Introduction
Apache Avro is a popular data serialization format used in distributed systems, particularly in the Apache Hadoop ecosystem. In this article, we will explore how to work with Apache Avro codecs in Scala using the ZIO Schema. Avro codecs allow us to easily serialize and deserialize data in Avro's binary and JSON formats.
Installation
To use the Avro codecs, we need to add the following dependency to our build.sbt
file:
libraryDependencies += "dev.zio" %% "zio-schema-avro" % "1.5.0"
Codecs
It has two codecs:
- An AvroSchemaCodec to serialize a
Schema[A]
to Avro JSON schema and deserialize an Avro JSON schema to aSchema.GenericRecord
. - An AvroCodec to serialize/deserialize the Avro binary serialization format.
AvroSchemaCodec
The AvroSchemaCodec
provides methods to encode a Schema[_]
to Avro JSON schema and decode an Avro JSON schema to a Schema[_]
(Schema.GenericRecord
):
trait AvroSchemaCodec {
def encode(schema: Schema[_]): scala.util.Either[String, String]
def decode(bytes: Chunk[Byte]): scala.util.Either[String, Schema[_]]
}
The encode
method takes a Schema[_]
and returns an Either[String, String]
where the Right
side contains the Avro schema in JSON format.
The decode
method takes a Chunk[Byte]
which contains the Avro JSON Schema in binary format and returns an Either[String, Schema[_]]
where the Right
side contains the ZIO Schema in GenericRecord
format.
Here is an example of how to use it:
import zio._
import zio.schema.Schema
import zio.schema.DeriveSchema
import zio.schema.codec.AvroSchemaCodec
case class Person(name: String, age: Int)
object Person {
implicit val schema: Schema[Person] = DeriveSchema.gen
}
object Main extends ZIOAppDefault {
def run =
for {
_ <- ZIO.debug("AvroSchemaCodec Example:")
avroSchema <- ZIO.fromEither(AvroSchemaCodec.encode(Person.schema))
_ <- ZIO.debug(s"The person schema in Avro Schema JSON format: $avroSchema")
avroSchemaBinary = Chunk.fromArray(avroSchema.getBytes)
zioSchema <- ZIO.fromEither(AvroSchemaCodec.decode(avroSchemaBinary))
_ <- ZIO.debug(s"The person schema in ZIO Schema GenericRecord format: $zioSchema")
} yield ()
}
The output:
AvroSchemaCodec Example:
The person schema in Avro Schema JSON format: {"type":"record","name":"Person","fields":[{"name":"name","type":"string"},{"name":"age","type":"int"}]}
The person schema in ZIO Schema GenericRecord format: GenericRecord(Nominal(Chunk(),Chunk(),Person),Field(name,Primitive(string,Chunk())) :*: Field(age,Primitive(int,Chunk())) :*: Empty,Chunk(name(Person)))
As we can see, we converted the Schema[Person]
to Avro schema JSON format, and then we converted it back to the ZIO Schema GenericRecord
format.
AvroCodec
We can create a BinaryCodec[A]
for any type A
that has a Schema[A]
instance using AvroCodec.schemaBasedBinaryCodec
:
object AvroCodec {
implicit def schemaBasedBinaryCodec[A](implicit schema: Schema[A]): BinaryCodec[A] = ???
}
Now, let's write an example and see how it works:
import zio._
import zio.schema.Schema
import zio.schema.DeriveSchema
import zio.schema.codec.{AvroCodec, BinaryCodec}
case class Person(name: String, age: Int)
object Person {
implicit val schema: Schema[Person] = DeriveSchema.gen
implicit val binaryCodec: BinaryCodec[Person] =
AvroCodec.schemaBasedBinaryCodec[Person]
}
object Main extends ZIOAppDefault {
def run =
for {
_ <- ZIO.debug("AvroCodec Example:")
encodedPerson = Person.binaryCodec.encode(Person("John", 42))
_ <- ZIO.debug(s"encoded person object: ${toHex(encodedPerson)}")
decodedPerson <- ZIO.fromEither(
Person.binaryCodec.decode(encodedPerson)
)
_ <- ZIO.debug(s"decoded person object: $decodedPerson")
} yield ()
def toHex(bytes: Chunk[Byte]): String =
bytes.map("%02x".format(_)).mkString(" ")
}
The output:
AvroCodec Example:
encoded person object: 08 4a 6f 68 6e 54
decoded person object: Person(John,42)
Annotations
The Apache Avro specification supports some attributes for describing the data which are not part of the default ZIO Schema. To support these extra metadata, we can use annotations defined in the zio.schema.codec.AvroAnnotations
object.
There tons of annotations that we can use. Let's introduce some of them:
@AvroAnnotations.name(name: String)
: To change the name of a field or a record.@AvroAnnotations.namespace(namespace: String)
: To add the namespace for a field or a record.@AvroAnnotations.doc(doc: String)
: To add documentation to a field or a record.@AvroAnnotations.aliases(aliases: Set[String])
: To add aliases to a field or a record.@AvroAnnotations.avroEnum
: To treat a sealed trait as an Avro enum.@AvroAnnotations.scale(scale: Int = 24)
and@AvroAnnotations.precision(precision: Int = 48)
: To describe the scale and precision of a decimal field.@AvroAnnotations.decimal(decimalType: DecimalType)
: Used to annotate aBigInteger
orBigDecimal
type to indicate the logical type encoding (avro bytes or avro fixed).@AvroAnnotations.bytes(bytesType: BytesType)
: Used to annotate a Byte type to indicate the avro type encoding (avro bytes or avro fixed).@AvroAnnotations.formatToString
: Used to annotate fields of typeLocalDate
,LocalTime
,LocalDateTime
orInstant
in order to render them as a string using the given formatter instead of rendering them as avro logical types.@AvroAnnotations.timeprecision(timeprecisionType: TimePrecisionType)
: Used to indicate the precision (millisecond precision or microsecond precision) of avro logical typesTime
,Timestamp
andLocal timestamp
@AvroAnnotations.error
: Used to annotate a record in order to render it as a avro error record@AvroAnnotations.fieldOrder(fieldOrderType: FieldOrderType)
: Used to indicate the avro field order of a record
For example, to change the name of a field in the Avro schema, we can use the AvroAnnotations.name
annotation:
import zio.schema.Schema
import zio.schema.DeriveSchema
import zio.schema.codec.AvroAnnotations
@AvroAnnotations.name("User")
case class Person(name: String, age: Int)
object Person {
implicit val schema: Schema[Person] = DeriveSchema.gen
}
Now, if we generate the Avro schema for the Person
class, we will see that the name of the record is User
instead of Person
:
import zio._
import zio.schema.Schema
import zio.schema.DeriveSchema
import zio.schema.codec.AvroSchemaCodec
object Main extends ZIOAppDefault {
def run =
for {
_ <- ZIO.debug("AvroSchemaCodec Example with annotations:")
avroSchema <- ZIO.fromEither(AvroSchemaCodec.encode(Person.schema))
_ <- ZIO.debug(s"The person schema in Avro Schema JSON format: $avroSchema")
} yield ()
}
The output:
The person schema in Avro Schema JSON format: {"type":"record","name":"User","fields":[{"name":"name","type":"string"},{"name":"age","type":{"type":"bytes","logicalType":"decimal","precision":48,"scale":24}}]}