
How do I configure a Spring Boot application to use multiple Kafka clusters?

Published at 03:42 AM

Configuring a Spring Boot application to use multiple Kafka clusters is a fairly uncommon requirement. In this post, I’ll show you how to do it.


Intro

Typically, you only need to configure a single Kafka cluster in your Spring Boot application. However, there are some scenarios where you might need to connect to multiple Kafka clusters, for example, a cloud-based cluster and an on-premises one.

How to configure a Spring Boot application to use multiple Kafka clusters

Normally, Spring Boot auto-configures a single ConcurrentKafkaListenerContainerFactory for you. This is the factory that creates the listener containers behind every @KafkaListener. However, if you need to connect to multiple Kafka clusters, you’ll need to define your own ConcurrentKafkaListenerContainerFactory beans and configure each one with its own ConsumerFactory.

Here’s an example of how you can do this in Kotlin:

import org.springframework.boot.autoconfigure.kafka.KafkaProperties
import org.springframework.boot.context.properties.ConfigurationProperties
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory
import org.springframework.kafka.core.DefaultKafkaConsumerFactory

@Configuration
@ConfigurationProperties(prefix = "kafka")
class KafkaConfiguration {
    // Bound from the kafka.on-prem.* and kafka.cloud.* property trees.
    // Nullable vars rather than lateinit: Kotlin forbids lateinit on nullable types.
    var onPrem: KafkaProperties? = null
    var cloud: KafkaProperties? = null

    @Bean
    fun onPremKafkaListenerContainerFactory(): ConcurrentKafkaListenerContainerFactory<String, String> =
        onPrem?.toListenerContainerFactory()
            ?: throw IllegalStateException("On-prem Kafka properties not found")

    @Bean
    fun cloudKafkaListenerContainerFactory(): ConcurrentKafkaListenerContainerFactory<String, String> =
        cloud?.toListenerContainerFactory()
            ?: throw IllegalStateException("Cloud Kafka properties not found")
}

private fun KafkaProperties.toListenerContainerFactory(): ConcurrentKafkaListenerContainerFactory<String, String> {
    val factory = ConcurrentKafkaListenerContainerFactory<String, String>()
    // Build a consumer factory from this cluster's consumer configuration
    factory.consumerFactory = DefaultKafkaConsumerFactory(buildConsumerProperties())
    // Carry over the per-cluster listener settings (concurrency, batch mode)
    factory.setConcurrency(listener.concurrency ?: 1)
    factory.isBatchListener = listener.type == KafkaProperties.Listener.Type.BATCH
    return factory
}
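One thing worth knowing: Spring Boot’s auto-configured kafkaListenerContainerFactory only backs off when a bean with that exact name exists, so the default factory (driven by the regular spring.kafka.* properties) is still created alongside the two beans above. If you don’t want it, name one of your factories kafkaListenerContainerFactory, or exclude KafkaAutoConfiguration.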

As you can see, thanks to the @ConfigurationProperties annotation we can bind two separate property trees, kafka.on-prem.* and kafka.cloud.*, to their own KafkaProperties instances:

kafka.on-prem.consumer.bootstrap-servers=bootstrap.onprem.datacenter.com:9092
kafka.on-prem.consumer.max-poll-records=2000
kafka.on-prem.listener.concurrency=4
kafka.on-prem.listener.type=BATCH
kafka.on-prem.properties.sasl.mechanism=SCRAM-SHA-512
kafka.on-prem.properties.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username='${on-prem.user}' password='${on-prem.password}';
kafka.on-prem.security.protocol=SASL_SSL
kafka.on-prem.ssl.trust-store-type=PKCS12
kafka.on-prem.ssl.trust-store-location=/stores/truststore.p12
kafka.on-prem.consumer.key-deserializer=org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
kafka.on-prem.consumer.value-deserializer=org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
kafka.on-prem.properties.spring.deserializer.key.delegate.class=org.apache.kafka.common.serialization.StringDeserializer
kafka.on-prem.properties.spring.deserializer.value.delegate.class=org.springframework.kafka.support.serializer.JsonDeserializer
kafka.on-prem.properties.spring.json.trusted.packages=*
kafka.on-prem.consumer.auto-offset-reset=earliest
kafka.on-prem.consumer.group-id=${application.name}

kafka.cloud.consumer.bootstrap-servers=bootstrap.in.the.confluent.cloud:9092
kafka.cloud.consumer.key-deserializer=org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
kafka.cloud.consumer.value-deserializer=org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
kafka.cloud.properties.spring.deserializer.key.delegate.class=io.confluent.kafka.serializers.KafkaAvroDeserializer
kafka.cloud.properties.spring.deserializer.value.delegate.class=io.confluent.kafka.serializers.KafkaAvroDeserializer
kafka.cloud.consumer.properties.specific.avro.reader=true
kafka.cloud.consumer.group-id=${application.name}
kafka.cloud.consumer.auto-offset-reset=earliest
kafka.cloud.properties.sasl.mechanism=PLAIN
kafka.cloud.properties.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username='${cloud.user}' password='${cloud.password}';
kafka.cloud.security.protocol=SASL_SSL
kafka.cloud.properties.basic.auth.credentials.source=USER_INFO
kafka.cloud.properties.basic.auth.user.info=${schema.user}:${schema.password}
kafka.cloud.properties.schema.registry.url=https://somewhere.in.the.confluent.cloud

We are configuring two completely different Kafka clusters, with different deserializers, different security protocols, and different SASL mechanisms. Note that the spring.deserializer.*.delegate.class properties only take effect when ErrorHandlingDeserializer is the configured deserializer, which is why both clusters use it as the key and value deserializer.

Now, in order to use these ConcurrentKafkaListenerContainerFactory beans, use the @KafkaListener annotation and point its containerFactory attribute at the bean name you want:

@KafkaListener(topics = ["topic1"], containerFactory = "onPremKafkaListenerContainerFactory")
fun consumeTransactions(records: List<PayLoads>) {
    // Batch listener: the on-prem cluster has kafka.on-prem.listener.type=BATCH
    log.info("Received ${records.size} records from topic1")
}
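For completeness, here’s a minimal sketch of a listener wired to the cloud factory. PaymentEvent is a hypothetical stand-in for an Avro-generated class (it doesn’t appear in the configuration above), and because kafka.cloud.listener.type is not set to BATCH, the method receives one record at a time:

@KafkaListener(topics = ["topic2"], containerFactory = "cloudKafkaListenerContainerFactory")
fun consumePayments(event: PaymentEvent) { // PaymentEvent: hypothetical Avro-generated class
    log.info("Received a payment event from topic2")
}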