In this article, we will go over how to set up Kafka in a Spring Boot project in Kotlin: from the configuration to the consumer and producer, and how to make sure it all works with an integration test.

Installation

Since we are using Kafka with Spring Boot, you will need to add these libraries to your build.gradle.kts file, in addition to the other JUnit and Spring dependencies.

dependencies {
    implementation("org.springframework.kafka:spring-kafka")
    implementation("com.fasterxml.jackson.module:jackson-module-kotlin")
    testImplementation("org.springframework.kafka:spring-kafka-test:2.6.5")
}

To get the base Spring dependencies, create a project with the Spring Initializr. As you can see, there are two Kafka dependencies: one for the implementation and the other for the tests.
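
Throughout this article we'll produce and consume a simple Foo record. Its exact shape doesn't matter; assume a minimal data class like the following (jackson-module-kotlin takes care of the JSON mapping):

// A minimal payload for the examples; any Jackson-friendly class works.
data class Foo(val name: String, val description: String)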

Kafka configuration

For the config, we're using the defaults and overriding only the values we need. For the serializer, the default is String; you could also use an Avro one (if you have set it up with a schema registry). With JSON, a record that can't be deserialized will make the consumer fail in a loop; this is known as the poison pill problem. You may want to log such records and discard them so your consumer doesn't get stuck.
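
One way to guard against the poison pill (not used in the configuration below, but worth knowing about) is Spring Kafka's ErrorHandlingDeserializer, which wraps the real deserializer so that a record that can't be read produces a handled error instead of an endless retry. A minimal sketch on top of the consumer properties:

// Sketch: delegate JSON deserialization through ErrorHandlingDeserializer
// so a bad record fails once instead of blocking the partition forever.
val configs = kafkaProperties.buildConsumerProperties()
configs[ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG] = ErrorHandlingDeserializer::class.java
configs[ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS] = JsonDeserializer::class.java
configs[JsonDeserializer.VALUE_DEFAULT_TYPE] = Foo::class.java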

Here is the configuration for a consumerFactory that will be used to create our Kafka consumer.

@Configuration
open class Config {

    @Autowired
    private lateinit var kafkaProperties: KafkaProperties

    private fun consumerFactory(): ConsumerFactory<String, Foo> {
        val configs = kafkaProperties.buildConsumerProperties()
        configs[ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java
        configs[ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG] = JsonDeserializer::class.java

        // The deserializer instances passed here take precedence over the config entries;
        // giving JsonDeserializer the target type means it doesn't rely on type headers.
        return DefaultKafkaConsumerFactory(configs, StringDeserializer(), JsonDeserializer(Foo::class.java))
    }

    @Bean(name = ["fooKafkaListenerContainerFactory"])
    open fun kafkaListenerContainerFactory(): ConcurrentKafkaListenerContainerFactory<String, Foo> {
        val factory = ConcurrentKafkaListenerContainerFactory<String, Foo>()
        factory.consumerFactory = consumerFactory()
        return factory
    }
}

With Spring Boot you can have Kafka-specific configuration (broker address and such) picked up automatically, so you can just autowire the KafkaProperties without any extra code (it will use the defaults plus whatever is in the configuration file). Here is an example of the configuration you may have inside your application.yml file:

kafka:
  security-protocol: "${KAFKA_SECURITY_PROTOCOL:PLAINTEXT}"
  username: "${KAFKA_USERNAME:user}"
  password: "${KAFKA_PASSWORD:password}"
  bootstrap-servers: "${KAFKA_BOOTSTRAP_SERVERS:localhost:9092}"
  sasl:
    mechanism: "${KAFKA_SASL_MECHANISM:PLAIN}"
  ssl:
    enabled: false
    protocol: "TLS"
    enabled-protocols: "TLSv1.2,TLSv1.1,TLSv1"

But that level of “magic” that Spring is known for might not be the best for understanding what is actually being used, nor does it keep things stable if a default is dropped in a future version. I’d rather be explicit with the configuration that matters; this way you know where URLs and credentials are taken from, which eases the investigation of integration issues.

The sensitive information is passed via environment variables, and we have default values for local development.
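
For instance, here is a minimal sketch of binding that custom kafka block to a typed properties class (KafkaSettings is my own name for it; Spring's relaxed binding maps security-protocol to securityProtocol):

// Sketch: explicit, typed access to the custom "kafka" block above.
@Component
@ConfigurationProperties(prefix = "kafka")
class KafkaSettings {
    lateinit var securityProtocol: String
    lateinit var username: String
    lateinit var password: String
    lateinit var bootstrapServers: String
}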

Create a consumer

From a class

We use the @KafkaHandler annotation inside a @KafkaListener-annotated class to handle the records. Use the @Payload annotation on the parameter that receives the consumed type.

Additional configuration can be set within the @KafkaListener annotation:

  • groupId: Consumers with the same group id share the load of the topic (they divide the partitions between themselves); consumers with different group ids each consume every message.
  • topics: The topic to consume from, set via the application.yml file.
  • containerFactory: The name of the factory bean used to create the consumer.

@KafkaListener(
    groupId = "foo-group",
    topics = ["\${app.topic.consumer}"],
    containerFactory = "fooKafkaListenerContainerFactory"
)
@Component
class FooConsumer {

    val foos = mutableListOf<Foo>()

    @KafkaHandler
    fun consume(@Header(KafkaHeaders.RECEIVED_TIMESTAMP) received: Long, @Payload foo: Foo) {
        println("Consuming Request: $foo received at $received")
        foos.add(foo)
        println("All received: $foos")
    }

}

This basic example adds the consumed Foo records to a list and prints the output. If you have an issue with trusted packages for the JSON deserialization, check this answer 💬, and upvote it if it helped you, or leave me a comment if it didn’t.
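
In short, the usual fix is to tell the JsonDeserializer which packages it is allowed to instantiate classes from; a one-line sketch for the consumer properties (the package name is a placeholder for wherever Foo lives):

// Allow deserialization of classes from our own package.
configs[JsonDeserializer.TRUSTED_PACKAGES] = "com.example"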

From a method

Using a class gives you fine-grained control over the consumption, but if you have multiple message types or topics to consume from, you may prefer to set up the consumer on a method in a component class.

The @KafkaListener annotation works the same way as on a class:

@Component
class FooListener {

    @Autowired
    lateinit var fooService: FooService

    @KafkaListener(
        groupId = "foo-listener-group",
        topics = ["\${app.topic.consumer}"],
        containerFactory = "fooKafkaListenerContainerFactory"
    )
    fun consume(foo: Foo) {
        fooService.handle(foo)
    }
}

This small listener component has a Kafka consumer that calls the FooService to handle the received Foo records.
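
The FooService itself is not shown in this article; assume any component with a handle method holding your business logic, for example:

// Placeholder service: in a real application this is where the business logic lives.
@Service
class FooService {
    fun handle(foo: Foo) {
        println("handling $foo")
    }
}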

Create a producer

The configuration works similarly to the consumer’s: both use the same properties to connect to Kafka. Since we’re using the default JSON setup, it’s fairly straightforward, and we can use a KafkaTemplate (which follows the same naming pattern as Spring’s RestTemplate) to send messages to Kafka:

@Configuration
open class Config {

    @Autowired
    private lateinit var kafkaProperties: KafkaProperties

    @Bean
    open fun kafkaTemplate(): KafkaTemplate<String, Foo> {
        val configs = kafkaProperties.buildProducerProperties()
        configs[ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG] = StringSerializer::class.java
        configs[ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG] = JsonSerializer::class.java
        // The second argument enables autoFlush, so the template flushes after each send.
        return KafkaTemplate(DefaultKafkaProducerFactory(configs), true)
    }
}

Once everything is configured properly, you can autowire the KafkaTemplate in your producer to send records to the specified topic.

@Component
class FooProducer {

    @Autowired
    private lateinit var kafkaTemplate: KafkaTemplate<String, Foo>

    @Value("\${app.topic.producer}")
    private lateinit var topic: String

    fun send(data: Foo) {
        println("sending data:$data to topic:'$topic'")
        // A constant key puts every record on the same partition; fine for a demo.
        kafkaTemplate.send(topic, "key", data)
    }
}

Here I have created a dummy producer component that sends Foo records to a specific topic in Kafka. The topic name is passed via the configuration in the application.yml file.
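
For completeness, the matching topic entries in the application.yml file could look like this (the topic name is just an example):

app:
  topic:
    consumer: "foo-topic"
    producer: "foo-topic"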

Integration test

To test it all, you can use an embedded Kafka in an integration test. The only way to test the actual configuration is against the real topic, but at least if everything works with the local setup, you’ll know that a failure in production points to a configuration issue rather than a code issue.

Whether the listener is on a class or a method, the consuming logic can be tested the same way. I will be mocking the FooService dependency with MockK because it doesn’t do anything here, but ideally you should not: there should be some business logic happening that makes for a worthwhile test.

@SpringBootTest(
    properties = [
        "spring.kafka.producer.bootstrap-servers=localhost:3392",
        "spring.kafka.consumer.bootstrap-servers=localhost:3392",
        "app.topic.consumer=test-topic",
        "app.topic.producer=test-topic"
    ]
)
@EmbeddedKafka(partitions = 1, brokerProperties = ["listeners=PLAINTEXT://localhost:3392", "port=3392"])
@DirtiesContext
internal class FooListenerTest {

    @TestConfiguration
    open class TestConfig {
        @Bean
        open fun fooService() = mockk<FooService>()
    }

    @Autowired
    private lateinit var fooService: FooService

    @Autowired
    private lateinit var fooProducer: FooProducer

    @Test
    fun consumeFoo() {
        every { fooService.handle(any()) } just runs
        val foo = Foo("example", "description")
        fooProducer.send(foo)
        verify(timeout = 1000, atMost = 5) { fooService.handle(foo) }
    }
}

In this case I am using my own producer to test my consumer, since it produces Foo records: I don’t need to create a separate test producer, and I can validate both the producer and the consumer in one shot.

In this test, @SpringBootTest specifies the properties so that the application connects to the embedded Kafka spawned for the test, @EmbeddedKafka sets up that broker with matching configuration, and @DirtiesContext cleans up the cached context after the test.
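
One last note: the test above relies on MockK for the mocking, so if you follow along you will also need its dependency in build.gradle.kts (the version is only an example):

testImplementation("io.mockk:mockk:1.10.6")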