Showing posts with label Kafka. Show all posts

Wednesday, July 29, 2020

Publishing Application Events using Kafka and Spring Transaction Events

Many applications need to publish application-level events when certain operations happen in the system. Here is one requirement that I had recently: in my tutorial system, whenever a new user is created, I need to publish a UserCreated event. Here is how we can accomplish this using the transaction events provided by the Spring Framework.
First, I create an event POJO that will be used to publish the event.
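As a minimal sketch of such a POJO, assuming only the four fields that appear in the consumer output later in this post (userId, username, fullname, status): the constructor shape and the hand-written toString are assumptions, and the format simply mirrors that output.

```java
/** Event payload for a newly created user; field names are taken from the consumer output. */
class UserCreatedEvent {
    private final Long userId;
    private final String username;
    private final String fullname;
    // Records the transaction phase the event was sent from, e.g. "BeforeCommit".
    private String status;

    UserCreatedEvent(Long userId, String username, String fullname, String status) {
        this.userId = userId;
        this.username = username;
        this.fullname = fullname;
        this.status = status;
    }

    public Long getUserId() { return userId; }
    public String getUsername() { return username; }
    public String getFullname() { return fullname; }
    public String getStatus() { return status; }
    public void setStatus(String status) { this.status = status; }

    @Override
    public String toString() {
        return "UserCreatedEvent(userId=" + userId + ", username=" + username
                + ", fullname=" + fullname + ", status=" + status + ")";
    }
}
```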

Now we create an event listener.

As you can see, the event listener has handler methods, each hooked to a particular phase of the transaction commit cycle. At each phase, we publish an event to the Kafka topic 'FirstTopic'.
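The shape of such a listener can be sketched without any dependencies. In a Spring application each handler would carry @TransactionalEventListener with the phase named in its comment, and the sender would be backed by a KafkaTemplate. Here the sender is a hypothetical BiConsumer stand-in, and the handler names and payload format are assumptions made for illustration.

```java
import java.util.function.BiConsumer;

/** Dependency-free sketch: one handler per transaction phase, all publishing to one topic. */
class UserCreatedEventListener {
    static final String TOPIC = "FirstTopic";

    // Hypothetical stand-in for the Kafka producer: accepts (topic, payload).
    private final BiConsumer<String, String> sender;

    UserCreatedEventListener(BiConsumer<String, String> sender) {
        this.sender = sender;
    }

    // Spring: @TransactionalEventListener(phase = TransactionPhase.BEFORE_COMMIT)
    void beforeCommit(String username) { publish(username, "BeforeCommit"); }

    // Spring: @TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT)
    void afterCommit(String username) { publish(username, "AfterCommit"); }

    // Spring: @TransactionalEventListener(phase = TransactionPhase.AFTER_ROLLBACK)
    void afterRollback(String username) { publish(username, "AfterRollback"); }

    // Spring: @TransactionalEventListener(phase = TransactionPhase.AFTER_COMPLETION)
    void afterCompletion(String username) { publish(username, "AfterCompletion"); }

    // Tags the payload with the phase it was sent from and hands it to the sender.
    private void publish(String username, String status) {
        sender.accept(TOPIC, "username=" + username + ", status=" + status);
    }
}
```

Passing the sender in through the constructor keeps the phase logic testable without a running broker.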

Now we create an event publisher.
The event publisher just constructs the event object and publishes the event.
We add a createUser method to the user service and mark it @Transactional. This is important: it ensures that an exception thrown inside createUser (by default, an unchecked one) rolls the transaction back and is propagated to the caller.

 
Now, within the user registration, we publish the event inside a transaction. Make sure that the code publishing the event runs within a transaction; otherwise, by default, the transactional event listeners are not invoked and nothing is published.
Look at the createUser method, which implements the /user request mapping for the POST method. The penultimate line of the method publishes the event. Now let's test this code.

To test the code, we also run a kafka-console-consumer listening on the topic to which our code publishes.
First, we try to create a user that already exists.

$ curl --request POST   --url http://localhost:8081/tenant1/registration/user   \
 --header 'authorization: Basic c3VwZXJzZWNyZXRjbGllbnQ6c3VwZXJzZWNyZXRjbGllbnQxMjM='   \
 --header 'cache-control: no-cache'   \
 --header 'content-type: application/json'   \
 --header 'postman-token: 71180383-2b1f-482e-2fcc-2a23d045b205'   \
 --header 'x-tenant: tenant1'   \
 --data '{"username": "admin8","password": "admin1234","audience" : "self"}' | python -m json.tool
{
    "error": "Conflict",
    "message": "",
    "path": "/tenant1/registration/user",
    "status": 409,
    "timestamp": 1596015557641
}

Now we try to create a user that doesn't exist.
$ curl --request POST   --url http://localhost:8081/tenant1/registration/user   \
 --header 'authorization: Basic c3VwZXJzZWNyZXRjbGllbnQ6c3VwZXJzZWNyZXRjbGllbnQxMjM='   \
 --header 'cache-control: no-cache'   \
 --header 'content-type: application/json'   \
 --header 'postman-token: 71180383-2b1f-482e-2fcc-2a23d045b205'   \
 --header 'x-tenant: tenant1'   \
 --data '{"username": "admin9","password": "admin1234","audience" : "self"}' | python -m json.tool
{
    "createdAt": 1596015670301,
    "createdBy": "UnAuthenticated",
    "id": 21,
    "mask": 1,
    "tenantId": 1,
    "updatedAt": 1596015670301,
    "updatedBy": "UnAuthenticated",
    "username": "admin9"
}
We can also check our Kafka consumer that we ran earlier.
$ kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic FirstTopic
UserCreatedEvent(userId=20, username=admin8, fullname=null, status=BeforeCommit)
UserCreatedEvent(userId=20, username=admin8, fullname=null, status=AfterRollback)
UserCreatedEvent(userId=20, username=admin8, fullname=null, status=AfterCompletion)
UserCreatedEvent(userId=21, username=admin9, fullname=null, status=BeforeCommit)
UserCreatedEvent(userId=21, username=admin9, fullname=null, status=AfterCommit)
UserCreatedEvent(userId=21, username=admin9, fullname=null, status=AfterCompletion)
The first three events belong to the failed transaction, which rolled back; the last three belong to the successful transaction.
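The ordering seen in the consumer output can be modelled in a few lines. This is a sketch of the observed sequence only, not of Spring's implementation; the class and method names are hypothetical. Note that BeforeCommit is reported for the rolled-back transaction too, which suggests the failure surfaced at commit time. That reading is an inference from the output.

```java
import java.util.ArrayList;
import java.util.List;

/** Models the status sequence observed on the topic for one transaction. */
class TransactionPhases {
    /**
     * Returns the statuses in the order seen in the consumer output.
     * commitSucceeds = false models a failure raised while committing.
     */
    static List<String> observedStatuses(boolean commitSucceeds) {
        List<String> statuses = new ArrayList<>();
        statuses.add("BeforeCommit");
        statuses.add(commitSucceeds ? "AfterCommit" : "AfterRollback");
        statuses.add("AfterCompletion"); // reported for both outcomes
        return statuses;
    }
}
```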

Friday, April 12, 2019

Spring and more Kafka

In the last post, we saw how to integrate Kafka with a Spring Boot application. That post covered a very simple Kafka setup. The real world is much more complex: you have to deal with multiple topics and multiple partitions. In this post, we explore a Spring Boot application with Kafka in more detail.
Let's start with the fact that we are dealing with multiple topics. The very first thing we need to do is separate out the properties for each topic.

We can no longer use the default KafkaTemplate; we have to create our own. The best way is to define a KafkaProducer, and to create a KafkaProducer we need a KafkaProducerConfig. Here are two KafkaProducerConfig classes, one for each topic.


Looking at the producer configs, we can see that we create two beans, qualified by name, each returning the appropriate KafkaTemplate that is later used to send the message. Similarly, we need to create consumer configs, which the listeners use to receive messages.


As we can see in each of the consumer configs, we create a bean which returns a ConcurrentKafkaListenerContainerFactory. The beans are qualified by a name so that we can use an appropriate container factory for receiving messages.
We also modify the MyTopicMessage and add a member variable topicName that will help us distinguish the topic to which the message needs to be sent.
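A minimal sketch of the modified class, assuming it holds just the two fields that appear in the JSON request body used later in this post (message and topicName); the constructors and accessors are assumptions.

```java
/** Request body for the send endpoint: the text to send and the topic to send it to. */
class MyTopicMessage {
    private String message;
    private String topicName;

    // No-argument constructor so a JSON mapper can instantiate the class.
    MyTopicMessage() {
    }

    MyTopicMessage(String message, String topicName) {
        this.message = message;
        this.topicName = topicName;
    }

    public String getMessage() { return message; }
    public void setMessage(String message) { this.message = message; }
    public String getTopicName() { return topicName; }
    public void setTopicName(String topicName) { this.topicName = topicName; }
}
```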

We also modify the endpoint so that the message can be sent to the appropriate topic.
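The routing decision inside the endpoint can be sketched without Spring. In the sketch below, each Consumer is a hypothetical stand-in for the KafkaTemplate bean configured for one topic, and TopicRouter is an illustrative name; rejecting unknown topics is a design choice of the sketch, not something the original code is known to do.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Consumer;

/** Dependency-free sketch: picks the sender that matches the topicName of the request. */
class TopicRouter {
    private final Map<String, Consumer<String>> sendersByTopic = new HashMap<>();

    /** Registers the sender (a stand-in for a per-topic KafkaTemplate) for a topic. */
    void register(String topicName, Consumer<String> sender) {
        sendersByTopic.put(topicName, sender);
    }

    /** Sends the message through the sender registered for topicName. */
    String send(String topicName, String message) {
        Consumer<String> sender = sendersByTopic.get(topicName);
        if (sender == null) {
            throw new IllegalArgumentException("Unknown topic: " + topicName);
        }
        sender.accept(message);
        return "Success!";
    }
}
```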

Now we modify the listener to tie everything together so that each message is received by the right handler. Look at the @KafkaListener annotation: we pass the listener container factory as an argument so that the message is received from the corresponding topic.
Now we can test the server. The flow of the service is as follows.

  1. The message is posted to the endpoint as a POST request.
  2. The @RestController receives the message and, based on the topicName in the request, sends the message to the topic with the same name.
  3. The listener receives the message from the appropriate topic.
$ curl -X POST \
>   'http://localhost:8081/send?token=3193fa24-a0ba-451b-83ff-eb563c3fd43b-cdf12811-7e41-474b-8fa6-e8fefd4a738c' \
>   -H 'Content-Type: application/json' \
>   -H 'Postman-Token: 15fbe075-9c80-4af9-a797-6b5e0979fd1b' \
>   -H 'cache-control: no-cache' \
>   -H 'token: 3193fa24-a0ba-451b-83ff-eb563c3fd43b-cdf12811-7e41-474b-8fa6-e8fefd4a738c' \
>   -d '{
> "message" : "This is my message!",
> "topicName" : "FirstTopic"
> }'
Success!

The message receipt is indicated in the server log.
2019-04-12 14:50:47.142  INFO 51396 --- [ntainer#0-0-C-1] i.s.b.t.listeners.KafkaMessageListener   : Received FirstTopic message for partition 0 This is my message!
Similarly, we can send a message to SecondTopic.
$ curl -X POST   'http://localhost:8?token=3193fa24-a0ba-451b-83ff-eb563c3fd43b-cdf12811-7e41-474b-8fa6-e8fefd4a738c'   -H 'Content-Type: application/json'   -H 'Postman-Token: 15fbe075-9c80-4af9-a797-6b5e0979fd1b'   -H 'cache-control: no-cache'   -H 'token: 3193fa24-a0ba-451b-83ff-eb563c3fd43b-cdf12811-7e41-474b-8fa6-e8fefd4a738c'   -d '{
"message" : "This is another message!",
"topicName" : "SecondTopic"
}'
Success!

The message receipt is indicated in the server log.
2019-04-12 14:53:14.972  INFO 51396 --- [ntainer#1-0-C-1] i.s.b.t.listeners.KafkaMessageListener   : Received SecondTopic message for partition 0 This is another message!
The complete code base for this tutorial can be found in my GitHub repository at v1.4.

Wednesday, April 10, 2019

Kafka and Spring

Kafka has become a very popular platform and is used as a streaming platform, a journal and even an eventing system. In this post, we explore how to integrate Kafka with a Spring Framework application. First, we add the Kafka bootstrap server details to the application.properties file.

Let's also add dependencies in pom.xml.

Now, for each Kafka topic, we create a listener class. The listener class provides a callback method that is called when any message is retrieved on that topic.

Now we create an endpoint through which we inject a message into the queue. The message is sent to the queue and is then retrieved by the listener.

We autowire a KafkaTemplate instance that is used to send the message to the queue.
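The send-and-receive round trip can be illustrated with a toy in-memory topic. This is only a model of the flow: InMemoryTopic is a hypothetical class, with subscribe playing the role of a @KafkaListener method and send playing the role of KafkaTemplate plus the broker.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical in-memory stand-in for a Kafka topic. */
class InMemoryTopic {
    private final List<Consumer<String>> listeners = new ArrayList<>();

    /** Registers a callback that is invoked for every message sent afterwards. */
    void subscribe(Consumer<String> listener) {
        listeners.add(listener);
    }

    /** Delivers the message to every registered callback, in registration order. */
    void send(String message) {
        for (Consumer<String> listener : listeners) {
            listener.accept(message);
        }
    }
}
```

Unlike this model, a real broker delivers messages asynchronously, so the endpoint returns before the listener logs the message.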

$ curl -X POST \
>   'http://localhost:8081/send?token=3193fa24-a0ba-451b-83ff-eb563c3fd43b-cdf12811-7e41-474b-8fa6-e8fefd4a738c' \
>   -H 'Content-Type: application/json' \
>   -H 'Postman-Token: e281e3c5-0dae-4bb7-ac8d-6555f66a18c6' \
>   -H 'cache-control: no-cache' \
>   -H 'token: 3193fa24-a0ba-451b-83ff-eb563c3fd43b-cdf12811-7e41-474b-8fa6-e8fefd4a738c' \
>   -d '{
> "message" : "This is my message!"
> }'
Message sent successfully!.
The receipt of the message is indicated in the Spring server log.

2019-04-10 14:28:03.969  INFO 31091 --- [ntainer#0-0-C-1] i.s.b.t.listeners.MyTopicKafkaListener   : Received Promise message This is my message!