Wednesday, July 29, 2020

Publishing Application Events using Kafka and Spring Transaction Events

Many applications need to publish application-level events when certain operations happen in the system. Here is one requirement I had recently: in my tutorial system, whenever a new user is created, I need to publish a UserCreated event. Here is how we can accomplish this using the transaction events provided by the Spring Framework.
First, I create an event POJO that will be used to publish the event.
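
As a rough idea of what such a POJO could look like, here is a minimal plain-Java sketch. The field names (userId, username, fullname, status) are assumptions inferred from the consumer output shown at the end of this post, and the withStatus helper is hypothetical; the actual class may differ (for example, it may generate this boilerplate with Lombok).

```java
// Sketch of an event POJO. Field names are inferred from the consumer
// output in this post; they are assumptions, not the original class.
final class UserCreatedEvent {
    private final Long userId;
    private final String username;
    private final String fullname;
    private final String status; // transaction phase label, e.g. "BeforeCommit"

    UserCreatedEvent(Long userId, String username, String fullname, String status) {
        this.userId = userId;
        this.username = username;
        this.fullname = fullname;
        this.status = status;
    }

    Long getUserId() { return userId; }
    String getUsername() { return username; }
    String getFullname() { return fullname; }
    String getStatus() { return status; }

    // Hypothetical helper: returns a copy of this event with another status,
    // so each transaction phase can send its own labelled message.
    UserCreatedEvent withStatus(String newStatus) {
        return new UserCreatedEvent(userId, username, fullname, newStatus);
    }

    @Override
    public String toString() {
        return "UserCreatedEvent(userId=" + userId
                + ", username=" + username
                + ", fullname=" + fullname
                + ", status=" + status + ")";
    }
}
```

Keeping the event immutable means a listener cannot accidentally change an event that another phase handler is about to send.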

Now we create an event listener.

As you can see, the event listener has a handler method that hooks into a particular phase of the transaction commit cycle. At the different phases of the commit cycle, we publish an event to the Kafka topic 'FirstTopic'.
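
To make the phase hooks concrete, here is a small plain-Java model of transaction-bound events. It is not Spring's API: Phase, PhaseListener, TxEventBus and PhaseDemo are hypothetical names, and a list stands in for the Kafka topic. In Spring, a hook like this is typically declared with @TransactionalEventListener and a TransactionPhase value. The model assumes that a failing transaction fails at commit time, which is what the BeforeCommit followed by AfterRollback sequence in the test output later in this post suggests.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java model of transaction-bound events. All names are hypothetical;
// this is not Spring's API.
enum Phase { BEFORE_COMMIT, AFTER_COMMIT, AFTER_ROLLBACK, AFTER_COMPLETION }

interface PhaseListener {
    void onEvent(Phase phase, String event);
}

final class TxEventBus {
    private final List<PhaseListener> listeners = new ArrayList<>();
    private List<String> pending; // null while no transaction is active

    void register(PhaseListener listener) { listeners.add(listener); }

    void begin() { pending = new ArrayList<>(); }

    // Queues the event for the active transaction. Without a transaction
    // the event is dropped and false is returned.
    boolean publish(String event) {
        if (pending == null) {
            return false;
        }
        pending.add(event);
        return true;
    }

    // Ends the transaction. commitSucceeds simulates whether the database
    // accepts the commit; false models a failure at commit time.
    void commit(boolean commitSucceeds) {
        if (pending == null) {
            throw new IllegalStateException("no active transaction");
        }
        fire(Phase.BEFORE_COMMIT);
        fire(commitSucceeds ? Phase.AFTER_COMMIT : Phase.AFTER_ROLLBACK);
        fire(Phase.AFTER_COMPLETION);
        pending = null;
    }

    // Delivers every queued event to every listener for the given phase.
    private void fire(Phase phase) {
        for (String event : pending) {
            for (PhaseListener listener : listeners) {
                listener.onEvent(phase, event);
            }
        }
    }
}

final class PhaseDemo {
    // Runs one transaction and returns what the listener "sent".
    static List<String> run(boolean commitSucceeds) {
        List<String> sent = new ArrayList<>(); // stands in for the Kafka topic
        TxEventBus bus = new TxEventBus();
        bus.register((phase, event) -> sent.add(event + ":" + phase));
        bus.begin();
        bus.publish("UserCreated");
        bus.commit(commitSucceeds);
        return sent;
    }

    public static void main(String[] args) {
        System.out.println(run(true));
        System.out.println(run(false));
    }
}
```

In this model, an event published while no transaction is active is simply dropped, which mirrors the default behaviour of Spring's transaction-bound listeners.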

Now we create an event publisher.
The event publisher just constructs the event object and publishes the event.
We add a createUser method to the user service and mark it @Transactional. This is important: it makes createUser run inside a transaction, so that an unchecked exception thrown from it rolls the transaction back (Spring's default) and propagates to the caller.

 
Now, within the user registration, we publish the event inside a transaction. Make sure the code that publishes the event runs within a transaction; otherwise, by default, the transaction-bound listeners are not invoked and nothing is published.
Look at the createUser method, which implements the /user request mapping for the POST method. The penultimate line of the method publishes the event. Now let's test this code.

To test the code, we also run a kafka-console-consumer listening on the topic that our code publishes to.
First, we try to create a user that already exists.

$ curl --request POST   --url http://localhost:8081/tenant1/registration/user   \
 --header 'authorization: Basic c3VwZXJzZWNyZXRjbGllbnQ6c3VwZXJzZWNyZXRjbGllbnQxMjM='   \
 --header 'cache-control: no-cache'   \
 --header 'content-type: application/json'   \
 --header 'postman-token: 71180383-2b1f-482e-2fcc-2a23d045b205'   \
 --header 'x-tenant: tenant1'   \
 --data '{"username": "admin8","password": "admin1234","audience" : "self"}' | python -m json.tool
{
    "error": "Conflict",
    "message": "",
    "path": "/tenant1/registration/user",
    "status": 409,
    "timestamp": 1596015557641
}

Now we try to create a user that doesn't exist.
$ curl --request POST   --url http://localhost:8081/tenant1/registration/user \
--header 'authorization: Basic c3VwZXJzZWNyZXRjbGllbnQ6c3VwZXJzZWNyZXRjbGllbnQxMjM=' \
--header 'cache-control: no-cache' \
--header 'content-type: application/json' \
--header 'postman-token: 71180383-2b1f-482e-2fcc-2a23d045b205' \
--header 'x-tenant: tenant1' \
--data '{"username": "admin9","password": "admin1234","audience" : "self"}' | python -m json.tool
{
    "createdAt": 1596015670301,
    "createdBy": "UnAuthenticated",
    "id": 21,
    "mask": 1,
    "tenantId": 1,
    "updatedAt": 1596015670301,
    "updatedBy": "UnAuthenticated",
    "username": "admin9"
}
We can also check our Kafka consumer that we ran earlier.
$ kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic FirstTopic
UserCreatedEvent(userId=20, username=admin8, fullname=null, status=BeforeCommit)
UserCreatedEvent(userId=20, username=admin8, fullname=null, status=AfterRollback)
UserCreatedEvent(userId=20, username=admin8, fullname=null, status=AfterCompletion)
UserCreatedEvent(userId=21, username=admin9, fullname=null, status=BeforeCommit)
UserCreatedEvent(userId=21, username=admin9, fullname=null, status=AfterCommit)
UserCreatedEvent(userId=21, username=admin9, fullname=null, status=AfterCompletion)
The first three events belong to the failed transaction, which was rolled back, and the last three belong to the successful transaction, which was committed.