Monday, August 22, 2022

Story of a funny linked list

Recently I was presented with this problem, and for a moment I completely blanked on how to solve it. I had to think for some time before I finally found a solution. Let's first look at the problem.


Let's assume we have a simple singly linked list. The only modification is that each node of the linked list also has another link; let's call it other. For some of the elements of the list, other contains a pointer to some other element in the list. Now, we have to write a clone function that makes a deep copy of the list.

The salient method is the clone method in the class. The challenge is that the other pointer can point in either direction: a node earlier in the list can point to a node later in the list and vice versa. So we basically have to run two passes over the list.

In the first pass, we make a regular copy of the list and build two hash maps. The first hash map is keyed by the other pointers and stores the old and new nodes as the value pair. The second hash map maps each old node to its corresponding new node.

Once the simple copy of the list is complete, we traverse the first map, look up the new node corresponding to each old node that an other pointer refers to, and retrofit those pointers in the copied list.
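Here is a simplified sketch of the idea in Java. It uses a single old-to-new map, which is enough for the second pass; the class and field names are illustrative and not the actual code I wrote.

import java.util.HashMap;
import java.util.Map;

class Node {
    int value;
    Node next;
    Node other; // may point to any other node in the list, or be null

    Node(int value) { this.value = value; }
}

class FunnyList {
    Node head;

    // Deep copy: first pass copies the chain and records old -> new,
    // second pass rewires the 'other' pointers using that map.
    Node clone() {
        Map<Node, Node> oldToNew = new HashMap<>();

        // First pass: copy nodes and next links.
        Node dummy = new Node(0);
        Node tail = dummy;
        for (Node cur = head; cur != null; cur = cur.next) {
            Node copy = new Node(cur.value);
            oldToNew.put(cur, copy);
            tail.next = copy;
            tail = copy;
        }

        // Second pass: retrofit the 'other' pointers.
        for (Node cur = head; cur != null; cur = cur.next) {
            if (cur.other != null) {
                oldToNew.get(cur).other = oldToNew.get(cur.other);
            }
        }
        return dummy.next;
    }
}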

So that's how I was able to solve the problem of the funny linked list.

Tuesday, August 25, 2020

Database Schema Version Control

 Most of the time when we have a service in production, we make changes to the database schema as the service evolves. We want to make sure of two things here.

  1. Database changes are version controlled
  2. Changes to the actual database are done in a controlled fashion.
I am not very comfortable with a CI/CD job going off on its own and making changes to the database schema. Deployments that need schema changes are done as part of planned maintenance cycles and need to be monitored by actual humans.

So, what do I expect from a solution that takes care of schema versioning?

  • The service should not be allowed to start if the schema is not in sync with the in-memory entities and the code that is designed to use them.
  • An easy mechanism should be provided for migrating the schema version. This migration generally moves through different phases of development like dev/stage/test and then to production. We want to automate it in the sense that there should be scripts that are tested and validated through the earlier phases of the development lifecycle, but with the option of an actual user triggering them in production.
With the above two goals, let's try to define a database schema versioning system. I am going to use Flyway as the primary tool, with configurations that make it useful for my use case. First let's define a configuration file that defines the database connectivity options for my database.

You would remember that since ours is a multi-tenant application, we have three databases, and we have to define configurations for all three of them.



Here we have all three database configurations defined. Now let's make changes to pom.xml for the dependencies.

If you are just creating your application, then your life is easy: just create the initial SQL file and add it to your src/resources/db/migration directory. Be sure to name it something like V1.0__Initial_Schema.sql. You need to make sure the first character is V, U, or R for versioned, undo, and repeatable migrations respectively. After that comes a version number; you can follow any version numbering strategy as long as it uses dotted notation such as major.minor. For more details on this, please read here on how this can be effectively used. What we need to understand is that Flyway will consider version 1.1 to be later than version 1.0.
Now that we have the existing SQL stored in the file, we can baseline our current schema so that any future changes can be incorporated using this mechanism.
$ mvn flyway:baseline -Dflyway.configFiles=./tutorials/config/default.config
[INFO] Scanning for projects...
[INFO] 
[INFO] -----------------< in.springframework.blog:tutorials >------------------
[INFO] Building tutorials 0.0.1-SNAPSHOT
[INFO] --------------------------------[ jar ]---------------------------------
[INFO] 
[INFO] --- flyway-maven-plugin:6.5.5:baseline (default-cli) @ tutorials ---
Loading class `com.mysql.jdbc.Driver'. This is deprecated. The new driver class is `com.mysql.cj.jdbc.Driver'. The driver is automatically registered via the SPI and manual loading of the driver class is generally unnecessary.
[INFO] Flyway Community Edition 6.5.5 by Redgate
[INFO] Database: jdbc:mysql://localhost:3306/tutorial (MySQL 8.0)
[INFO] Creating Schema History table `tutorial`.`flyway_schema_history` with baseline ...
[INFO] Successfully baselined schema with version: 1.0
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time:  1.285 s
[INFO] Finished at: 2020-08-25T10:14:53+05:30
[INFO] ------------------------------------------------------------------------

Make sure the pathname of the configuration file is appropriate for your scenario. Also, in my case, since I have multiple databases due to multi-tenancy, I need to do this for each of the databases. Once done, I can check the status of the schema using the info command.
$ mvn flyway:info -Dflyway.configFiles=./tutorials/config/default.config                                          

[INFO] Scanning for projects...
[INFO] 
[INFO] -----------------< in.springframework.blog:tutorials >------------------
[INFO] Building tutorials 0.0.1-SNAPSHOT
[INFO] --------------------------------[ jar ]---------------------------------
[INFO] 
[INFO] --- flyway-maven-plugin:6.5.5:info (default-cli) @ tutorials ---
Loading class `com.mysql.jdbc.Driver'. This is deprecated. The new driver class is `com.mysql.cj.jdbc.Driver'. The driver is automatically registered via the SPI and manual loading of the driver class is generally unnecessary.
[INFO] Flyway Community Edition 6.5.5 by Redgate
[INFO] Database: jdbc:mysql://localhost:3306/tutorial (MySQL 8.0)
[INFO] Schema version: 1.0
[INFO] 
[INFO] +----------+---------+----------------+----------+---------------------+----------+
| Category | Version | Description    | Type     | Installed On        | State    |
+----------+---------+----------------+----------+---------------------+----------+
|          | 1.0     | Base Migration | BASELINE | 2020-08-25 10:13:12 | Baseline |
+----------+---------+----------------+----------+---------------------+----------+

[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time:  1.247 s
[INFO] Finished at: 2020-08-25T10:13:26+05:30
[INFO] ------------------------------------------------------------------------
Now our existing database is baselined and we can start using the Flyway versioning mechanism. We still have one issue to resolve: we want to make sure our application doesn't start unless the code matches the schema it expects. The first step is to disable any kind of migration being started by the application itself, so we define a null migration strategy that does nothing.
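A minimal sketch of such a no-op strategy is below; the configuration class and bean names are my own and not necessarily those used in the project.

import org.springframework.boot.autoconfigure.flyway.FlywayMigrationStrategy;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class FlywayConfig {

    // Returning a strategy that does nothing prevents Spring Boot from
    // running flyway.migrate() automatically at application startup.
    @Bean
    public FlywayMigrationStrategy flywayMigrationStrategy() {
        return flyway -> {
            // intentionally empty: migrations are triggered manually via mvn flyway:migrate
        };
    }
}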

This will ensure that no automatic migration is executed. We still want to make sure that the application exits if the schema can't be validated. For this, we go to the method where we create the data source and add validation of the data source there.

In our example, since we are using a multi-tenant data source, we add this validation there.
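A sketch of what that validation could look like is below. The method name validateDatasource comes from the post; the surrounding class and the exact exit wiring are assumptions.

import javax.sql.DataSource;
import org.flywaydb.core.Flyway;
import org.flywaydb.core.api.FlywayException;
import org.springframework.boot.SpringApplication;
import org.springframework.context.ApplicationContext;

public class SchemaValidator {

    private final ApplicationContext context;

    public SchemaValidator(ApplicationContext context) {
        this.context = context;
    }

    // Validates the applied migrations against the scripts on the classpath
    // and shuts the application down if they do not match.
    public void validateDatasource(DataSource dataSource) {
        try {
            Flyway.configure()
                  .dataSource(dataSource)
                  .load()
                  .validate();
        } catch (FlywayException e) {
            // Schema and code are out of sync: refuse to start.
            SpringApplication.exit(context, () -> 1);
        }
    }
}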

Look at the validateDatasource method: it takes a dataSource as input and triggers a SpringApplication exit if the database can't be validated. Now let's test it. We will take our entity Profile and add a new field to it; let's call it maritalStatus.

We now create a sql file that alters the table.


Now let's see what happens if we try to run the application.
 
2020-08-25 11:53:54.652 ERROR 40041 --- [           main] o.s.boot.SpringApplication               : Application run failed

org.springframework.beans.factory.UnsatisfiedDependencyException: Error creating bean with name 'flywayInitializer' defined in class path resource [org/springframework/boot/autoconfigure/flyway/FlywayAutoConfiguration$FlywayConfiguration.class]: Unsatisfied dependency expressed through method 'flywayInitializer' parameter 0; nested exception is org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'flyway' defined in class path resource [org/springframework/boot/autoconfigure/flyway/FlywayAutoConfiguration$FlywayConfiguration.class]: Bean instantiation via factory method failed; nested exception is org.springframework.beans.BeanInstantiationException: Failed to instantiate [org.flywaydb.core.Flyway]: Factory method 'flyway' threw exception; nested exception is org.springframework.boot.context.properties.ConfigurationPropertiesBindException: Error creating bean with name 'dataSource': Could not bind properties to 'DataSource' : prefix=spring.datasource, ignoreInvalidFields=false, ignoreUnknownFields=true; nested exception is java.lang.IllegalStateException: org.springframework.boot.web.servlet.context.AnnotationConfigServletWebServerApplicationContext@ba2f4ec has been closed already
	at org.springframework.beans.factory.support.ConstructorResolver.createArgumentArray(ConstructorResolver.java:797) ~[spring-beans-5.2.7.RELEASE.jar:5.2.7.RELEASE]

Now let's run the migration for this SQL file.
$ mvn flyway:migrate -Dflyway.configFiles=./tutorials/config/default.config
     
[INFO] Scanning for projects...
[INFO] 
[INFO] -----------------< in.springframework.blog:tutorials >------------------
[INFO] Building tutorials 0.0.1-SNAPSHOT
[INFO] --------------------------------[ jar ]---------------------------------
[INFO] 
[INFO] --- flyway-maven-plugin:6.5.5:migrate (default-cli) @ tutorials ---
Loading class `com.mysql.jdbc.Driver'. This is deprecated. The new driver class is `com.mysql.cj.jdbc.Driver'. The driver is automatically registered via the SPI and manual loading of the driver class is generally unnecessary.
[INFO] Flyway Community Edition 6.5.5 by Redgate
[INFO] Database: jdbc:mysql://localhost:3306/tutorial (MySQL 8.0)
[INFO] Successfully validated 3 migrations (execution time 00:00.017s)
[INFO] Current version of schema `tutorial`: 1.1
[INFO] Migrating schema `tutorial` to version 1.2 - Add Marital Status
[INFO] Successfully applied 1 migration to schema `tutorial` (execution time 00:00.075s)
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time:  1.323 s
[INFO] Finished at: 2020-08-25T11:56:25+05:30
[INFO] ------------------------------------------------------------------------

$ mvn flyway:info -Dflyway.configFiles=./tutorials/config/default.config 

[INFO] Scanning for projects...
[INFO] 
[INFO] -----------------< in.springframework.blog:tutorials >------------------
[INFO] Building tutorials 0.0.1-SNAPSHOT
[INFO] --------------------------------[ jar ]---------------------------------
[INFO] 
[INFO] --- flyway-maven-plugin:6.5.5:info (default-cli) @ tutorials ---
Loading class `com.mysql.jdbc.Driver'. This is deprecated. The new driver class is `com.mysql.cj.jdbc.Driver'. The driver is automatically registered via the SPI and manual loading of the driver class is generally unnecessary.
[INFO] Flyway Community Edition 6.5.5 by Redgate
[INFO] Database: jdbc:mysql://localhost:3306/tutorial (MySQL 8.0)
[INFO] Schema version: 1.2
[INFO] 
[INFO] +-----------+---------+---------------------------+----------+---------------------+----------+
| Category  | Version | Description               | Type     | Installed On        | State    |
+-----------+---------+---------------------------+----------+---------------------+----------+
|           | 1.0     | Base Migration            | BASELINE | 2020-08-25 10:15:00 | Baseline |
| Versioned | 1.1     | Remove AuthToken FromUser | SQL      | 2020-08-25 10:49:27 | Success  |
| Versioned | 1.2     | Add Marital Status        | SQL      | 2020-08-25 11:56:25 | Success  |
+-----------+---------+---------------------------+----------+---------------------+----------+

[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time:  1.226 s
[INFO] Finished at: 2020-08-25T11:56:30+05:30
[INFO] ------------------------------------------------------------------------
As we can see, the database schema is now migrated. So if we make sure all the database migration SQL files are properly made part of the project, we can eliminate schema mismatch issues to a large extent. We also have the option of making it fully automated, or keeping it a single-command migration with manual verification.
The Hibernate entities are just in-memory representations of the actual database schema; the schema itself is owned by the database, and the corresponding entity changes have to be made by the programmers.

The complete code supporting this tutorial is available at my spring blog repository, tagged as v1.8.

Wednesday, July 29, 2020

Publishing Application Events using Kafka and Spring Transaction Events

Many applications need to publish application-level events based on operations happening in the system. Here is one requirement that I had recently: in my tutorial system, whenever a new user is created, I need to publish a UserCreated event. Here is how we can accomplish this using the transaction events provided by the Spring framework.
First I create an event POJO that will be used to publish the event.
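A minimal sketch of such a POJO is below. The field names follow the kafka-console-consumer output shown later in this post; the Lombok annotations are an assumption (the toString format in that output matches Lombok's @Data).

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

// Event payload published when a new user is created.
@Data
@AllArgsConstructor
@NoArgsConstructor
public class UserCreatedEvent {
    private Long userId;
    private String username;
    private String fullname;
    private String status; // which transaction phase produced this event
}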

Now we create an event listener.

As you can see, within the event listener I have handler methods that hook into particular phases of the transaction commit cycle. At the different phases of the commit cycle, we publish an event to the Kafka topic 'FirstTopic'.
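A sketch of such a listener is below, assuming a KafkaTemplate<String, String> is available and the event is serialized via its toString(), which matches the consumer output further down. The class name and exact set of phases handled are assumptions.

import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;
import org.springframework.transaction.event.TransactionPhase;
import org.springframework.transaction.event.TransactionalEventListener;

@Component
public class UserCreatedEventListener {

    private final KafkaTemplate<String, String> kafkaTemplate;

    public UserCreatedEventListener(KafkaTemplate<String, String> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    // One handler per transaction phase; each one forwards the event to Kafka.
    @TransactionalEventListener(phase = TransactionPhase.BEFORE_COMMIT)
    public void beforeCommit(UserCreatedEvent event) {
        event.setStatus("BeforeCommit");
        kafkaTemplate.send("FirstTopic", event.toString());
    }

    @TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT)
    public void afterCommit(UserCreatedEvent event) {
        event.setStatus("AfterCommit");
        kafkaTemplate.send("FirstTopic", event.toString());
    }

    @TransactionalEventListener(phase = TransactionPhase.AFTER_ROLLBACK)
    public void afterRollback(UserCreatedEvent event) {
        event.setStatus("AfterRollback");
        kafkaTemplate.send("FirstTopic", event.toString());
    }

    @TransactionalEventListener(phase = TransactionPhase.AFTER_COMPLETION)
    public void afterCompletion(UserCreatedEvent event) {
        event.setStatus("AfterCompletion");
        kafkaTemplate.send("FirstTopic", event.toString());
    }
}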

Now we create an event publisher.
The event publisher just constructs the event object and publishes the event.
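A sketch of such a publisher is below; the class and method names are illustrative.

import org.springframework.context.ApplicationEventPublisher;
import org.springframework.stereotype.Component;

@Component
public class UserEventPublisher {

    private final ApplicationEventPublisher applicationEventPublisher;

    public UserEventPublisher(ApplicationEventPublisher applicationEventPublisher) {
        this.applicationEventPublisher = applicationEventPublisher;
    }

    // Builds the event object and hands it to Spring's event infrastructure.
    // The @TransactionalEventListener handlers above pick it up at the configured phases.
    public void publishUserCreatedEvent(Long userId, String username, String fullname) {
        UserCreatedEvent event = new UserCreatedEvent(userId, username, fullname, null);
        applicationEventPublisher.publishEvent(event);
    }
}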
We add a createUser method within the user service and mark it @Transactional. This is important to make sure any exceptions coming out of the createUser transaction are caught and propagated appropriately.

 
Now, within the user registration, we publish the event from inside a transaction. Be careful that the code publishing the event runs within a transaction; otherwise the transaction-bound listeners will not fire.
Look at the createUser method that implements the /user request mapping for the POST method. The penultimate line in the method publishes the event. Now let's test this code.

Just to test the code, we also run a kafka-console-consumer listening to the topic to which our code is publishing the message.
First we try to create a user that already exists.

$ curl --request POST   --url http://localhost:8081/tenant1/registration/user   \
 --header 'authorization: Basic c3VwZXJzZWNyZXRjbGllbnQ6c3VwZXJzZWNyZXRjbGllbnQxMjM='   \
 --header 'cache-control: no-cache'   \
 --header 'content-type: application/json'   \
 --header 'postman-token: 71180383-2b1f-482e-2fcc-2a23d045b205'   \
 --header 'x-tenant: tenant1'   \
 --data '{"username": "admin8","password": "admin1234","audience" : "self"}' | python -m json.tool
 {
    "error": "Conflict",
    "message": "",
    "path": "/tenant1/registration/user",
    "status": 409,
    "timestamp": 1596015557641
}

Now we try to create a user that doesn't exist.
$ curl --request POST   --url http://localhost:8081/tenant1/registration/user \
--header 'authorization: Basic c3VwZXJzZWNyZXRjbGllbnQ6c3VwZXJzZWNyZXRjbGllbnQxMjM='\
--header 'cache-control: no-cache' \
--header 'content-type: application/json'\
--header 'postman-token: 71180383-2b1f-482e-2fcc-2a23d045b205' \
--header 'x-tenant: tenant1' \
--data '{"username": "admin9","password": "admin1234","audience" : "self"}' | python -m json.tool
{
    "createdAt": 1596015670301,
    "createdBy": "UnAuthenticated",
    "id": 21,
    "mask": 1,
    "tenantId": 1,
    "updatedAt": 1596015670301,
    "updatedBy": "UnAuthenticated",
    "username": "admin9"
}
We can also check our Kafka consumer that we ran earlier.
$ kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic FirstTopic
UserCreatedEvent(userId=20, username=admin8, fullname=null, status=BeforeCommit)
UserCreatedEvent(userId=20, username=admin8, fullname=null, status=AfterRollback)
UserCreatedEvent(userId=20, username=admin8, fullname=null, status=AfterCompletion)
UserCreatedEvent(userId=21, username=admin9, fullname=null, status=BeforeCommit)
UserCreatedEvent(userId=21, username=admin9, fullname=null, status=AfterCommit)
UserCreatedEvent(userId=21, username=admin9, fullname=null, status=AfterCompletion)
The first three events are for the failed transaction which rolled back and the last three are for the successful transaction.

Wednesday, June 24, 2020

Building a multitenant service

If you build a production service that is valuable, very soon you find a customer who wants to use it but wants it white-labeled. You also very quickly find out that there is a need for data isolation, i.e. different customers want their data to be kept in their own databases. For these reasons, it is always a good idea to design a service keeping multi-tenancy in mind.
In this tutorial we will look at some of the important features required for a multi-tenant service and how we can leverage the Spring framework to deliver a truly multi-tenant-enabled modern service.
Here are a couple of important aspects of a multi-tenant service.
  1. Everybody gets their own endpoints
  2. Everybody can be given their own databases
Let's design what our endpoint URLs would look like keeping tenancy in mind. We add a discriminator in the URL that identifies a tenant. For example, our OAuth URLs would become something like the ones below.
http://example.com/tenant1/oauth/token
http://example.com/tenant1/oauth/check_token
To accomplish this, we define our RequestMapping with an embedded tenant variable that becomes part of each of the URLs.

 public static final String BASE_ENDPOINT = "/{tenant}";
 public static final String USER_ENDPOINT = BASE_ENDPOINT + "/user";
 public static final String REGISTRATION_ENDPOINT = BASE_ENDPOINT + "/registration";



As we can see, the first part of the URL defines the tenant. It is not mandatory, but I also design the service to expect a header that defines the tenant. This is just to guard against a client mistakenly calling the wrong tenant, because the same tenant has to be supplied in two places. We will use the following header.
X-tenant:tenant1
Every incoming request into the system needs to know the tenant and the authenticated user that is part of the request. We define a sequence of filters for this purpose. Since these filters need to be called in a particular order, we define the order as follows.

public static final int TENANT_HEADER_PRECEDENCE = Ordered.HIGHEST_PRECEDENCE;
public static final int SEED_DATA_PRECEDENCE = TENANT_HEADER_PRECEDENCE - 1;
public static final int TENANT_PRECEDENCE = SEED_DATA_PRECEDENCE - 1;
public static final int USER_PRECEDENCE = Ordered.LOWEST_PRECEDENCE;

As you can see we are going to define four filters which will perform specific functions.
  1. TenantHeader filter will extract the tenant header from the incoming request, match it against the URL segment and set it in a ThreadLocal variable.
  2. SeedData filter is only required to create some seed data to make the service usable. We need a default tenant in the system so that we can start making some requests. This filter doesn't do anything most of the time.
  3. Tenant filter will extract the tenant object and set it into another ThreadLocal variable.
  4. User filter is the last in precedence; it extracts the currently authenticated user and stores it into a ThreadLocal.
We are using the following TutorialRequestContext class to store these thread-local variables.
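The original class isn't reproduced here, but a sketch of the idea is below. The Tenant and User types are the project's own entities, and the field names are illustrative.

public class TutorialRequestContext {

    public static final ThreadLocal<String> currentTenantName = new ThreadLocal<>();
    public static final ThreadLocal<Tenant> currentTenant = new ThreadLocal<>();
    public static final ThreadLocal<User> currentUser = new ThreadLocal<>();

    private TutorialRequestContext() {
        // static holder only
    }

    // Called at the end of request processing so values don't leak across requests.
    public static void clear() {
        currentTenantName.remove();
        currentTenant.remove();
        currentUser.remove();
    }
}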

Now let's look at these filters one by one.

This filter just extracts the X-tenant header and stores it in the thread local.

This filter checks that there is at least one tenant in the system; if none is found, it inserts a default tenant. This is required so that we can use the REST calls.

This filter extracts the complete Tenant object from the database and stores it into the thread local.

This filter extracts the currently authenticated user and populates it in the thread local.
In the last tutorial we used the Spring-provided default ClientDetailsService; in this tutorial we create our own service to make sure we can have a schema that we like. To do that we need an entity, TutorialClientDetails, and a service, TutorialClientDetailsService.

This entity just implements the ClientDetails interface.

This service needs to implement a method loadClientByClientId.

Now that all the foundation work is in place, we get down to making our service multi-tenant. The first thing that we need to do is remove all the old dataSources that we had defined. We now define a routing data source that chooses the right data source based on the tenant that we are talking to. This is where our tenant discriminator thread local becomes useful. Take a look at the following class.

Here we implement the method determineCurrentLookupKey, which uses the thread local to identify the current tenant and returns the key.
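A minimal sketch of such a class is below; the class name is mine, not necessarily the one used in the repository.

import org.springframework.jdbc.datasource.lookup.AbstractRoutingDataSource;

// Routes every connection request to the data source of the current tenant.
public class TenantRoutingDataSource extends AbstractRoutingDataSource {

    @Override
    protected Object determineCurrentLookupKey() {
        // The tenant name was stored in a ThreadLocal by the tenant header filter;
        // it is used as the key into the map of per-tenant data sources.
        return TutorialRequestContext.currentTenantName.get();
    }
}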

The bulk of the smartness lies in the class TutorialMultitenantConfig, specifically the dataSource bean that returns the default dataSource for the system. What we are assuming is that within our resources directory we will have a tenants subdirectory, and within that we will have one properties file per tenant. The property file will look like the one below.
 

Here we have added the usual Spring data source properties along with a name property which will be the name of the tenant. This name will be matched against the name in the URL and the header.
In the class TutorialMultitenantConfig, look at the definition of the variable TENANT_PROPERTIES_RESOURCE; this basically looks up all the *.properties files in the tenants directory. Lines 61 through 74 create a data source object for each of the tenants and store these in a map keyed by the tenant name. Recall the determineCurrentLookupKey method, which returns the name of the current tenant; that return value is used to fetch the appropriate data source object for the request being processed. Line 88 defines a default data source that is used if there is no data source present for the given tenant.
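The exact class isn't reproduced here, but a condensed sketch of the idea follows. It reuses the TenantRoutingDataSource sketched above; the property keys, the resource location and the choice of default data source are assumptions, and the line numbers mentioned above refer to the original code, not this sketch.

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import javax.sql.DataSource;
import org.springframework.boot.jdbc.DataSourceBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.Resource;
import org.springframework.core.io.support.PathMatchingResourcePatternResolver;

@Configuration
public class TutorialMultitenantConfig {

    private static final String TENANT_PROPERTIES_RESOURCE = "classpath:tenants/*.properties";

    @Bean
    public DataSource dataSource() throws IOException {
        Map<Object, Object> targetDataSources = new HashMap<>();
        DataSource defaultDataSource = null;

        // One properties file per tenant: name plus the usual connection settings.
        Resource[] resources =
                new PathMatchingResourcePatternResolver().getResources(TENANT_PROPERTIES_RESOURCE);
        for (Resource resource : resources) {
            Properties props = new Properties();
            props.load(resource.getInputStream());

            DataSource tenantDataSource = DataSourceBuilder.create()
                    .url(props.getProperty("datasource.url"))
                    .username(props.getProperty("datasource.username"))
                    .password(props.getProperty("datasource.password"))
                    .driverClassName(props.getProperty("datasource.driver-class-name"))
                    .build();

            targetDataSources.put(props.getProperty("name"), tenantDataSource);
            if (defaultDataSource == null) {
                defaultDataSource = tenantDataSource; // fall back to the first tenant found
            }
        }

        TenantRoutingDataSource routingDataSource = new TenantRoutingDataSource();
        routingDataSource.setTargetDataSources(targetDataSources);
        routingDataSource.setDefaultTargetDataSource(defaultDataSource);
        routingDataSource.afterPropertiesSet();
        return routingDataSource;
    }
}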

Now our multi-tenant service is ready and it is time to test it. The first thing that we need to do is create a couple more databases with exactly the same schema as the first database. Please keep in mind that this has to be done manually even if you have defined the ddl-auto property. Just take a mysqldump of the first database and import it into the two other databases.
We also need to make sure that the system has at least one tenant defined in each of the databases. This is required so that we are able to use the REST calls. The best approach is to have a default tenant and then take the dump, so that the default tenant is also copied into each of the databases. We have created an admin user which has the ADMIN role. We call the user endpoint, which returns the list of all the users.
$ curl --request GET \
>   --url http://localhost:8081/tenant1/user \
>   --header 'authorization: Bearer eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJleHAiOjE1OTI5Mzc1MzksInVzZXJfbmFtZSI6ImFkbWluIiwiYXV0aG9yaXRpZXMiOlsiVVNFUiIsIkFETUlOIl0sImp0aSI6IjRmOTdiNThkLTI1NTEtNDA4Yi04ZWM4LWUzZGZmZWQ3MTQ2NiIsImNsaWVudF9pZCI6InN1cGVyc2VjcmV0Y2xpZW50Iiwic2NvcGUiOlsicmVhZCIsImNvZGUiLCJ3cml0ZSJdfQ.fWt_H-ORHP44xOIljoqMfVIeGkqJGQqUBMj8paVxAPM' \
>   --header 'cache-control: no-cache' \
>   --header 'content-type: application/json' \
>   --header 'postman-token: 76b3525e-bc3b-e629-91b6-a75253f657d8' \
>   --header 'x-tenant: tenant1' |python -m json.tool

[
    {
        "createdAt": 1592936869000,
        "createdBy": "UnAuthenticated",
        "email": "admin@springframework.in",
        "fullname": "Spring Tutorial Admin",
        "grantedAuthorities": [
            "ADMIN",
            "USER"
        ],
        "id": 6,
        "mask": 1,
        "tenantId": 1,
        "updatedAt": 1592936869000,
        "updatedBy": "UnAuthenticated",
        "username": "admin"
    },
    {
        "createdAt": 1592904896000,
        "createdBy": "UnAuthenticated",
        "email": "defaultuser@defaultadmin.com",
        "fullname": "Default User",
        "grantedAuthorities": [],
        "id": 2,
        "mask": 0,
        "tenantId": 1,
        "updatedAt": 1592904896000,
        "updatedBy": "UnAuthenticated",
        "username": "defaultuser"
    },
    {
        "createdAt": 1592904900000,
        "createdBy": "UnAuthenticated",
        "email": "vinay@avasthi.com",
        "fullname": "Vinay Avasthi",
        "grantedAuthorities": [],
        "id": 3,
        "mask": 1,
        "tenantId": 1,
        "updatedAt": 1592904900000,
        "updatedBy": "UnAuthenticated",
        "username": "vavasthi"
    }
]

Now we run the same query against tenant2.
$ curl --request GET   --url http://localhost:8081/tenant2/user   --header 'authorization: Bearer eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJleHAiOjE1OTI5MzgzMDcsInVzZXJfbmFtZSI6ImFkbWluIiwiYXV0aG9yaXRpZXMiOlsiVVNFUiIsIkFETUlOIl0sImp0aSI6IjJmOWVkOTYxLTk5MjktNDI3Zi1iZGU4LTc5NGIwYWNhMGYzNiIsImNsaWVudF9pZCI6InN1cGVyc2VjcmV0Y2xpZW50Iiwic2NvcGUiOlsicmVhZCIsImNvZGUiLCJ3cml0ZSJdfQ.KNzjB_JqJDaVI5vZNK-OcXDgvM5Uwt4I8tsVCazerpU'   --header 'cache-control: no-cache'   --header 'content-type: application/json'   --header 'postman-token: b1a9651c-e4c0-7b2e-c7b3-e0b3e06fa40e'   --header 'x-tenant: tenant2' |python -m json.tool
[
    {
        "createdAt": 1592904896000,
        "createdBy": "UnAuthenticated",
        "email": "defaultuser@defaultadmin.com",
        "fullname": "Default User",
        "grantedAuthorities": [],
        "id": 2,
        "mask": 0,
        "tenantId": 1,
        "updatedAt": 1592904896000,
        "updatedBy": "UnAuthenticated",
        "username": "defaultuser"
    },
    {
        "createdAt": 1592904900000,
        "createdBy": "UnAuthenticated",
        "email": "vinay@avasthi.com",
        "fullname": "Vinay Avasthi",
        "grantedAuthorities": [],
        "id": 3,
        "mask": 1,
        "tenantId": 1,
        "updatedAt": 1592904900000,
        "updatedBy": "UnAuthenticated",
        "username": "vavasthi"
    },
    {
        "createdAt": 1592938002000,
        "createdBy": "UnAuthenticated",
        "email": "ut2@springframework.in",
        "fullname": "User in T2",
        "grantedAuthorities": [],
        "id": 6,
        "mask": 1,
        "tenantId": 1,
        "updatedAt": 1592938002000,
        "updatedBy": "UnAuthenticated",
        "username": "userint2"
    },
{
        "createdAt": 1592937749000,
        "createdBy": "UnAuthenticated",
        "email": "admin@springframework.in",
        "fullname": "Springframework Tenant2 Administrator",
        "grantedAuthorities": [
            "ADMIN",
            "USER"
        ],
        "id": 5,
        "mask": 1,
        "tenantId": 1,
        "updatedAt": 1592937749000,
        "updatedBy": "UnAuthenticated",
        "username": "admin"
    }
]
As we can see, we get two totally different sets of users, stored in two totally different databases.

The complete code for this blog post is available in my github repository here.

Sunday, June 21, 2020

OAuth2 and JWT Tokens Part 1

In this blog post we look at how to make our Spring server an OAuth2 authorization server and start producing JWT tokens.

The first step that we take is to disable all the filters that we had added earlier. We don't want to authorize using the old-style REST calls.

Now we enable our authorization server. We create a new Java configuration class that extends AuthorizationServerConfigurerAdapter.

In the configuration, we autowire an AuthenticationManager, a DataSource and a UserDetailsService. There are two types of auth tokens in OAuth2. The first are the usual tokens that users request by providing their username and password. The second set of tokens are not tied to individual users; these are called client tokens and can be used for services talking to each other.

We configure the ClientDetailsService to use the JDBC client service. We add a password encoder to the configuration and provide a data source that will be used to store the persistent data. In our example, we are using SCryptPasswordEncoder. Here we are using the Spring-provided JDBC client service, but one could implement the ClientDetailsService and ClientRegistrationService interfaces and provide a custom implementation of the client service.
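A sketch of that configuration, using the (now legacy) spring-security-oauth2 classes, could look like the code below. The class name is mine, and the JWT token store and access token converter wiring that the actual project needs for JWT tokens is omitted here for brevity.

import javax.sql.DataSource;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;
import org.springframework.security.authentication.AuthenticationManager;
import org.springframework.security.core.userdetails.UserDetailsService;
import org.springframework.security.crypto.scrypt.SCryptPasswordEncoder;
import org.springframework.security.oauth2.config.annotation.configurers.ClientDetailsServiceConfigurer;
import org.springframework.security.oauth2.config.annotation.web.configuration.AuthorizationServerConfigurerAdapter;
import org.springframework.security.oauth2.config.annotation.web.configuration.EnableAuthorizationServer;
import org.springframework.security.oauth2.config.annotation.web.configurers.AuthorizationServerEndpointsConfigurer;

@Configuration
@EnableAuthorizationServer
public class AuthorizationServerConfig extends AuthorizationServerConfigurerAdapter {

    @Autowired
    private AuthenticationManager authenticationManager;

    @Autowired
    private DataSource dataSource;

    @Autowired
    private UserDetailsService userDetailsService;

    // Client credentials live in the database and client secrets are hashed with SCrypt.
    @Override
    public void configure(ClientDetailsServiceConfigurer clients) throws Exception {
        clients.jdbc(dataSource).passwordEncoder(new SCryptPasswordEncoder());
    }

    // User tokens are issued through the autowired AuthenticationManager and
    // our own UserDetailsService.
    @Override
    public void configure(AuthorizationServerEndpointsConfigurer endpoints) {
        endpoints.authenticationManager(authenticationManager)
                 .userDetailsService(userDetailsService);
    }
}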

The next step is to enable web security and provide an authentication manager bean for performing authentication.

We saw earlier that we used a UserDetailsService to configure the authorization server. We are not going to use the Spring-provided service but write our own.

In the UserDetailsService, we need to provide our own implementation of the method loadUserByUsername. In this method, we load the user entity from our database, create an object of type UserDetails and return it. The object also requires a list of GrantedAuthority objects.
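A sketch of such a service is below. The repository method findByUsername and the TutorialUserDetails wrapper (the UserDetails implementation described next) are assumptions about the project's code.

import org.springframework.security.core.userdetails.UserDetails;
import org.springframework.security.core.userdetails.UserDetailsService;
import org.springframework.security.core.userdetails.UsernameNotFoundException;
import org.springframework.stereotype.Service;

@Service
public class TutorialUserDetailsService implements UserDetailsService {

    private final UserRepository userRepository;

    public TutorialUserDetailsService(UserRepository userRepository) {
        this.userRepository = userRepository;
    }

    // Loads our own User entity and wraps it into the UserDetails object
    // (together with its granted authorities) that Spring Security expects.
    @Override
    public UserDetails loadUserByUsername(String username) throws UsernameNotFoundException {
        User user = userRepository.findByUsername(username);
        if (user == null) {
            throw new UsernameNotFoundException("Unknown user: " + username);
        }
        return new TutorialUserDetails(user);
    }
}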

UserDetails is an interface provided by Spring Security; we create our own class that implements the interface.

Similar to UserDetails, GrantedAuthority is also an interface provided by Spring Security; we implement that interface to provide our concrete implementation. For a very simple understanding, a GrantedAuthority is like a user role. We can use this role later to provide access control on endpoints.

These changes make the server ready for OAuth2. We still have a testing nightmare: because all our endpoints are now behind this authentication filter, there is no way for us to create new clients and users. We could insert values directly into the database, but we would still have to worry about how to encrypt the password before inserting the data. To get out of this situation, I add two endpoints, one for handling clients and another for handling users. These endpoints need to be configured so that they don't go through the authentication service, and they need to be removed before the service goes into production.

We define a POJO that mimics the OauthClientDetails schema in the database. It looks like the one below.

Now we add a repository for handling OauthClientDetails table.

We also add a service layer for Client handling.


We already had a UserRepository; we change it to encrypt the password before we store it.

We also add a service layer for User.

Now that we have all the layers required, we add the endpoint for Client.


The next thing to modify is the endpoint layer for user.

Now we are ready with our code for creating new clients and users. We still have a small issue: if we hit these endpoints, we will get an unauthorized error. So we need to put them in the exception list. For this we go to the SecurityConfiguration that we had defined in one of the earlier tutorials.

Look at the method public void configure(WebSecurity web) throws Exception. We have added the following two lines to skip evaluation of the authentication filters for two families of URLs.
        web.ignoring().antMatchers("/oauth/client/**");
        web.ignoring().antMatchers("/user/**");
Of course this is very dangerous, and we have added it only for testing purposes. In a future tutorial we will have a more elegant solution for this.

Now we are ready. We can test the service with the following curl command or an equivalent.

curl -X POST \
  http://localhost:8081/oauth/token \
  -H 'authorization: Basic YW5kcm9pZC1jbGllbnQ6YW5kcm9pZC1zZWNyZXQ=' \
  -H 'cache-control: no-cache' \
  -H 'content-type: application/x-www-form-urlencoded' \
  -H 'postman-token: 3a349bc0-1230-adbe-4b79-9b938728a101' \
  -d 'grant_type=password&password=mypassword&username=myuser&client_id=my-client&client_secret=my-secret'

This is the response that we get

{
    "access_token": "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJleHAiOjE1OTI3MjI5NTgsInVzZXJfbmFtZSI6InZhdmFzdGhpIiwianRpIjoiOGExZDYxN2ItZDU4OC00Nzc5LThlOTQtYTBiNWZkYzcxOTg2IiwiY2xpZW50X2lkIjoiYW5kcm9pZC1jbGllbnQiLCJzY29wZSI6WyJjb2RlIl19.n7hCnBdnjMC8vuCsHxkOznfd06iJctGNeypx2fXWla4",
    "token_type": "bearer",
    "refresh_token": "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJleHAiOjE1OTI3MjMwNTgsInVzZXJfbmFtZSI6InZhdmFzdGhpIiwianRpIjoiNGU4ODg3OTYtZWZiZS00ZDc5LTg1YmMtN2EzNzFhM2I4Yzg0IiwiY2xpZW50X2lkIjoiYW5kcm9pZC1jbGllbnQiLCJzY29wZSI6WyJjb2RlIl0sImF0aSI6IjhhMWQ2MTdiLWQ1ODgtNDc3OS04ZTk0LWEwYjVmZGM3MTk4NiJ9.xuElTWYSLscgdcqj0t-4t6prJbVOfHVqM331UUjfPBQ",
    "expires_in": 99,
    "scope": "code",
    "jti": "8a1d617b-d588-4779-8e94-a0b5fdc71986"
}
We can use the refresh token to get a new access token by calling the same endpoint with grant_type refresh_token.

curl -X POST \
>   http://localhost:8081/oauth/token \
>   -H 'authorization: Basic YW5kcm9pZC1jbGllbnQ6YW5kcm9pZC1zZWNyZXQ=' \
>   -H 'cache-control: no-cache' \
>   -H 'content-type: application/x-www-form-urlencoded' \
>   -H 'postman-token: 0adf261f-ed1e-85b3-67a5-21430fee8b38' \
>   -d 'grant_type=refresh_token&refresh_token=eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJleHAiOjE1OTI3MjMxOTAsInVzZXJfbmFtZSI6InZhdmFzdGhpIiwianRpIjoiNGMwNjczOTQtZjllNS00NDVjLTg5MzItMmRiMDM4N2U2ZjIxIiwiY2xpZW50X2lkIjoiYW5kcm9pZC1jbGllbnQiLCJzY29wZSI6WyJjb2RlIl0sImF0aSI6ImE1ZmY5MjhiLWQ1YjEtNDQ5Yy04N2I4LTU3ODgwYzY1NjM3NiJ9.RS2K7N5XCHQNov02WNu1QK1AqOvgc7MuNzeCydt7Ajs'
We get the following response.
{
    "access_token": "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJleHAiOjE1OTI3MjMxNzQsInVzZXJfbmFtZSI6InZhdmFzdGhpIiwianRpIjoiY2E1OWU3NzktZDRkZS00NDU1LWFlNjItNDQ2NTAxYzUxZjhmIiwiY2xpZW50X2lkIjoiYW5kcm9pZC1jbGllbnQiLCJzY29wZSI6WyJjb2RlIl19.WgVgpBWuDmdJUk3UG8MTKOBsXn5zbEGO8gyqg2akN8o",
    "token_type": "bearer",
    "refresh_token": "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJleHAiOjE1OTI3MjMxOTAsInVzZXJfbmFtZSI6InZhdmFzdGhpIiwianRpIjoiNGMwNjczOTQtZjllNS00NDVjLTg5MzItMmRiMDM4N2U2ZjIxIiwiY2xpZW50X2lkIjoiYW5kcm9pZC1jbGllbnQiLCJzY29wZSI6WyJjb2RlIl0sImF0aSI6ImNhNTllNzc5LWQ0ZGUtNDQ1NS1hZTYyLTQ0NjUwMWM1MWY4ZiJ9.3zDqHLSwVLq_Dg8r4Ppf5tfvzxwT_7FEwdTV67l3VYQ",
    "expires_in": 99,
    "scope": "code",
    "jti": "ca59e779-d4de-4455-ae62-446501c51f8f"
}
We can verify a token by accessing the oauth/check_token endpoint and providing the token.

curl --request POST \
>   --url http://localhost:8081/oauth/check_token \
>   --header 'authorization: Basic YW5kcm9pZC1jbGllbnQ6YW5kcm9pZC1zZWNyZXQ=' \
>   --header 'cache-control: no-cache' \
>   --header 'content-type: application/x-www-form-urlencoded' \
>   --header 'postman-token: ef035580-4cba-a7a1-deaa-dcf5f59e41fc' \
>   --data token=eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJleHAiOjE1OTI3MjM2MTEsInVzZXJfbmFtZSI6InZhdmFzdGhpIiwianRpIjoiODZhZjZhYmUtZjA4YS00YzIzLThkYzYtYjY2ZmM3ZWQ0YWRiIiwiY2xpZW50X2lkIjoiYW5kcm9pZC1jbGllbnQiLCJzY29wZSI6WyJjb2RlIl19.Kn284yuxxHSxVtEc3D8H5YjVjPi0yN6oX1hDwEx5bOo
We get the following response.
{
    "client_id": "android-client",
    "exp": 1592723611,
    "jti": "86af6abe-f08a-4c23-8dc6-b66fc7ed4adb",
    "scope": [
        "code"
    ],
    "user_name": "myuser"
}
This is all about enabling JWT OAuth tokens with a Spring server. We will continue this tutorial in the next part with some more important details. The complete code for the working server is here.

Friday, June 5, 2020

Auditing in MySQL

One of the important requirements in many RDBMS-based workloads is to have an audit log where every row in every table is stamped with who changed it and when it was changed. Doing this in every place where a table is modified is extremely cumbersome. In this blog post we look at how we can enable the Spring-provided auditing framework to perform this activity.

Before we do that, we need to upgrade the versions of our dependencies, since I have been using this project for quite some time. I also decided to use the Lombok logging annotation and removed all the dependencies on log4j.

Here are the modifications to the pom.xml for the dependencies.


Now we look at our User entity. We need to add auditing fields to this table. Since a realistic project would have multiple entities, we create an abstract entity base class with all the audit fields. We also move the primary key to the abstract base class. When we do that, Hibernate provides multiple ways of mapping the entities to the database schema. We want all the parent attributes to be stored in a single table together with the child class attributes. To accomplish that, we need to add @MappedSuperclass to the base class.
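A sketch of such a base class is below; the field names and types follow the description in this post, but the actual class in the repository may differ slightly.

import java.util.Date;
import javax.persistence.EntityListeners;
import javax.persistence.GeneratedValue;
import javax.persistence.GenerationType;
import javax.persistence.Id;
import javax.persistence.MappedSuperclass;
import javax.persistence.Temporal;
import javax.persistence.TemporalType;
import lombok.Data;
import org.springframework.data.annotation.CreatedBy;
import org.springframework.data.annotation.CreatedDate;
import org.springframework.data.annotation.LastModifiedBy;
import org.springframework.data.annotation.LastModifiedDate;
import org.springframework.data.jpa.domain.support.AuditingEntityListener;

@Data
@MappedSuperclass
@EntityListeners(AuditingEntityListener.class)
public abstract class AbstractBaseEntity {

    @Id
    @GeneratedValue(strategy = GenerationType.IDENTITY)
    private Long id;

    // The four audit fields below are populated automatically by Spring Data's auditing support.
    @CreatedBy
    private Long createdBy;

    @LastModifiedBy
    private Long updatedBy;

    @CreatedDate
    @Temporal(TemporalType.TIMESTAMP)
    private Date createdAt;

    @LastModifiedDate
    @Temporal(TemporalType.TIMESTAMP)
    private Date updatedAt;
}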

As we can see, we have added five attributes to the base class. One is the id for all our entities, and the other four will be used for auditing purposes.
At this time we also add another layer to our code. Currently all the endpoints call the repository layer directly, which causes a problem if we want to write functions that can be reused across different endpoints. An example of this need is the retrieveUser method, which takes an argument that could be a username or an email. Currently this method lives in the endpoint layer as a private method. It is useful in many different contexts, so we create a new UserService layer and move the method there.


Now, let's get to the original task of enabling auditing. First we define an auditing config as below.
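A sketch of that config is below. The thread-local holder is referred to here as TutorialRequestContext.currentUser to match the class used elsewhere in this blog; the actual names in the v1.5 code may differ.

import java.util.Optional;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.domain.AuditorAware;
import org.springframework.data.jpa.repository.config.EnableJpaAuditing;

@Configuration
@EnableJpaAuditing
public class EntityAuditConfig {

    // Returns the id of the currently authenticated user so that Spring Data
    // can populate the @CreatedBy and @LastModifiedBy columns.
    @Bean
    public AuditorAware<Long> auditAware() {
        return () -> Optional.ofNullable(TutorialRequestContext.currentUser.get())
                             .map(User::getId);
    }
}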

We had earlier defined a ThreadLocal that is used by the auditAware method defined above to extract the currently logged-in user and return its userId. As we can see, the audit fields in the AbstractBaseEntity expect a Long for the @CreatedBy and @LastModifiedBy fields. The EntityAuditConfig also carries the @EnableJpaAuditing annotation, which is required.
At this point we also add a new endpoint called ProfileEndpoint which can be used to manage the entity that represents a user profile. This entity currently only contains a url.
Now if we perform any operation on any of the endpoints, we will see the auditing fields automatically populated. Give it a spin. It is a life saver in many production applications. I have had situations where users changed their passwords, forgot them and then complained that they had been hacked.
The complete code for this and previous posts can be found at my github repository. The changes for this tutorial are under v1.5.

Monday, February 24, 2020

Hadoop and Spark Locally

As a continuation of the last post, we now look at how to make it deployable in a proper Spark/Hadoop cluster. We will not go into the details of the setup of these clusters themselves, but more into how we make sure a program that we developed earlier can run as a job in a cluster.
We will continue with the setup on our local machine. I am using a Mac, so the instructions are written with that in mind, but most of them apply to any other platform.
If Spark is processing data from a database and writing into Hive, pretty much what we did in the last post would work. The problem arises if some of the data being processed exists as flat files. If we want to submit our jobs to a Spark cluster, we cannot use local files because the jobs are not running against the local file system.
The best approach is to either use an HDFS cluster or deploy a single-node HDFS on your machine. Here I am enumerating the steps to set up a single-node HDFS cluster on a Mac OS X machine.
  • Download the Hadoop distribution for your machine here.
  • The Hadoop distribution is available in the form of a .tar.gz file, and you can expand it into some directory on your machine. The expansion will create a directory of the form hadoop-x.y.z, assuming your Hadoop version is x.y.z. Set the environment variable HADOOP_HOME to the full pathname of this Hadoop directory.
  • Add $HADOOP_HOME/bin to the PATH variable.
  • Now we need to update the configuration files for Hadoop.
$ cd $HADOOP_HOME/etc/hadoop
$ vi core-site.xml

We update the file with the following properties.

$ vi hdfs-site.xml

We update the file with the following properties.
$ vi mapred-site.xml

We update the file with the following properties.
$ vi yarn-site.xml

We update the file with the following properties.
Now we start Hadoop.
$ cd $HADOOP_HOME
$ sbin/start-all.sh
Now we can access files stored in HDFS from our Spark jobs.
The next post will go more into the details of how to process files in Spark.

Tuesday, February 18, 2020

Hive and Spark

In this blog post, we take a slight deviation from core issues related to the Spring framework and look at an issue that Spring programmers might face regularly. Recently I was working with Spark and needed to read data from MySQL, do some processing, and write it to a Hive instance. We will look at this in this post.
We start by making sure our Hive instance is backed by a database. To do this, we do the following.
$ cp $HIVE_HOME/conf/hive-default.xml.template $HIVE_HOME/conf/hive-site.xml
$ vi gnu/apache-hive-3.1.2-bin/conf/hive-site.xml

We edit the hive-site.xml file and make sure it is configured as below.
We can configure the values to suit our needs, but make sure the MySQL username, password and database URL exist and the user has the relevant permissions.
Now we create another database in MySQL which will contain the data that we need to process. I am calling this database mystuff, with username mystuff and password mystuff123. We need to run the following commands in MySQL to make sure everything exists and the permissions are appropriate.
mysql> create database mystuff;
Query OK, 1 row affected (0.00 sec)

mysql> create user mystuff@localhost identified by 'mystuff123';
Query OK, 0 rows affected (0.00 sec)

mysql> create user mystuff@'%' identified by 'mystuff123';
Query OK, 0 rows affected (0.01 sec)

mysql> grant all on mystuff.* to mystuff@localhost;
Query OK, 0 rows affected (0.00 sec)

mysql> grant all on mystuff.* to mystuff@'%';
Query OK, 0 rows affected (0.00 sec)

Now we create a plain Java project in IntelliJ with the following pom.xml file.
Now to look at the problem at hand. We have a table in MySQL with the following structure.
mysql> desc mydata;
+-------+--------------+------+-----+---------+-------+
| Field | Type         | Null | Key | Default | Extra |
+-------+--------------+------+-----+---------+-------+
| id    | int          | YES  |     | NULL    |       |
| k     | varchar(10)  | YES  |     | NULL    |       |
| v     | varchar(255) | YES  |     | NULL    |       |
+-------+--------------+------+-----+---------+-------+
3 rows in set (0.00 sec)

We want to flatten this table such that each key gets converted to a column for each id. So assume our current data is as below.
mysql> select * from mydata;
+------+-------+-------------------+
| id   | k     | v                 |
+------+-------+-------------------+
|    1 | NAME  | John Doe.         |
|    1 | EMAIL | jd@example.com    |
|    2 | NAME  | Jane Doe          |
|    2 | EMAIL | janed@example.com |
+------+-------+-------------------+
2 rows in set (0.00 sec)
We want to load this data and convert it into a flattened table that has three columns, i.e. id, name, email. Then we want to populate the flattened data into the table person in Hive.
hive (default)> select * from person;
OK
person.id person.email person.name
1 jd@example.com.  John Doe
2 janed@example.com Jane Doe
Time taken: 0.094 seconds, Fetched: 2 row(s)

The following code will perform the above conversion.
Lines 11 through 17 create a SparkSession for Hive operations; the key instruction here is enableHiveSupport. Lines 20 through 26 create a SparkSession that will be used for the MySQL operations. Lines 29 through 37 load the complete contents of the table. Lines 39 through 42 group the results by id and pivot the table on the field k. Lines 46 through 49 create a data frame for the hiveSession and write the contents into a table named person.
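For readers following along without the original listing, here is a condensed sketch of the same conversion. It uses a single Hive-enabled SparkSession rather than the two sessions described above, so the line numbers mentioned above refer to the original code, not to this sketch; the connection details follow the mystuff database created earlier.

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import static org.apache.spark.sql.functions.first;

public class FlattenJob {

    public static void main(String[] args) {
        // Session with Hive support so that saveAsTable writes into the Hive warehouse.
        SparkSession hiveSession = SparkSession.builder()
                .appName("mysql-to-hive-flatten")
                .master("local[*]")
                .enableHiveSupport()
                .getOrCreate();

        // Load the key/value table from MySQL over JDBC.
        Dataset<Row> mydata = hiveSession.read()
                .format("jdbc")
                .option("url", "jdbc:mysql://localhost:3306/mystuff")
                .option("driver", "com.mysql.cj.jdbc.Driver")
                .option("dbtable", "mydata")
                .option("user", "mystuff")
                .option("password", "mystuff123")
                .load();

        // Group by id and pivot on the key column so each key becomes a column.
        Dataset<Row> flattened = mydata.groupBy("id")
                .pivot("k")
                .agg(first("v"));

        // Write the flattened data into the Hive table 'person'.
        flattened.write().mode(SaveMode.Overwrite).saveAsTable("person");
    }
}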

Friday, April 12, 2019

Spring and more Kafka

In the last post, we saw how to integrate Kafka with a Spring Boot application. That post was a very simple implementation of Kafka. The real world is much more complex: you have to deal with multiple topics, and you need multiple partitions. In this post, we explore more details of a Spring Boot application with Kafka.
Let's start with the fact that we have multiple topics that we are dealing with. The very first thing that we need to do is to separate out the properties for each of the topics.

We can no longer use the default KafkaTemplate; we have to create our own. The best way is to define a KafkaProducer. To create a KafkaProducer, we need to create a KafkaProducerConfig. Here are the two KafkaProducerConfig classes, one for each topic.
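A sketch of the config for the first topic is below; the second one is analogous. The property key and bean names are assumptions, and MyTopicMessage is the message class used throughout this tutorial.

import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.support.serializer.JsonSerializer;

@Configuration
public class FirstTopicKafkaProducerConfig {

    @Value("${firsttopic.kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Bean
    public ProducerFactory<String, MyTopicMessage> firstTopicProducerFactory() {
        Map<String, Object> config = new HashMap<>();
        config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        return new DefaultKafkaProducerFactory<>(config);
    }

    // Qualified by its bean name so the endpoint can pick the template for this topic.
    @Bean("firstTopicKafkaTemplate")
    public KafkaTemplate<String, MyTopicMessage> firstTopicKafkaTemplate() {
        return new KafkaTemplate<>(firstTopicProducerFactory());
    }
}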


Looking at the producer config, we can observe that we create two beans which are qualified by name and return an appropriate KafkaTemplate, which is later used to send the message. Similar to the producer config, we need to create a consumer config. The consumer config is used by the listener to listen for the messages.


As we can see in each of the consumer configs, we create a bean which returns a ConcurrentKafkaListenerContainerFactory. The beans are qualified by a name so that we can use an appropriate container factory for receiving messages.
We also modify the MyTopicMessage and add a member variable topicName that will help us distinguish the topic to which the message needs to be sent.

We also modify the endpoint so that the message can be sent to the appropriate topic.

Now we modify the listener to integrate everything so that the appropriate message can be received. Look at the @KafkaListener annotation: we pass the listener container factory as an argument so that messages are received from the right topic.
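A sketch of such a listener is below. The class name matches the one in the server log; the container factory bean names are assumptions, and the sketch assumes the consumer config deserializes messages into MyTopicMessage.

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;

@Component
public class KafkaMessageListener {

    private static final Logger log = LoggerFactory.getLogger(KafkaMessageListener.class);

    // Each listener uses the container factory configured for its topic.
    @KafkaListener(topics = "FirstTopic",
                   containerFactory = "firstTopicKafkaListenerContainerFactory")
    public void listenFirstTopic(MyTopicMessage message,
                                 @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition) {
        log.info("Received FirstTopic message for partition {} {}", partition, message.getMessage());
    }

    @KafkaListener(topics = "SecondTopic",
                   containerFactory = "secondTopicKafkaListenerContainerFactory")
    public void listenSecondTopic(MyTopicMessage message,
                                  @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition) {
        log.info("Received SecondTopic message for partition {} {}", partition, message.getMessage());
    }
}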
Now we can test the server. The flow of the service is as below.

  1. The message is posted to the endpoint as a POST request
  2. The @RestController receives the message and, based on the topicName in the request, it sends the message to the topic with the same name.
  3. The listener receives the message from the appropriate queue.
$ curl -X POST \
>   'http://localhost:8081/send?token=3193fa24-a0ba-451b-83ff-eb563c3fd43b-cdf12811-7e41-474b-8fa6-e8fefd4a738c' \
>   -H 'Content-Type: application/json' \
>   -H 'Postman-Token: 15fbe075-9c80-4af9-a797-6b5e0979fd1b' \
>   -H 'cache-control: no-cache' \
>   -H 'token: 3193fa24-a0ba-451b-83ff-eb563c3fd43b-cdf12811-7e41-474b-8fa6-e8fefd4a738c' \
>   -d '{
> "message" : "This is my message!",
> "topicName" : "FirstTopic"
> }'
Success!

The message receipt is indicated in the server log.
2019-04-12 14:50:47.142  INFO 51396 --- [ntainer#0-0-C-1] i.s.b.t.listeners.KafkaMessageListener   : Received FirstTopic message for partition 0 This is my message!
Similarly, we can send a message to SecondTopic.
$ curl -X POST   'http://localhost:8081/send?token=3193fa24-a0ba-451b-83ff-eb563c3fd43b-cdf12811-7e41-474b-8fa6-e8fefd4a738c'   -H 'Content-Type: application/json'   -H 'Postman-Token: 15fbe075-9c80-4af9-a797-6b5e0979fd1b'   -H 'cache-control: no-cache'   -H 'token: 3193fa24-a0ba-451b-83ff-eb563c3fd43b-cdf12811-7e41-474b-8fa6-e8fefd4a738c'   -d '{
"message" : "This is another message!",
"topicName" : "SecondTopic"
}'
Success!

The message receipt is indicated in the server log.
2019-04-12 14:53:14.972  INFO 51396 --- [ntainer#1-0-C-1] i.s.b.t.listeners.KafkaMessageListener   : Received SecondTopic message for partition 0 This is another message!
The complete code base for this tutorial can be found at my github repository at v1.4.

Wednesday, April 10, 2019

Kafka and Spring

Kafka has become a very popular platform and is being used as a streaming, journaling and even eventing system. In this post, we explore how to integrate Kafka with a Spring framework application. First, we add the Kafka bootstrap server details in the application.properties file.

Let's also add dependencies in pom.xml.

Now, for each Kafka topic, we create a listener class. The listener class provides a callback method that is called when any message is retrieved on that topic.
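A minimal sketch of such a listener could look like the class below. The class name matches the one in the log further down; the topic and group names are placeholders.

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
public class MyTopicKafkaListener {

    private static final Logger log = LoggerFactory.getLogger(MyTopicKafkaListener.class);

    // Invoked by spring-kafka whenever a message arrives on the topic.
    @KafkaListener(topics = "myTopic", groupId = "tutorial-group")
    public void onMessage(String message) {
        log.info("Received message {}", message);
    }
}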

Now we create an endpoint through which we inject a message into the topic. The message is sent to the topic and is retrieved by the listener.

We autowire a KafkaTemplate instance that is used to send the message to the topic.

$ curl -X POST \
>   'http://localhost:8081/send?token=3193fa24-a0ba-451b-83ff-eb563c3fd43b-cdf12811-7e41-474b-8fa6-e8fefd4a738c' \
>   -H 'Content-Type: application/json' \
>   -H 'Postman-Token: e281e3c5-0dae-4bb7-ac8d-6555f66a18c6' \
>   -H 'cache-control: no-cache' \
>   -H 'token: 3193fa24-a0ba-451b-83ff-eb563c3fd43b-cdf12811-7e41-474b-8fa6-e8fefd4a738c' \
>   -d '{
> "message" : "This is my message!"
> }'
Message sent successfully!.
The receipt of the message is indicated in the Spring server log.

2019-04-10 14:28:03.969  INFO 31091 --- [ntainer#0-0-C-1] i.s.b.t.listeners.MyTopicKafkaListener   : Received Promise message This is my message!