Configuring your Spring Boot application to use multiple Kafka clusters is a fairly uncommon requirement. In this post, I’ll show you how to do it.
Intro
Typically, you only need to configure a single Kafka cluster in your Spring Boot application. However, there are scenarios where you need to connect to several, for example a cloud-based cluster and an on-prem cluster at the same time.
How to configure a Spring Boot application to use multiple Kafka clusters
Normally, Spring Boot autoconfigures a ConcurrentKafkaListenerContainerFactory for you. This is the factory that creates the listener containers behind your @KafkaListener methods. However, if you need to connect to multiple Kafka clusters, you’ll need to create your own ConcurrentKafkaListenerContainerFactory beans, one per cluster, and configure each of them with a ConsumerFactory that points at the right cluster.
Here’s an example of how you can do this in Kotlin:
import org.springframework.boot.autoconfigure.kafka.KafkaProperties
import org.springframework.boot.context.properties.ConfigurationProperties
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory
import org.springframework.kafka.core.DefaultKafkaConsumerFactory
@Configuration
@ConfigurationProperties(prefix = "kafka")
class KafkaConfiguration {
    // Bound from kafka.on-prem.* and kafka.cloud.*.
    // Plain nullable vars: Kotlin does not allow lateinit on nullable types.
    var onPrem: KafkaProperties? = null
    var cloud: KafkaProperties? = null

    @Bean
    fun onPremKafkaListenerContainerFactory(): ConcurrentKafkaListenerContainerFactory<String, String> =
        onPrem?.kafkaConsumerFactory()
            ?: throw IllegalStateException("On-prem Kafka properties not found")

    @Bean
    fun cloudKafkaListenerContainerFactory(): ConcurrentKafkaListenerContainerFactory<String, String> =
        cloud?.kafkaConsumerFactory()
            ?: throw IllegalStateException("Cloud Kafka properties not found")
}

// Builds a listener container factory from one cluster's KafkaProperties.
private fun KafkaProperties.kafkaConsumerFactory(): ConcurrentKafkaListenerContainerFactory<String, String> {
    val concurrentKafkaListenerContainerFactory = ConcurrentKafkaListenerContainerFactory<String, String>()
    // Consumer settings (bootstrap servers, deserializers, security) come from the bound properties.
    concurrentKafkaListenerContainerFactory.consumerFactory = DefaultKafkaConsumerFactory(buildConsumerProperties())
    // Fall back to a single consumer thread when listener.concurrency is not set.
    concurrentKafkaListenerContainerFactory.setConcurrency(listener.concurrency ?: 1)
    concurrentKafkaListenerContainerFactory.isBatchListener = listener.type == KafkaProperties.Listener.Type.BATCH
    return concurrentKafkaListenerContainerFactory
}
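As you may have noticed, the @ConfigurationProperties annotation lets us bind each cluster’s settings to its own KafkaProperties instance: everything under kafka.on-prem.* is bound to onPrem, and everything under kafka.cloud.* is bound to cloud. The properties look like this: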
kafka.on-prem.consumer.bootstrap-servers=bootstrap.onprem.datacenter.com:9092
kafka.on-prem.consumer.max-poll-records=2000
kafka.on-prem.listener.concurrency=4
kafka.on-prem.listener.type=BATCH
kafka.on-prem.properties.sasl.mechanism=SCRAM-SHA-512
kafka.on-prem.properties.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username='${on-prem.user}' password='${on-prem.password}';
kafka.on-prem.security.protocol=SASL_SSL
kafka.on-prem.ssl.trust-store-type=PKCS12
kafka.on-prem.ssl.trust-store-location=/stores/truststore.p12
kafka.on-prem.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
kafka.on-prem.properties.spring.deserializer.key.delegate.class=org.apache.kafka.common.serialization.StringDeserializer
kafka.on-prem.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer
kafka.on-prem.properties.spring.deserializer.value.delegate.class=org.springframework.kafka.support.serializer.JsonDeserializer
kafka.on-prem.properties.spring.json.trusted.packages=*
kafka.on-prem.consumer.auto-offset-reset=earliest
kafka.on-prem.consumer.group-id=${application.name}
kafka.cloud.consumer.bootstrap-servers=bootstrap.in.the.confluent.cloud:9092
kafka.cloud.consumer.key-deserializer=org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
kafka.cloud.consumer.value-deserializer=org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
kafka.cloud.properties.spring.deserializer.key.delegate.class=io.confluent.kafka.serializers.KafkaAvroDeserializer
kafka.cloud.properties.spring.deserializer.value.delegate.class=io.confluent.kafka.serializers.KafkaAvroDeserializer
kafka.cloud.consumer.properties.specific.avro.reader=true
kafka.cloud.consumer.group-id=${application.name}
kafka.cloud.consumer.auto-offset-reset=earliest
kafka.cloud.properties.sasl.mechanism=PLAIN
kafka.cloud.properties.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username='${cloud.user}' password='${cloud.password}';
kafka.cloud.security.protocol=SASL_SSL
kafka.cloud.properties.basic.auth.credentials.source=USER_INFO
kafka.cloud.properties.basic.auth.user.info=${schema.user}:${schema.password}
kafka.cloud.properties.schema.registry.url=https://somewhere.in.the.confluent.cloud
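We are configuring two completely different Kafka clusters here. They use different deserializers (JSON on-prem, Avro with a schema registry in the cloud) and different SASL mechanisms (SCRAM-SHA-512 on-prem, PLAIN in the cloud), while both connect over SASL_SSL.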
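To see what ends up in the consumer configuration, here is a minimal sketch that fills a KafkaProperties instance by hand with a few of the on-prem values above and prints the result of buildConsumerProperties(). The function name onPremPropertiesSketch is hypothetical, and the sketch assumes the same Spring Boot version as the configuration class above; in a real application the binder populates the object for you.

```kotlin
import org.springframework.boot.autoconfigure.kafka.KafkaProperties

// Hypothetical helper: mimics what the binder would produce from a few
// kafka.on-prem.* entries. Only a subset of the properties is shown.
fun onPremPropertiesSketch(): KafkaProperties = KafkaProperties().apply {
    consumer.bootstrapServers = listOf("bootstrap.onprem.datacenter.com:9092")
    consumer.maxPollRecords = 2000
    security.protocol = "SASL_SSL"
    // Entries under kafka.on-prem.properties.* land in this free-form map.
    properties["sasl.mechanism"] = "SCRAM-SHA-512"
}

fun main() {
    // The typed consumer settings and the free-form properties are merged
    // into one map keyed by the regular Kafka client property names.
    val consumerConfig = onPremPropertiesSketch().buildConsumerProperties()
    consumerConfig.forEach { (key, value) -> println("$key = $value") }
}
```

This is the same map that the extension function passes to DefaultKafkaConsumerFactory, which is why each cluster can carry its own security and deserializer settings.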
Now, in order to use these ConcurrentKafkaListenerContainerFactory beans, you can use the @KafkaListener annotation and specify the containerFactory attribute.
@KafkaListener(topics = ["topic1"], containerFactory = "onPremKafkaListenerContainerFactory")
fun consumeTransactions(records: List<PayLoads>) {
    log.info("Received ${records.size} from topic1")
}
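For completeness, here is a sketch of a component that listens on both clusters at once. The class name MultiClusterListeners, the Payload type, and topic2 are assumptions made for illustration and do not come from the configuration above. The cloud listener receives one record at a time because kafka.cloud.listener.type is not set and the default type is SINGLE.

```kotlin
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.slf4j.LoggerFactory
import org.springframework.kafka.annotation.KafkaListener
import org.springframework.stereotype.Component

// Hypothetical payload type standing in for whatever the JSON messages contain.
data class Payload(val id: String = "")

@Component
class MultiClusterListeners {
    private val log = LoggerFactory.getLogger(javaClass)

    // On-prem cluster: listener.type=BATCH, so records arrive as a list.
    @KafkaListener(topics = ["topic1"], containerFactory = "onPremKafkaListenerContainerFactory")
    fun consumeOnPrem(records: List<Payload>) {
        log.info("Received ${records.size} records from topic1 (on-prem)")
    }

    // Cloud cluster: single-record listener; "topic2" is a made-up topic name.
    @KafkaListener(topics = ["topic2"], containerFactory = "cloudKafkaListenerContainerFactory")
    fun consumeCloud(record: ConsumerRecord<Any, Any>) {
        log.info("Received offset ${record.offset()} from ${record.topic()} (cloud)")
    }
}
```

The containerFactory value is the bean name, which by default is the name of the @Bean method in the configuration class.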