In the last post, we saw how to integrate Kafka with a Spring Boot application. That post was a very simple implementation of Kafka; the real world is much more complex: you have to deal with multiple topics, and you need multiple partitions. In this post, we explore more details of a Spring Boot application that uses Kafka.
Let's start with the fact that we are now dealing with multiple topics. The very first thing we need to do is separate out the properties for each topic.
spring.kafka.topic1.bootstrap-servers=localhost:9092
spring.kafka.topic1.group-id=firstGroupId
spring.kafka.topic2.bootstrap-servers=localhost:9092
spring.kafka.topic2.group-id=secondGroupId
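This post assumes that both topics already exist on the broker. If you would rather let the application create them, and control the number of partitions mentioned in the introduction, a minimal sketch along the following lines should work: Spring Boot auto-configures a KafkaAdmin bean that creates any NewTopic beans it finds at startup. The class name and the partition and replication counts here are illustrative choices, not part of the original tutorial.

package in.springframework.blog.tutorials.configs;

import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

// Hypothetical helper, not part of the original tutorial: declares the two
// topics so that the auto-configured KafkaAdmin creates them on startup.
@Configuration
public class TopicCreationConfig {

    @Bean
    public NewTopic firstTopic() {
        // topic name, number of partitions, replication factor (values assumed)
        return new NewTopic("FirstTopic", 2, (short) 1);
    }

    @Bean
    public NewTopic secondTopic() {
        return new NewTopic("SecondTopic", 2, (short) 1);
    }
}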
We can no longer use the default KafkaTemplate; we will have to create our own. The best way is to define a KafkaProducer, and to create one we need a KafkaProducerConfig. Here are the two producer config classes, one for each topic.
package in.springframework.blog.tutorials.configs;

import in.springframework.blog.tutorials.Constants;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.stereotype.Component;

import java.util.HashMap;
import java.util.Map;

@Component
public class FirstTopicProducerConfig {

    @Value("${spring.kafka.topic1.bootstrap-servers:localhost}")
    private String bootstrapAddress;

    public ProducerFactory<String, String> producerFactory() {
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
        configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return new DefaultKafkaProducerFactory<>(configProps);
    }

    @Bean(name = Constants.FIRST_TOPIC_TEMPLATE_NAME)
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}
package in.springframework.blog.tutorials.configs;

import in.springframework.blog.tutorials.Constants;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.stereotype.Component;

import java.util.HashMap;
import java.util.Map;

@Component
public class SecondTopicProducerConfig {

    @Value("${spring.kafka.topic2.bootstrap-servers:localhost}")
    private String bootstrapAddress;

    public ProducerFactory<String, String> producerFactory() {
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
        configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return new DefaultKafkaProducerFactory<>(configProps);
    }

    @Bean(name = Constants.SECOND_TOPIC_TEMPLATE_NAME)
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}
Looking at the producer configs, we can observe that we create two beans, qualified by name, each returning the appropriate KafkaTemplate that is later used to send messages. Similar to the producer config, we need to create a consumer config for each topic; the consumer config is used by the listener to listen for messages.
package in.springframework.blog.tutorials.configs;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;

import java.util.HashMap;
import java.util.Map;

@EnableKafka
@Configuration
public class FirstTopicConsumerConfig {

    @Value("${spring.kafka.topic1.bootstrap-servers:localhost}")
    private String bootstrapAddress;

    @Value("${spring.kafka.topic1.group-id:firstgroup}")
    private String groupId;

    public ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return new DefaultKafkaConsumerFactory<>(props);
    }

    @Bean(name = "firstTopicListenerContainerFactory")
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }
}
package in.springframework.blog.tutorials.configs;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;

import java.util.HashMap;
import java.util.Map;

@EnableKafka
@Configuration
public class SecondTopicConsumerConfig {

    @Value("${spring.kafka.topic2.bootstrap-servers:localhost}")
    private String bootstrapAddress;

    @Value("${spring.kafka.topic2.group-id:secondgroup}")
    private String groupId;

    public ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return new DefaultKafkaConsumerFactory<>(props);
    }

    @Bean(name = "secondTopicListenerContainerFactory")
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }
}
As we can see in each of the consumer configs, we create a bean that returns a ConcurrentKafkaListenerContainerFactory. The beans are qualified by name so that we can use the appropriate container factory when receiving messages.
We also modify the MyTopicMessage and add a member variable topicName that will help us distinguish the topic to which the message needs to be sent.
package in.springframework.blog.tutorials.pojos;

import in.springframework.blog.tutorials.Constants;

public class MyTopicMessage {

    private String message;
    private Constants.TOPIC_NAME topicName;

    public MyTopicMessage() {
    }

    public MyTopicMessage(String message, Constants.TOPIC_NAME topicName) {
        this.message = message;
        this.topicName = topicName;
    }

    public String getMessage() {
        return message;
    }

    public void setMessage(String message) {
        this.message = message;
    }

    public Constants.TOPIC_NAME getTopicName() {
        return topicName;
    }

    public void setTopicName(Constants.TOPIC_NAME topicName) {
        this.topicName = topicName;
    }
}
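The configuration classes and the POJO above refer to a Constants class that lives in the tutorial's code base but is not listed in this post. A minimal sketch of what it might look like follows; the topic names match the listener and the server log further down, while the template bean names are assumptions based on how the constants are used.

package in.springframework.blog.tutorials;

// Hypothetical sketch of the Constants class referenced above; the actual
// values live in the tutorial's repository and may differ.
public class Constants {

    // Kafka topic names, matching the names used by the listeners and the server log.
    public static final String FIRST_TOPIC = "FirstTopic";
    public static final String SECOND_TOPIC = "SecondTopic";

    // Bean names used to qualify the two KafkaTemplate beans (names assumed).
    public static final String FIRST_TOPIC_TEMPLATE_NAME = "firstTopicTemplate";
    public static final String SECOND_TOPIC_TEMPLATE_NAME = "secondTopicTemplate";

    // Enum carried in MyTopicMessage to select the destination topic.
    public enum TOPIC_NAME {
        FirstTopic,
        SecondTopic
    }
}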
We also modify the endpoint so that the message can be sent to the appropriate topic.
package in.springframework.blog.tutorials.endpoints;

import in.springframework.blog.tutorials.Constants;
import in.springframework.blog.tutorials.MyConstants;
import in.springframework.blog.tutorials.pojos.MyTopicMessage;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.http.MediaType;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.security.access.prepost.PreAuthorize;
import org.springframework.util.concurrent.ListenableFutureCallback;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;

@RestController
@RequestMapping("/send")
public class MyTopicMessageSenderEndpoint {

    @Autowired
    @Qualifier(Constants.FIRST_TOPIC_TEMPLATE_NAME)
    private KafkaTemplate firstKafkaTemplate;

    @Autowired
    @Qualifier(Constants.SECOND_TOPIC_TEMPLATE_NAME)
    private KafkaTemplate secondKafkaTemplate;

    private final Logger logger = Logger.getLogger(MyTopicMessageSenderEndpoint.class);

    @RequestMapping(method = RequestMethod.POST, produces = MediaType.APPLICATION_JSON_VALUE)
    @PreAuthorize(MyConstants.ANNOTATION_ROLE_USER)
    public String sendMessage(@RequestBody MyTopicMessage message) {
        switch (message.getTopicName()) {
            case FirstTopic:
                firstKafkaTemplate.send(Constants.FIRST_TOPIC, message.getMessage())
                        .addCallback(new ListenableFutureCallback() {
                            @Override
                            public void onFailure(Throwable throwable) {
                                logger.log(Level.INFO, String.format("Message '%s' failed!", message.getMessage()));
                            }

                            @Override
                            public void onSuccess(Object o) {
                                logger.log(Level.INFO, String.format("Message '%s' sent successfully!", message.getMessage()));
                            }
                        });
                break;
            case SecondTopic:
                secondKafkaTemplate.send(Constants.SECOND_TOPIC, message.getMessage())
                        .addCallback(new ListenableFutureCallback() {
                            @Override
                            public void onFailure(Throwable throwable) {
                                logger.log(Level.INFO, String.format("Message '%s' failed!", message.getMessage()));
                            }

                            @Override
                            public void onSuccess(Object o) {
                                logger.log(Level.INFO, String.format("Message '%s' sent successfully!", message.getMessage()));
                            }
                        });
                break;
        }
        return "Success!";
    }
}
Now we modify the listener to integrate everything so that the appropriate messages can be received. Look at the @KafkaListener annotation: we pass the appropriate listener container factory through the containerFactory attribute so that each method receives messages from its own topic.
package in.springframework.blog.tutorials.listeners;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;

@Component
public class KafkaMessageListener {

    private final Logger logger = LogManager.getLogger(KafkaMessageListener.class);

    @KafkaListener(topics = "FirstTopic", containerFactory = "firstTopicListenerContainerFactory")
    public void processFirstTopicMessage(@Payload String content,
                                         @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
                                         @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
        logger.info(String.format("Received %s message for partition %d ", topic, partition) + content);
    }

    @KafkaListener(topics = "SecondTopic", containerFactory = "secondTopicListenerContainerFactory")
    public void processSecondTopicMessage(@Payload String content,
                                          @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
                                          @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
        logger.info(String.format("Received %s message for partition %d ", topic, partition) + content);
    }
}
Now we can test the server. The flow of the service is as follows.
- The message is posted to the endpoint as a POST request
- The @RestController receives the message and, based on the topicName in the request, sends the message to the topic with the same name.
- The listener receives the message from the appropriate topic.
$ curl -X POST \
    'http://localhost:8081/send?token=3193fa24-a0ba-451b-83ff-eb563c3fd43b-cdf12811-7e41-474b-8fa6-e8fefd4a738c' \
    -H 'Content-Type: application/json' \
    -H 'Postman-Token: 15fbe075-9c80-4af9-a797-6b5e0979fd1b' \
    -H 'cache-control: no-cache' \
    -H 'token: 3193fa24-a0ba-451b-83ff-eb563c3fd43b-cdf12811-7e41-474b-8fa6-e8fefd4a738c' \
    -d '{
          "message" : "This is my message!",
          "topicName" : "FirstTopic"
        }'
Success!
The message receipt is indicated in the server log.
2019-04-12 14:50:47.142 INFO 51396 --- [ntainer#0-0-C-1] i.s.b.t.listeners.KafkaMessageListener : Received FirstTopic message for partition 0 This is my message!

Similarly, we can send a message to SecondTopic.
$ curl -X POST \
    'http://localhost:8081/send?token=3193fa24-a0ba-451b-83ff-eb563c3fd43b-cdf12811-7e41-474b-8fa6-e8fefd4a738c' \
    -H 'Content-Type: application/json' \
    -H 'Postman-Token: 15fbe075-9c80-4af9-a797-6b5e0979fd1b' \
    -H 'cache-control: no-cache' \
    -H 'token: 3193fa24-a0ba-451b-83ff-eb563c3fd43b-cdf12811-7e41-474b-8fa6-e8fefd4a738c' \
    -d '{ "message" : "This is another message!", "topicName" : "SecondTopic" }'
Success!
The message receipt is indicated in the server log.
2019-04-12 14:53:14.972 INFO 51396 --- [ntainer#1-0-C-1] i.s.b.t.listeners.KafkaMessageListener : Received SecondTopic message for partition 0 This is another message!

The complete code base for this tutorial can be found in my GitHub repository at v1.4.