We already talked about Avro, the syntax for the schema, how to build a consumer and a producer and kafka in general. So let’s mix it all together for an article about implementing kafka producer and consumer using Avro in Kotlin.

I will be mainly relying on confluent for the avro library and its registry, because that is the main company in the field, but Kafka being open source, it could be homemade (but that would be a bit of waste not reusing what’s already there). We will start straight with the implementation using the confluent schema registry, and then go into more details about how it works.

Implementation

Gradle plugin to generate POJO from avro

It will be generated in Java, but that does not matter for Kotlin. I will be using davidmc24 avro generation gradle plugin.

Let’s have a look at the build.gradle.kts to make it work, be sure to put your avro schemas within the source folder in src/main/avro for it to work properly. It is the standard, but that can be modified in the generation task as well.

import com.github.davidmc24.gradle.plugin.avro.GenerateAvroJavaTask

plugins {
 kotlin("jvm") version "1.3.50"
 java
 id("com.github.davidmc24.gradle.plugin.avro-base") version "1.2.0"
}

repositories {
 gradlePluginPortal()
}

// To add the generateAvroJava task 
tasks.register<GenerateAvroJavaTask>("generateAvroJava") {
 source("src/main/avro")
 setOutputDir(file("build/generated-main-avro-java"))
}

// To generate avro classes before compiling Kotlin
tasks.withType<KotlinCompile> {
 kotlinOptions {
  freeCompilerArgs = listOf("-Xjsr305=strict")
  jvmTarget = "1.8"
 }

 dependsOn("generateAvroJava") // So avro is generated
}

Now you can try to generate the schema using gradle generateAvroJava or it should be generated automatically when building the gradle project.

Extra libraries

For the producer and consumer to use avro, we’ll need some more confluent libraries, so let’s add that into our build file:

repositories {
    mavenCentral()
    maven("http://packages.confluent.io/maven/")
}

dependencies {
    implementation("org.apache.kafka:kafka-clients:2.3.0")
    implementation("org.apache.avro:avro:1.10.2")
    implementation("org.apache.avro:avro-tools:1.10.2")
    implementation("io.confluent:kafka-avro-serializer:5.5.1")
}

We need an extra repository to fetch the confluent-specific libraries, like the avro serializer to use with the confluent schema registry. Without the registry, you could one generic for bytes (avro schema needs to be serialized as bytes) from org.apache.kafka.common.serialization which is the open source kafka library.

Producer

Now that we have the necessary package, let’s configure our producer. We’ll be using some of the default properties, and use the variables for the name of the ones we want to update for our avro producer:

import io.confluent.kafka.serializers.KafkaAvroSerializer
import io.confluent.kafka.serializers.KafkaAvroSerializerConfig
import io.confluent.kafka.serializers.subject.TopicRecordNameStrategy
import org.apache.kafka.clients.producer.KafkaProducer
import org.apache.kafka.clients.producer.ProducerConfig

import avro.model.PositionKey
import avro.model.PositionValue

fun kafkaAvroProducer(): KafkaProducer<PositionKey, PositionValue> {
  val settings = Properties()
  settings[ProducerConfig.BOOTSTRAP_SERVERS_CONFIG] = "localhost:9092"
  settings[ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG] = StringSerializer::class.java
  settings[ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG] = KafkaAvroSerializer::class.java
  settings[KafkaAvroSerializerConfig.SCHEMA_REGISTRY_URL_CONFIG] = "http://schema-registry:8081"
  settings[KafkaAvroSerializerConfig.VALUE_SUBJECT_NAME_STRATEGY] = TopicRecordNameStrategy::class.java
  return KafkaProducer(settings)
}

To send records, it’s the same as any other producer, build the record with the avro objects and use producer.send(record) and off it goes. The schema registry is hardcoded for the sake of this example, but you need to pass the url for the producer to use it.

Since we’re using avro, we set the value serializer to the KafkaAvroSerializer, which will serialize the data using the schema’s id from the register. The VALUE_SUBJECT_NAME_STRATEGY does not necessarily need to be set, but we will be talking about the different strategies later, so it’s important to know which option it matches to.

Consumer

We’re going to use the schema registry in this example as well, we should have all the dependencies from before. The construction of the consumer is similar to the producer one.

fun kafkaAvroConsumer(): KafkaConsumer<PositionKey, PositionValue> {
  val settings = Properties()
  settings[ProducerConfig.BOOTSTRAP_SERVERS_CONFIG] = "localhost:9092"
  settings[ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG] = StringSerializer::class.java
  settings[ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG] = KafkaAvroDeserializer::class.java
  settings[KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG] = "true"
  settings[KafkaAvroDeserializerConfig.SCHEMA_REGISTRY_URL_CONFIG] = "http://schema-registry:8081"

  return KafkaConsumer(settings)
}

The SPECIFIC_AVRO_READER_CONFIG is to parse it to the correct Java object generated by the plugin from the avro schema when you only expect one type, without it, it will be treated as a GenericRecord which can then be cast to another object.

For the consumer, if integrated with spring kafka, you can use a @KafkaListener as we looked before, or you can just use a consumer.poll(Duration.ofMillis(100)).forEach { printf("${it.value()}") } which is more low level from the kafka library.

How does the schema registry work?

The avro schema registry is to ensure compliance, but you could use avro schema without it and serialize/deserialize as bytes with a ByteArraySerializer/ByteArrayDeserializer, you can build the avro’s POJO from the bytes using a method from the generated class: PositionValue.fromByteBuffer(ByteBuffer.wrap(record.value));.

The other way around is true. You could use a registry without avro, only using json (which is compatible with the confluent registry)

Let’s first have a diagram of the flow, then highlight the interesting parts.

Diagram

Let’s say we have a producer, a consumer, a schema registry and a kafka broker, here’s the sequence diagram of how a message would be sent using an avro schema and the registry (like we learned how to configure above in kotlin):

sequenceDiagram participant P as Producer participant R as Schema Registry participant K as Kafka participant C as Consumer P -->> R: Register schema R -->> P: Send schema id Note Right of R: The schema is uploaded
in the registry P ->> K: Send message (Using schema id) Note Right of P: The schema itself
is not sent.
Only the id and payload. K ->> C: Consume message C --> R: Check schema compatibility Note right of C: Record is processed
successfully.

The schema registry has a REST endpoint to interact with it. This is used under the hood by both producer and consumer. So it might be a bit slower on the first run, but then once register the producer does not need to re-register for each new message to send.

The registered schema id is used for the avro serialization and deserialization.

Schemas in the registry

The registry saves the schema resource, which consists of the schema itself, a subject and a version. The subject is defined by the Schema Registry as scope in which schemas can evolve, and the version is used to check compatibility. The REST endpoint for the registry is pretty straightforward and can be used to validate the registered resources.

The schema saved in the registry may evolve depending on your compatibility configuration, that why we talked about versions.

For example, setting the compatibility to:

  • BACKWARD allows you to:
    • add new optional fields
    • delete old fields
  • FORWARD allows you to:
    • add new fields
    • delete optional fields

An optional field means that it does need to be present and will fall back to a default value like null, while a not optional field must be present and have a value which can be null.

Schema and topic name strategy

The naming strategy is related to how the schema’s subject will be created, and drives the compatibility of a schema in a topic for its consumption.

There are some main strategies that we explored before and can be configured using:

# E.g. for the RecordNameStrategy strategy
value.subject.name.strategy=io.confluent.kafka.serializers.subject.RecordNameStrategy

The main confluent strategies are:

  • TopicNameStrategy
    • To allow on one single type of message to be published on the topic (all schemas must be compatible with each other)
    • The subject for the schema resource will be built as {topic name}-value for the payload.
    • Default strategy
  • RecordNameStrategy
    • To allow multiple types of event in one topic and order needs to be maintained
    • Compatibility is checked on the type level regardless of the topic
    • The subject for the schema will be {namespace}.{name} (e.g.: com.github.avro.PositionValue known as record’s name too)
  • TopicRecordNameStrategy
    • To allow multiple types of messages in one topic
    • Compatibility on the current topic only, so if the schema has changed on another topic, it won’t impact this topic’s compatibility.
    • The subject for the schema resource will be {topic name}-{namespace}.{name}.

If you encounter a SerializationException, then it might be due to an issue with your schema, and you might either revise it or re-think your name strategy for that topic. In case of mistake, you could use the DELETE API, or go with a v2 version of your topic/schema.

I suggest reviewing the confluent’s fundamentals on the schema registry if some concepts are still fuzzy, this should unfizz them! 🙃