Showing posts with label Multiple Producers. Show all posts
Showing posts with label Multiple Producers. Show all posts

Friday, April 12, 2019

Spring and more Kafka

In the last post, we saw how to integrate Kafka with Spring Boot application. The post was a very simple implementation of Kafka. The real world is much more complex. You have to deal with multiple topics, you need multiple partitions. In this post, we explore more details of a spring boot application with Kafka.
Let's start with the fact that we have multiple topics that we are dealing with. The very first thing that we need to do is to separate out the properties for each of the topics.

We can no longer use default KafkaTemplate, we will have to create our own. The best way is to define a KafkaProducer. To create a KafkaProducer, we need to create KarkaProducerConfig. Here are two sets of KafkaProducerConfig class, one each for the topic.


Looking at the producer config, we can observe that we create two beans which are qualified by names and return an appropriate KafkaTemplate which is later used to send the message. Similar to producer config, we need to create consumer config. Consumer config is used by the listener to listen for the message.


As we can see in each of the consumer configs, we create a bean which returns a ConcurrentKafkaListenerContainerFactory. The beans are qualified by a name so that we can use an appropriate container factory for receiving messages.
We also modify the MyTopicMessage and add a member variable topicName that will help us distinguish the topic to which the message needs to be sent.

We also modify the endpoint so that the message can be sent to the appropriate topic.

Now we modify the Listener to integrate everything so that the appropriate message can be received. Look at @KafkaListener annotation, we pass on the ListenerContainerFactory as an argument to receive a message from a queue.
Now we can test the server. The flow of the service is as below.

  1. The message is posted to the endpoint as a POST request
  2. The @RestController receives the message and based on the topicName in the request, it sends to message to the topic with the same name.
  3. The listener receives the message from the appropriate queue.
$ curl -X POST \
>   'http://localhost:8081/send?token=3193fa24-a0ba-451b-83ff-eb563c3fd43b-cdf12811-7e41-474b-8fa6-e8fefd4a738c' \
>   -H 'Content-Type: application/json' \
>   -H 'Postman-Token: 15fbe075-9c80-4af9-a797-6b5e0979fd1b' \
>   -H 'cache-control: no-cache' \
>   -H 'token: 3193fa24-a0ba-451b-83ff-eb563c3fd43b-cdf12811-7e41-474b-8fa6-e8fefd4a738c' \
>   -d '{
> "message" : "This is my message!",
> "topicName" : "FirstTopic"
> }'
Success!

The message receipt is indicated in the server log.
2019-04-12 14:50:47.142  INFO 51396 --- [ntainer#0-0-C-1] i.s.b.t.listeners.KafkaMessageListener   : Received FirstTopic message for partition 0 This is my message!
Similarly, we can send a message to SecondTopic.
$ curl -X POST   'http://localhost:8?token=3193fa24-a0ba-451b-83ff-eb563c3fd43b-cdf12811-7e41-474b-8fa6-e8fefd4a738c'   -H 'Content-Type: application/json'   -H 'Postman-Token: 15fbe075-9c80-4af9-a797-6b5e0979fd1b'   -H 'cache-control: no-cache'   -H 'token: 3193fa24-a0ba-451b-83ff-eb563c3fd43b-cdf12811-7e41-474b-8fa6-e8fefd4a738c'   -d '{
"message" : "This is another message!",
"topicName" : "SecondTopic"
}'
Success!

The message receipt is indicated in the server log.
2019-04-12 14:53:14.972  INFO 51396 --- [ntainer#1-0-C-1] i.s.b.t.listeners.KafkaMessageListener   : Received SecondTopic message for partition 0 This is another message!
The complete code base for this tutorial can be found at my github repository at v1.4.