Wednesday, June 24, 2020

Building a multitenant service

If you build any production service that is valuable, very soon you find a customer who wants to use it but wants it white-labeled for themselves. You also very quickly find out that there is a need for data isolation, i.e. different customers want their data to be kept in their own databases. For these reasons, it is always a good idea to design a service with multi-tenancy in mind.
In this tutorial we will look at some of the important features required for a multi-tenant service and how we can leverage springframework to deliver a service that is truly multi-tenant.
Here are a couple of the important aspects of a multi-tenant service.
  1. Everybody gets their own endpoints
  2. Everybody can be given their own databases
Let's design what our endpoint URLs would look like with tenancy in mind. We add a discriminator in the URL that identifies a tenant. For example, our OAuth URLs would become something like below.
http://example.com/tenant1/oauth/token
http://example.com/tenant1/oauth/check_token
To accomplish this, we define our RequestMapping with an embedded tenant variable that becomes part of each of the URLs.

 public static final String BASE_ENDPOINT = "/{tenant}";
 public static final String USER_ENDPOINT = BASE_ENDPOINT + "/user";
 public static final String REGISTRATION_ENDPOINT = BASE_ENDPOINT + "/registration";



As we can see, the first part of the URL defines the tenant. It is not mandatory, but I also design the service so that it expects a header that names the tenant as well. Because the same tenant has to be given in two places, this guards against a client mistakenly calling the wrong tenant. We will use the following header.
X-tenant:tenant1
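The check that the URL and the header name the same tenant can be sketched in plain Java, without any servlet or Spring types. The class and method names below are hypothetical and are not taken from the tutorial's code; the sketch only assumes that the tenant is the first path segment, as in the URLs above.

```java
import java.util.Optional;

// Hypothetical helper (not the author's filter): compares the tenant in the
// URL path with the value of the X-tenant header.
final class TenantDiscriminatorCheck {

    // "/tenant1/oauth/token" -> Optional.of("tenant1"); "/" or "" -> Optional.empty()
    static Optional<String> tenantFromPath(String path) {
        if (path == null) {
            return Optional.empty();
        }
        // The leading slash yields an empty first segment, so skip empties.
        for (String segment : path.split("/")) {
            if (!segment.isEmpty()) {
                return Optional.of(segment);
            }
        }
        return Optional.empty();
    }

    // True only when the header is present and names the same tenant as the path.
    static boolean matches(String path, String headerValue) {
        if (headerValue == null) {
            return false;
        }
        String expected = headerValue.trim();
        return tenantFromPath(path).map(tenant -> tenant.equals(expected)).orElse(false);
    }

    public static void main(String[] args) {
        System.out.println(matches("/tenant1/oauth/token", "tenant1"));
        System.out.println(matches("/tenant2/user", "tenant1"));
    }
}
```

A request whose path and header disagree would be rejected before any tenant-specific work is done.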
Every incoming request into the system needs to know the tenant and the authenticated user that is part of the request. We define a sequence of filters for this purpose. Since these filters need to be called in a particular order, we define the order as follows.

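// Lower values run first. Ordered.HIGHEST_PRECEDENCE is Integer.MIN_VALUE, so
// later filters must ADD to it; subtracting 1 would overflow to Integer.MAX_VALUE.
public static final int TENANT_HEADER_PRECEDENCE = Ordered.HIGHEST_PRECEDENCE;
public static final int SEED_DATA_PRECEDENCE = TENANT_HEADER_PRECEDENCE + 1;
public static final int TENANT_PRECEDENCE = SEED_DATA_PRECEDENCE + 1;
public static final int USER_PRECEDENCE = Ordered.LOWEST_PRECEDENCE;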

As you can see, we are going to define four filters, each performing a specific function.
  1. TenantHeader filter extracts the tenant header from the incoming request, checks that it matches the tenant segment of the URL, and stores it in a ThreadLocal variable.
  2. SeedData filter is only required to create some seed data that makes the service usable. We need a default tenant in the system so that we can start making requests. Most of the time this filter does nothing.
  3. Tenant filter loads the tenant object and stores it in another ThreadLocal variable.
  4. User filter is last in the precedence order; it extracts the currently authenticated user and stores it in a ThreadLocal.
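The intended execution order of the four filters can be checked with a small framework-free sketch. It relies on one fact about Spring's Ordered interface: HIGHEST_PRECEDENCE is Integer.MIN_VALUE and lower values run first, so each later filter needs a larger value (subtracting from HIGHEST_PRECEDENCE would wrap around to the lowest precedence). The class below is hypothetical and only mirrors the constant values.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Framework-free sketch of precedence ordering: the lower value runs earlier.
final class FilterOrderSketch {
    // Same values as Spring's Ordered.HIGHEST_PRECEDENCE / LOWEST_PRECEDENCE.
    static final int HIGHEST_PRECEDENCE = Integer.MIN_VALUE;
    static final int LOWEST_PRECEDENCE = Integer.MAX_VALUE;

    static final class NamedFilter {
        final String name;
        final int order;

        NamedFilter(String name, int order) {
            this.name = name;
            this.order = order;
        }
    }

    // Registers the filters in arbitrary order and returns their names sorted by precedence.
    static List<String> executionOrder() {
        List<NamedFilter> filters = new ArrayList<>();
        filters.add(new NamedFilter("User", LOWEST_PRECEDENCE));
        filters.add(new NamedFilter("Tenant", HIGHEST_PRECEDENCE + 2));
        filters.add(new NamedFilter("SeedData", HIGHEST_PRECEDENCE + 1));
        filters.add(new NamedFilter("TenantHeader", HIGHEST_PRECEDENCE));
        filters.sort(Comparator.comparingInt((NamedFilter f) -> f.order));

        List<String> names = new ArrayList<>();
        for (NamedFilter filter : filters) {
            names.add(filter.name);
        }
        return names;
    }

    public static void main(String[] args) {
        System.out.println(executionOrder());
        // Integer overflow: one below the highest precedence is the lowest precedence.
        System.out.println(HIGHEST_PRECEDENCE - 1 == LOWEST_PRECEDENCE);
    }
}
```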
We are using the following TutorialRequestContext class to store these thread local variables.
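As a rough idea of what such a holder looks like, here is a minimal sketch. It is not the tutorial's TutorialRequestContext; the class name and the choice of plain String values are assumptions made to keep the example self-contained.

```java
// Hypothetical stand-in for a per-request context backed by ThreadLocal variables.
final class RequestContextSketch {
    private static final ThreadLocal<String> TENANT_NAME = new ThreadLocal<>();
    private static final ThreadLocal<String> USERNAME = new ThreadLocal<>();

    static void setTenantName(String tenantName) {
        TENANT_NAME.set(tenantName);
    }

    static String getTenantName() {
        return TENANT_NAME.get();
    }

    static void setUsername(String username) {
        USERNAME.set(username);
    }

    static String getUsername() {
        return USERNAME.get();
    }

    // Should be called when the request finishes: server threads are pooled,
    // so stale values would otherwise leak into the next request.
    static void clear() {
        TENANT_NAME.remove();
        USERNAME.remove();
    }

    // Reads the tenant from a different thread to show that values are per thread.
    static String tenantSeenByOtherThread() {
        final String[] seen = new String[1];
        Thread other = new Thread(() -> seen[0] = getTenantName());
        other.start();
        try {
            other.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return seen[0];
    }
}
```

Because each request is handled on its own thread, the value set by one request is invisible to requests running concurrently on other threads.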

Now let's look at these filters one by one.

This filter just extracts the X-tenant header and stores it in the thread local.

This filter checks that there is at least one tenant in the system; if none is found, it inserts a default tenant. This is required so that we can use REST calls.

This filter loads the complete Tenant object from the database and stores it in the thread local.

This filter extracts the currently authenticated user and stores it in the thread local.
In the last tutorial we used the Spring-provided default ClientDetailsService. In this tutorial we create our own service so that we can have a schema that we like. To do that we need an entity, TutorialClientDetails, and a service, TutorialClientDetailsService.

This entity just implements the ClientDetails interface.

This service needs to implement a method loadClientByClientId.

Now that all the foundation work is in place, we get down to making our service multitenant. The first thing that we need to do is remove all the old dataSources that we had defined. We will now define a routing data source that chooses the right data source based on the tenant that we are talking to. This is where our tenant discriminator thread local becomes useful. Take a look at the following class.

Here we implement the method determineCurrentLookupKey, which uses the thread local to identify the current tenant and returns it as the key.
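The routing idea itself is small enough to sketch without Spring. In the sketch below the generic type T stands in for a data source, and the class is hypothetical; determineCurrentLookupKey is the name of the abstract method that Spring's AbstractRoutingDataSource asks subclasses to implement, which is presumably the class the tutorial extends.

```java
import java.util.HashMap;
import java.util.Map;

// Sketch of tenant-based routing. T stands in for javax.sql.DataSource.
final class TenantRouterSketch<T> {
    private static final ThreadLocal<String> CURRENT_TENANT = new ThreadLocal<>();

    private final Map<String, T> targets = new HashMap<>();
    private final T defaultTarget;

    TenantRouterSketch(Map<String, T> targets, T defaultTarget) {
        this.targets.putAll(targets);
        this.defaultTarget = defaultTarget;
    }

    static void setCurrentTenant(String tenantName) {
        CURRENT_TENANT.set(tenantName);
    }

    static void clearCurrentTenant() {
        CURRENT_TENANT.remove();
    }

    // Plays the role of determineCurrentLookupKey: the key is the tenant name
    // that a filter stored for the current thread.
    String determineCurrentLookupKey() {
        return CURRENT_TENANT.get();
    }

    // Picks the target for the current tenant, falling back to the default
    // when no tenant is set or the tenant has no target of its own.
    T determineTarget() {
        String key = determineCurrentLookupKey();
        T target = (key == null) ? null : targets.get(key);
        return (target != null) ? target : defaultTarget;
    }
}
```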

The bulk of the smartness lies in the class TutorialMultitenantConfig, specifically the dataSource bean that returns the default dataSource for the system. We assume that the resource directory contains a tenants subdirectory, and within it one properties file per tenant. The property file will look like below.
 

Here we have added the usual Spring data source properties along with a name property, which is the name of the tenant. This name will be matched with the name in the URL and the header.
In the class TutorialMultitenantConfig, look at the definition of the variable TENANT_PROPERTIES_RESOURCE; it looks up all the *.properties files in the tenants directory. Lines 61 through 74 create a data source object for each of the tenants and store these in a map keyed by tenant name. Recall that the determineCurrentLookupKey method returns the name of the current tenant; that return value is used to fetch the appropriate data source object for the request being processed. Line 88 defines a default data source that is used if there is no data source present for the given tenant.
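The step of turning one properties file per tenant into a map keyed by tenant name can be sketched with java.util.Properties alone. The property keys other than name (datasource.url, datasource.username) and the file contents are made up for illustration; the tutorial's real files and key names may differ, and the real code builds data source objects rather than keeping the raw properties.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Properties;

// Sketch: build a tenant-name -> properties map from per-tenant property files.
final class TenantPropertiesSketch {

    // Parses the text of one properties file.
    static Properties parse(String fileContent) {
        Properties properties = new Properties();
        try {
            properties.load(new StringReader(fileContent));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return properties;
    }

    // Keys every file by its "name" property, which must be present.
    static Map<String, Properties> byTenantName(String... fileContents) {
        Map<String, Properties> result = new LinkedHashMap<>();
        for (String content : fileContents) {
            Properties properties = parse(content);
            String name = properties.getProperty("name");
            if (name == null || name.trim().isEmpty()) {
                throw new IllegalArgumentException("tenant properties need a name");
            }
            result.put(name.trim(), properties);
        }
        return result;
    }

    public static void main(String[] args) {
        // Hypothetical file contents; keys and URLs are illustrative only.
        String tenant1 = "name=tenant1\n"
                + "datasource.url=jdbc:mysql://localhost:3306/tenant1\n"
                + "datasource.username=tenant1user\n";
        String tenant2 = "name=tenant2\n"
                + "datasource.url=jdbc:mysql://localhost:3306/tenant2\n"
                + "datasource.username=tenant2user\n";
        Map<String, Properties> tenants = byTenantName(tenant1, tenant2);
        System.out.println(tenants.keySet());
        System.out.println(tenants.get("tenant2").getProperty("datasource.url"));
    }
}
```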

Now our multi-tenant service is ready and it is time to test. The first thing that we need to do is create a couple more databases with exactly the same schema as the first database. Please keep in mind that this has to be done manually even if you have defined the ddl-auto property. Just take a mysqldump of the first database and import it into the two other databases.
We also need to make sure that at least one tenant is defined in each of the databases. This is required so that we can use the REST calls. The best approach is to create a default tenant first and then take the dump, so that the default tenant is also copied into each of the databases. We have created an admin user which has the ADMIN role. We call the user endpoint, which returns the list of all the users.
$ curl --request GET \
>   --url http://localhost:8081/tenant1/user \
>   --header 'authorization: Bearer eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJleHAiOjE1OTI5Mzc1MzksInVzZXJfbmFtZSI6ImFkbWluIiwiYXV0aG9yaXRpZXMiOlsiVVNFUiIsIkFETUlOIl0sImp0aSI6IjRmOTdiNThkLTI1NTEtNDA4Yi04ZWM4LWUzZGZmZWQ3MTQ2NiIsImNsaWVudF9pZCI6InN1cGVyc2VjcmV0Y2xpZW50Iiwic2NvcGUiOlsicmVhZCIsImNvZGUiLCJ3cml0ZSJdfQ.fWt_H-ORHP44xOIljoqMfVIeGkqJGQqUBMj8paVxAPM' \
>   --header 'cache-control: no-cache' \
>   --header 'content-type: application/json' \
>   --header 'postman-token: 76b3525e-bc3b-e629-91b6-a75253f657d8' \
>   --header 'x-tenant: tenant1' |python -m json.tool

[
    {
        "createdAt": 1592936869000,
        "createdBy": "UnAuthenticated",
        "email": "admin@springframework.in",
        "fullname": "Spring Tutorial Admin",
        "grantedAuthorities": [
            "ADMIN",
            "USER"
        ],
        "id": 6,
        "mask": 1,
        "tenantId": 1,
        "updatedAt": 1592936869000,
        "updatedBy": "UnAuthenticated",
        "username": "admin"
    },
    {
        "createdAt": 1592904896000,
        "createdBy": "UnAuthenticated",
        "email": "defaultuser@defaultadmin.com",
        "fullname": "Default User",
        "grantedAuthorities": [],
        "id": 2,
        "mask": 0,
        "tenantId": 1,
        "updatedAt": 1592904896000,
        "updatedBy": "UnAuthenticated",
        "username": "defaultuser"
    },
    {
        "createdAt": 1592904900000,
        "createdBy": "UnAuthenticated",
        "email": "vinay@avasthi.com",
        "fullname": "Vinay Avasthi",
        "grantedAuthorities": [],
        "id": 3,
        "mask": 1,
        "tenantId": 1,
        "updatedAt": 1592904900000,
        "updatedBy": "UnAuthenticated",
        "username": "vavasthi"
    }
]

Now we run the same query against tenant2.
$ curl --request GET   --url http://localhost:8081/tenant2/user   --header 'authorization: Bearer eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJleHAiOjE1OTI5MzgzMDcsInVzZXJfbmFtZSI6ImFkbWluIiwiYXV0aG9yaXRpZXMiOlsiVVNFUiIsIkFETUlOIl0sImp0aSI6IjJmOWVkOTYxLTk5MjktNDI3Zi1iZGU4LTc5NGIwYWNhMGYzNiIsImNsaWVudF9pZCI6InN1cGVyc2VjcmV0Y2xpZW50Iiwic2NvcGUiOlsicmVhZCIsImNvZGUiLCJ3cml0ZSJdfQ.KNzjB_JqJDaVI5vZNK-OcXDgvM5Uwt4I8tsVCazerpU'   --header 'cache-control: no-cache'   --header 'content-type: application/json'   --header 'postman-token: b1a9651c-e4c0-7b2e-c7b3-e0b3e06fa40e'   --header 'x-tenant: tenant2' |python -m json.tool
[
    {
        "createdAt": 1592904896000,
        "createdBy": "UnAuthenticated",
        "email": "defaultuser@defaultadmin.com",
        "fullname": "Default User",
        "grantedAuthorities": [],
        "id": 2,
        "mask": 0,
        "tenantId": 1,
        "updatedAt": 1592904896000,
        "updatedBy": "UnAuthenticated",
        "username": "defaultuser"
    },
    {
        "createdAt": 1592904900000,
        "createdBy": "UnAuthenticated",
        "email": "vinay@avasthi.com",
        "fullname": "Vinay Avasthi",
        "grantedAuthorities": [],
        "id": 3,
        "mask": 1,
        "tenantId": 1,
        "updatedAt": 1592904900000,
        "updatedBy": "UnAuthenticated",
        "username": "vavasthi"
    },
    {
        "createdAt": 1592938002000,
        "createdBy": "UnAuthenticated",
        "email": "ut2@springframework.in",
        "fullname": "User in T2",
        "grantedAuthorities": [],
        "id": 6,
        "mask": 1,
        "tenantId": 1,
        "updatedAt": 1592938002000,
        "updatedBy": "UnAuthenticated",
        "username": "userint2"
    },
    {
        "createdAt": 1592937749000,
        "createdBy": "UnAuthenticated",
        "email": "admin@springframework.in",
        "fullname": "Springframework Tenant2 Administrator",
        "grantedAuthorities": [
            "ADMIN",
            "USER"
        ],
        "id": 5,
        "mask": 1,
        "tenantId": 1,
        "updatedAt": 1592937749000,
        "updatedBy": "UnAuthenticated",
        "username": "admin"
    }
]
As we can see, the two requests return two completely different sets of users, which are stored in two separate databases.

Complete code for this blog post is available in my github repository here

Sunday, June 21, 2020

OAuth2 and JWT Tokens Part 1

In this blog post we look at how to make our spring server an OAuth2 Authorization server and start producing JWT tokens.

The first step that we take is to disable all the filters that we had added earlier. We don't want to authorize using old style REST calls.

Now, the first step is to enable our authorization server. We create a new java configuration file that extends AuthorizationServerConfigurerAdapter.

In the configuration, we autowire AuthenticationManager, DataSource and a UserDetailsService. There are two types of auth tokens in OAuth2. The first are the usual tokens that users request by providing their username and password. The second are not tied to individual users; these are called client tokens and can be used for services talking to each other.

We configure ClientDetailsService to use Jdbc client service. We add a password encoder in the configuration and provide a data source that would be used to store persistent data. In our example, we are using SCryptPasswordEncoder. Here we are using spring provided Jdbc client service but one could implement ClientDetailsService and ClientRegistrationService interfaces and provide their own custom implementation for client service.

The next step is to enable web security and provide an authentication manager bean for performing authentication.

We have seen earlier that we used a UserDetailsService to configure the authorization server. We are not going to use the spring provided service but write our own. 

In the UserDetailsService, we need to provide our own implementation of the method loadUserByUsername. In this method, we load the user entity from our database, create an object of type UserDetails and return it. The object also requires a list of GrantedAuthority.

UserDetails is an interface provided by springframework; we create our own class that implements the interface.

Similar to UserDetails, GrantedAuthority is also an interface provided by springframework, we implement that interface to provide our concrete implementation of GrantedAuthority. For a very simple understanding, GrantedAuthority is like a user role. We can use this role later to provide access control on endpoints.

These changes will make the server ready for OAuth2 service. We still have a testing nightmare. Because all our endpoints are now behind this authentication filter, there is no way for us to create new clients and users. We could directly insert values in the database, but we would still have to worry about how to encrypt the password before inserting it into the database. To get out of this situation, I add two endpoints, one for handling clients and another for handling users. These endpoints need to be configured so that they don't go through the authentication service. They need to be removed before the service goes into production.

We define a pojo that mimics the OauthClientDetails schema in the database. It looks like below.

Now we add a repository for handling OauthClientDetails table.

We also add a service layer for Client handling.


We already had a UserRepository; we change it to encrypt the password before we store it.

We also add a service layer for User.

Now that we have all the layers required, we add the endpoint for Client.


The next thing to modify is the endpoint layer for user.

Now we are ready with our code to create new clients and users. We still have a small issue: if we hit these endpoints, we will get an unauthorized error. So we need to put these in the exception list. For this we go to the SecurityConfiguration that we had defined in one of the earlier tutorials.

Look at the method public void configure(WebSecurity web) throws Exception. We have added the following two lines to skip the authentication filters for two families of URLs.
        web.ignoring().antMatchers("/oauth/client/**");
        web.ignoring().antMatchers("/user/**");
Of course this is very dangerous and we have added it only for testing purpose. In a future tutorial we will have a more elegant solution for this.

Now we are ready. We can test the service with following curl or equivalent command.

curl -X POST \
  http://localhost:8081/oauth/token \
  -H 'authorization: Basic YW5kcm9pZC1jbGllbnQ6YW5kcm9pZC1zZWNyZXQ=' \
  -H 'cache-control: no-cache' \
  -H 'content-type: application/x-www-form-urlencoded' \
  -H 'postman-token: 3a349bc0-1230-adbe-4b79-9b938728a101' \
  -d 'grant_type=password&password=mypassword&username=myuser&client_id=my-client&client_secret=my-secret'

This is the response that we get

{
    "access_token": "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJleHAiOjE1OTI3MjI5NTgsInVzZXJfbmFtZSI6InZhdmFzdGhpIiwianRpIjoiOGExZDYxN2ItZDU4OC00Nzc5LThlOTQtYTBiNWZkYzcxOTg2IiwiY2xpZW50X2lkIjoiYW5kcm9pZC1jbGllbnQiLCJzY29wZSI6WyJjb2RlIl19.n7hCnBdnjMC8vuCsHxkOznfd06iJctGNeypx2fXWla4",
    "token_type": "bearer",
    "refresh_token": "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJleHAiOjE1OTI3MjMwNTgsInVzZXJfbmFtZSI6InZhdmFzdGhpIiwianRpIjoiNGU4ODg3OTYtZWZiZS00ZDc5LTg1YmMtN2EzNzFhM2I4Yzg0IiwiY2xpZW50X2lkIjoiYW5kcm9pZC1jbGllbnQiLCJzY29wZSI6WyJjb2RlIl0sImF0aSI6IjhhMWQ2MTdiLWQ1ODgtNDc3OS04ZTk0LWEwYjVmZGM3MTk4NiJ9.xuElTWYSLscgdcqj0t-4t6prJbVOfHVqM331UUjfPBQ",
    "expires_in": 99,
    "scope": "code",
    "jti": "8a1d617b-d588-4779-8e94-a0b5fdc71986"
}
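It can be instructive to look inside the token. A JWT consists of three dot-separated segments (header, payload, signature), and the first two are just Base64URL-encoded JSON. The sketch below decodes them with the JDK only; it does not verify the signature, so it is suitable for inspection but not for authentication. The class name is hypothetical, and the token is the access token from the response above.

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;

// Sketch: peek into a JWT without verifying it. Inspection only.
final class JwtPeekSketch {
    // The access_token value from the response above.
    static final String SAMPLE_TOKEN =
            "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9"
            + ".eyJleHAiOjE1OTI3MjI5NTgsInVzZXJfbmFtZSI6InZhdmFzdGhpIiwianRpIjoiOGExZDYxN2ItZDU4OC00Nzc5LThlOTQtYTBiNWZkYzcxOTg2IiwiY2xpZW50X2lkIjoiYW5kcm9pZC1jbGllbnQiLCJzY29wZSI6WyJjb2RlIl19"
            + ".n7hCnBdnjMC8vuCsHxkOznfd06iJctGNeypx2fXWla4";

    // Decodes segment 0 (header) or 1 (payload) of a header.payload.signature token.
    static String decodeSegment(String jwt, int index) {
        String[] parts = jwt.split("\\.");
        if (parts.length != 3) {
            throw new IllegalArgumentException("expected header.payload.signature");
        }
        if (index < 0 || index > 1) {
            throw new IllegalArgumentException("only header (0) and payload (1) are JSON");
        }
        byte[] raw = Base64.getUrlDecoder().decode(parts[index]);
        return new String(raw, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        System.out.println(decodeSegment(SAMPLE_TOKEN, 0)); // header JSON
        System.out.println(decodeSegment(SAMPLE_TOKEN, 1)); // payload JSON with exp, user_name, jti, client_id, scope
    }
}
```

The payload carries the same fields that the check_token endpoint reports, which is why a resource server that knows the signing key can validate the token without calling back to the authorization server.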
We can obtain a new access token by calling the same endpoint with grant_type refresh_token and passing the refresh token.

curl -X POST \
>   http://localhost:8081/oauth/token \
>   -H 'authorization: Basic YW5kcm9pZC1jbGllbnQ6YW5kcm9pZC1zZWNyZXQ=' \
>   -H 'cache-control: no-cache' \
>   -H 'content-type: application/x-www-form-urlencoded' \
>   -H 'postman-token: 0adf261f-ed1e-85b3-67a5-21430fee8b38' \
>   -d 'grant_type=refresh_token&refresh_token=eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJleHAiOjE1OTI3MjMxOTAsInVzZXJfbmFtZSI6InZhdmFzdGhpIiwianRpIjoiNGMwNjczOTQtZjllNS00NDVjLTg5MzItMmRiMDM4N2U2ZjIxIiwiY2xpZW50X2lkIjoiYW5kcm9pZC1jbGllbnQiLCJzY29wZSI6WyJjb2RlIl0sImF0aSI6ImE1ZmY5MjhiLWQ1YjEtNDQ5Yy04N2I4LTU3ODgwYzY1NjM3NiJ9.RS2K7N5XCHQNov02WNu1QK1AqOvgc7MuNzeCydt7Ajs'
We get the following response.
{
    "access_token": "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJleHAiOjE1OTI3MjMxNzQsInVzZXJfbmFtZSI6InZhdmFzdGhpIiwianRpIjoiY2E1OWU3NzktZDRkZS00NDU1LWFlNjItNDQ2NTAxYzUxZjhmIiwiY2xpZW50X2lkIjoiYW5kcm9pZC1jbGllbnQiLCJzY29wZSI6WyJjb2RlIl19.WgVgpBWuDmdJUk3UG8MTKOBsXn5zbEGO8gyqg2akN8o",
    "token_type": "bearer",
    "refresh_token": "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJleHAiOjE1OTI3MjMxOTAsInVzZXJfbmFtZSI6InZhdmFzdGhpIiwianRpIjoiNGMwNjczOTQtZjllNS00NDVjLTg5MzItMmRiMDM4N2U2ZjIxIiwiY2xpZW50X2lkIjoiYW5kcm9pZC1jbGllbnQiLCJzY29wZSI6WyJjb2RlIl0sImF0aSI6ImNhNTllNzc5LWQ0ZGUtNDQ1NS1hZTYyLTQ0NjUwMWM1MWY4ZiJ9.3zDqHLSwVLq_Dg8r4Ppf5tfvzxwT_7FEwdTV67l3VYQ",
    "expires_in": 99,
    "scope": "code",
    "jti": "ca59e779-d4de-4455-ae62-446501c51f8f"
}
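Both token requests above authenticate the client through the authorization header, which is plain HTTP Basic authentication: the word Basic followed by the Base64 encoding of clientId:clientSecret. A minimal sketch of how such a header value is built follows; the class name is hypothetical, and the client id and secret are the ones that the header in the commands above decodes to.

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;

// Sketch: build and read an HTTP Basic authorization header value.
final class BasicAuthHeaderSketch {

    // "id" + ":" + "secret", Base64-encoded and prefixed with "Basic ".
    static String basicHeader(String clientId, String clientSecret) {
        String pair = clientId + ":" + clientSecret;
        return "Basic " + Base64.getEncoder().encodeToString(pair.getBytes(StandardCharsets.UTF_8));
    }

    // Reverses basicHeader: returns "id:secret" for a "Basic ..." value.
    static String decode(String headerValue) {
        if (headerValue == null || !headerValue.startsWith("Basic ")) {
            throw new IllegalArgumentException("not a Basic authorization header");
        }
        byte[] raw = Base64.getDecoder().decode(headerValue.substring("Basic ".length()));
        return new String(raw, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        System.out.println(basicHeader("android-client", "android-secret"));
        System.out.println(decode("Basic YW5kcm9pZC1jbGllbnQ6YW5kcm9pZC1zZWNyZXQ="));
    }
}
```

Since Base64 is an encoding and not encryption, these requests should only be sent over HTTPS outside of local testing.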
We can verify a token by accessing oauth/check_token endpoint and providing the token.

curl --request POST \
>   --url http://localhost:8081/oauth/check_token \
>   --header 'authorization: Basic YW5kcm9pZC1jbGllbnQ6YW5kcm9pZC1zZWNyZXQ=' \
>   --header 'cache-control: no-cache' \
>   --header 'content-type: application/x-www-form-urlencoded' \
>   --header 'postman-token: ef035580-4cba-a7a1-deaa-dcf5f59e41fc' \
>   --data token=eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJleHAiOjE1OTI3MjM2MTEsInVzZXJfbmFtZSI6InZhdmFzdGhpIiwianRpIjoiODZhZjZhYmUtZjA4YS00YzIzLThkYzYtYjY2ZmM3ZWQ0YWRiIiwiY2xpZW50X2lkIjoiYW5kcm9pZC1jbGllbnQiLCJzY29wZSI6WyJjb2RlIl19.Kn284yuxxHSxVtEc3D8H5YjVjPi0yN6oX1hDwEx5bOo
We get the following response.
{
    "client_id": "android-client",
    "exp": 1592723611,
    "jti": "86af6abe-f08a-4c23-8dc6-b66fc7ed4adb",
    "scope": [
        "code"
    ],
    "user_name": "myuser"
}
This is all about enabling JWT OAuth tokens with any spring server. We will continue this tutorial in next part with some more important details. The complete code for the working server is here.

Friday, June 5, 2020

Auditing in MySQL

One of the important requirements in many RDBMS based workloads is an audit log, where every row in every table is stamped with who changed it and when it was changed. Doing this by hand in every place where a table is modified is extremely cumbersome. In this blog post we look at how we can enable the springframework based auditing framework to perform this activity.

Before we do that, we need to upgrade the versions of our dependencies since I have been using this project for quite some time. I also decided to use the lombok logging annotation and removed all the dependencies on log4j.

Here are the modifications to the pom.xml for setting dependencies.


Now we look at our User entity. We need to add auditing fields to this table. Since a realistic project would have multiple entities, we create an abstract entity base class with all the audit fields. We also move the primary key to the abstract base class. When we do that, hibernate provides multiple ways of mapping the entities to the database schema. We want all the parent attributes to be stored in a single table together with the child class attributes. To accomplish that, we need to add @MappedSuperclass to the base class.

As we can see, we have added five attributes in the base class. One is the id for all our entities and the other four are used for auditing purposes.
At this time we also add another layer to our code. Currently all the Endpoints directly call the Repository layer, which causes a problem if we want to write functions that can be reused across different endpoints. An example of this need is the retrieveUser method, which takes an argument that could be a username or an email. Currently this method lives in the Endpoint layer as a private method. It is useful in many different contexts, so we create a new UserService layer and move this method there.


Now, let's get to the original task of enabling auditing. First we define an auditing config as below.

We had earlier defined a ThreadLocal that is used by the auditAware method defined above to extract the currently logged-in user and return its userId. As we can see, the audit fields in the AbstractBaseEntity expect a Long for the @CreatedBy and @LastModifiedBy fields. The EntityAuditConfig also carries the @EnableJpaAuditing annotation, which is required.
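To make the mechanism less magical, here is a framework-free sketch of what audit stamping amounts to. It is not Spring Data's implementation and not the tutorial's code: all class names are hypothetical, a Supplier of Long plays the role of the auditor lookup, and the field names simply mirror the createdBy, createdAt, updatedBy and updatedAt values seen in the REST responses.

```java
import java.time.Instant;
import java.util.function.Supplier;

// Hypothetical base class: an id plus four audit fields.
abstract class AuditedEntitySketch {
    Long id;
    Long createdBy;
    Instant createdAt;
    Long updatedBy;
    Instant updatedAt;
}

// Hypothetical concrete entity that only adds its own attribute.
final class ProfileSketch extends AuditedEntitySketch {
    String url;
}

// Stamps the audit fields before an entity is saved.
final class AuditStamperSketch {
    // Supplies the id of the current user, e.g. from a ThreadLocal.
    private final Supplier<Long> currentUserId;

    AuditStamperSketch(Supplier<Long> currentUserId) {
        this.currentUserId = currentUserId;
    }

    void beforeSave(AuditedEntitySketch entity, Instant now) {
        Long user = currentUserId.get();
        if (entity.createdAt == null) {
            // First save: record who created the row and when.
            entity.createdBy = user;
            entity.createdAt = now;
        }
        // Every save: record who touched the row last and when.
        entity.updatedBy = user;
        entity.updatedAt = now;
    }
}
```

With the real framework the stamping is triggered by JPA lifecycle callbacks, so endpoint and service code never has to set these fields by hand.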
At this point we also add a new endpoint called ProfileEndpoint which can be used to manage the entity that represents a user profile. This entity currently only contains a url.
Now if we perform any operation on any of the endpoints, we will see the auditing fields automatically populated. Give it a spin. It is a life saver in many production applications. I have had situations where users changed their passwords, forgot them and then complained that they had been hacked.
The complete code for this and previous posts can be found at my github repository. This tutorial changes are under v1.5.

Monday, February 24, 2020

Hadoop and Spark Locally

As a continuation of the last post, we now look at how to make it deployable in a proper Spark/Hadoop cluster. We will not go into the details of setting up these clusters themselves but rather into how to make sure a program that we developed earlier can run as a job in a cluster.
We will continue with the setup in our local machine. I am using a Mac so the instructions are with respect to that but most instructions would be common to any other platform.
If Spark is processing data from a database and writing into Hive, pretty much what we did in the last post would work. The problem arises if some of the data being processed exists as flat files. If we want to submit our jobs to a Spark cluster, we cannot use local files because the jobs do not run against the local file system.
The best approach is to either use a hdfs cluster or deploy a single node hdfs on your machine. Here I am enumerating the steps to set up a single node hdfs cluster on a Mac OS X machine.
  • Download the Hadoop distribution for your machine here.
  • The Hadoop distribution is available in the form of a .tar.gz file that you can expand in some directory on your machine. The expansion will create a directory of the form hadoop-x.y.z, assuming your Hadoop version is x.y.z. Set the environment variable HADOOP_HOME to the full pathname of this Hadoop directory.
  • Add $HADOOP_HOME/bin to the PATH variable.
  • Now we need to update the configuration files for hadoop.
$ cd $HADOOP_HOME/etc/hadoop
$ vi core-site.xml

We update the file with the following properties.

$ vi hdfs-site.xml

We update the file with the following properties.
$ vi mapred-site.xml

We update the file with the following properties.
$ vi yarn-site.xml

We update the file with the following properties.
Now we start Hadoop.
$ cd $HADOOP_HOME
$ sbin/start-all.sh
Now we can access files stored in HDFS from our Spark jobs.
The next post will go more into the details of how to process files in spark.

Tuesday, February 18, 2020

Hive and Spark

In this blog post, we take a slight deviation from core issues related to the spring framework and look at an issue that spring programmers might face regularly. Recently I was working with Spark and found a need to read data from MySQL, do some processing and write it back to a hive instance. We will look at this issue in this post.
We start by making sure our hive instance is backed by a database. To do this, we do the following.
$ cp $HIVE_HOME/conf/hive-default.xml.template $HIVE_HOME/conf/hive-site.xml
$ vi gnu/apache-hive-3.1.2-bin/conf/hive-site.xml

We edit the hive-site.xml file and make sure it is configured as below.
We can configure the values to suit our needs. But make sure the MySQL username, password and database URL exist and have the relevant permissions.
Now we create another database in MySQL which will contain the data that we need to process. I am calling this database mystuff, with username mystuff and password mystuff123. We need to run the following commands in MySQL to make sure everything exists and the permissions are appropriate.
mysql> create database mystuff;
Query OK, 1 row affected (0.00 sec)

mysql> create user mystuff@localhost identified by 'mystuff123';
Query OK, 0 rows affected (0.00 sec)

mysql> create user mystuff@'%' identified by 'mystuff123';
Query OK, 0 rows affected (0.01 sec)

mysql> grant all on mystuff.* to mystuff@localhost;
Query OK, 0 rows affected (0.00 sec)

mysql> grant all on mystuff.* to mystuff@'%';
Query OK, 0 rows affected (0.00 sec)

Now we create a plain java project in IntelliJ with the following pom.xml file.
Now to look at the problem at hand. We have a table in MySQL with the following structure.
mysql> desc mydata;
+-------+--------------+------+-----+---------+-------+
| Field | Type         | Null | Key | Default | Extra |
+-------+--------------+------+-----+---------+-------+
| id    | int          | YES  |     | NULL    |       |
| k     | varchar(10)  | YES  |     | NULL    |       |
| v     | varchar(255) | YES  |     | NULL    |       |
+-------+--------------+------+-----+---------+-------+
3 rows in set (0.00 sec)

We want to flatten this table such that each key gets converted to a column for each id. So assume our current data is as below.
mysql> select * from mydata;
+------+-------+-------------------+
| id   | k     | v                 |
+------+-------+-------------------+
|    1 | NAME  | John Doe.         |
|    1 | EMAIL | jd@example.com    |
|    2 | NAME  | Jane Doe          |
|    2 | EMAIL | janed@example.com |
+------+-------+-------------------+
4 rows in set (0.00 sec)
We want to load this data and convert it into a flattened table that has three columns, i.e. id, name and email. Then we want to write the flattened data into a table named person in hive.
hive (default)> select * from person;
OK
person.id person.email person.name
1 jd@example.com.  John Doe
2 janed@example.com Jane Doe
Time taken: 0.094 seconds, Fetched: 2 row(s)

The following code will perform the above conversion.
Lines 11 through 17 create a SparkSession for hive operations. The key instruction here is enableHiveSupport. Lines 20 through 26 create a SparkSession that will be used for MySQL operations. Lines 29 through 37 load the complete contents of the table. Lines 39 through 42 group the results by id and pivot the table on the field k. Lines 46 through 49 create a data frame for hiveSession and write the contents into a table named person.
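Independent of Spark, the group-and-pivot step can be illustrated with plain Java collections on the four sample rows shown earlier. This is only a sketch of the transformation, not the Spark job described above; the class name is hypothetical, and where Spark would apply an aggregate per id and key, this sketch simply keeps the last value seen.

```java
import java.util.Map;
import java.util.TreeMap;

// Sketch: turn (id, k, v) rows into one record per id with one column per key.
final class PivotSketch {

    // Each row is {id, k, v}. If the same (id, k) occurs twice, the last value wins.
    static Map<Integer, Map<String, String>> pivot(String[][] rows) {
        Map<Integer, Map<String, String>> byId = new TreeMap<>();
        for (String[] row : rows) {
            int id = Integer.parseInt(row[0]);
            byId.computeIfAbsent(id, ignored -> new TreeMap<>()).put(row[1], row[2]);
        }
        return byId;
    }

    public static void main(String[] args) {
        // The sample contents of the mydata table.
        String[][] rows = {
            {"1", "NAME", "John Doe."},
            {"1", "EMAIL", "jd@example.com"},
            {"2", "NAME", "Jane Doe"},
            {"2", "EMAIL", "janed@example.com"},
        };
        for (Map.Entry<Integer, Map<String, String>> entry : pivot(rows).entrySet()) {
            Map<String, String> columns = entry.getValue();
            System.out.println(entry.getKey() + " " + columns.get("EMAIL") + " " + columns.get("NAME"));
        }
    }
}
```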

Friday, April 12, 2019

Spring and more Kafka

In the last post, we saw how to integrate Kafka with Spring Boot application. The post was a very simple implementation of Kafka. The real world is much more complex. You have to deal with multiple topics, you need multiple partitions. In this post, we explore more details of a spring boot application with Kafka.
Let's start with the fact that we have multiple topics that we are dealing with. The very first thing that we need to do is to separate out the properties for each of the topics.

We can no longer use the default KafkaTemplate; we will have to create our own. The best way is to define a KafkaProducer. To create a KafkaProducer, we need to create a KafkaProducerConfig. Here are two KafkaProducerConfig classes, one for each topic.


Looking at the producer config, we can observe that we create two beans which are qualified by names and return an appropriate KafkaTemplate which is later used to send the message. Similar to producer config, we need to create consumer config. Consumer config is used by the listener to listen for the message.


As we can see in each of the consumer configs, we create a bean which returns a ConcurrentKafkaListenerContainerFactory. The beans are qualified by a name so that we can use an appropriate container factory for receiving messages.
We also modify the MyTopicMessage and add a member variable topicName that will help us distinguish the topic to which the message needs to be sent.

We also modify the endpoint so that the message can be sent to the appropriate topic.
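The dispatch that the endpoint performs, picking a sender by the topicName carried in the message, can be sketched without Kafka. In the sketch below a Consumer of String stands in for a named KafkaTemplate bean; the class and its methods are hypothetical and do not reflect the tutorial's actual endpoint code.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Consumer;

// Sketch: route a message to the sender registered for its topic name.
final class TopicRouterSketch {
    // One sender per topic; in the real service this would be a qualified KafkaTemplate.
    private final Map<String, Consumer<String>> sendersByTopic = new HashMap<>();

    void register(String topicName, Consumer<String> sender) {
        sendersByTopic.put(topicName, sender);
    }

    // Returns false when no sender is registered for the topic, true after sending.
    boolean send(String topicName, String message) {
        Consumer<String> sender = sendersByTopic.get(topicName);
        if (sender == null) {
            return false;
        }
        sender.accept(message);
        return true;
    }

    public static void main(String[] args) {
        TopicRouterSketch router = new TopicRouterSketch();
        router.register("FirstTopic", message -> System.out.println("FirstTopic <- " + message));
        router.register("SecondTopic", message -> System.out.println("SecondTopic <- " + message));
        router.send("FirstTopic", "This is my message!");
        router.send("SecondTopic", "This is another message!");
    }
}
```

Rejecting unknown topic names explicitly, as the false return does here, avoids silently creating or writing to a topic that nobody listens on.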

Now we modify the Listener to integrate everything so that the appropriate message can be received. Look at the @KafkaListener annotation: we pass the ListenerContainerFactory as an argument to receive a message from a queue.
Now we can test the server. The flow of the service is as below.

  1. The message is posted to the endpoint as a POST request
  2. The @RestController receives the message and, based on the topicName in the request, sends the message to the topic with the same name.
  3. The listener receives the message from the appropriate queue.
$ curl -X POST \
>   'http://localhost:8081/send?token=3193fa24-a0ba-451b-83ff-eb563c3fd43b-cdf12811-7e41-474b-8fa6-e8fefd4a738c' \
>   -H 'Content-Type: application/json' \
>   -H 'Postman-Token: 15fbe075-9c80-4af9-a797-6b5e0979fd1b' \
>   -H 'cache-control: no-cache' \
>   -H 'token: 3193fa24-a0ba-451b-83ff-eb563c3fd43b-cdf12811-7e41-474b-8fa6-e8fefd4a738c' \
>   -d '{
> "message" : "This is my message!",
> "topicName" : "FirstTopic"
> }'
Success!

The message receipt is indicated in the server log.
2019-04-12 14:50:47.142  INFO 51396 --- [ntainer#0-0-C-1] i.s.b.t.listeners.KafkaMessageListener   : Received FirstTopic message for partition 0 This is my message!
Similarly, we can send a message to SecondTopic.
$ curl -X POST   'http://localhost:8?token=3193fa24-a0ba-451b-83ff-eb563c3fd43b-cdf12811-7e41-474b-8fa6-e8fefd4a738c'   -H 'Content-Type: application/json'   -H 'Postman-Token: 15fbe075-9c80-4af9-a797-6b5e0979fd1b'   -H 'cache-control: no-cache'   -H 'token: 3193fa24-a0ba-451b-83ff-eb563c3fd43b-cdf12811-7e41-474b-8fa6-e8fefd4a738c'   -d '{
"message" : "This is another message!",
"topicName" : "SecondTopic"
}'
Success!

The message receipt is indicated in the server log.
2019-04-12 14:53:14.972  INFO 51396 --- [ntainer#1-0-C-1] i.s.b.t.listeners.KafkaMessageListener   : Received SecondTopic message for partition 0 This is another message!
The complete code base for this tutorial can be found at my github repository at v1.4.

Wednesday, April 10, 2019

Kafka and Spring

Kafka has become a very popular platform and is being used as a stream, journal and even eventing system. In this post, we explore how to integrate Kafka with spring framework application. First, we add the Kafka bootstrap server details in the application.properties file.

Let's also add dependencies in pom.xml.

Now, for each Kafka topic, we create a listener class. The listener class provides a callback method that is called when any message is retrieved on that topic.

Now we create an endpoint through which we inject a message in the queue. The message is sent to the queue and is retrieved by the listener.

We autowire a KafkaTemplate instance that is used to send the message to the queue.

$ curl -X POST \
>   'http://localhost:8081/send?token=3193fa24-a0ba-451b-83ff-eb563c3fd43b-cdf12811-7e41-474b-8fa6-e8fefd4a738c' \
>   -H 'Content-Type: application/json' \
>   -H 'Postman-Token: e281e3c5-0dae-4bb7-ac8d-6555f66a18c6' \
>   -H 'cache-control: no-cache' \
>   -H 'token: 3193fa24-a0ba-451b-83ff-eb563c3fd43b-cdf12811-7e41-474b-8fa6-e8fefd4a738c' \
>   -d '{
> "message" : "This is my message!"
> }'
Message sent successfully!.
The receipt of the message is indicated in the spring server log.

2019-04-10 14:28:03.969  INFO 31091 --- [ntainer#0-0-C-1] i.s.b.t.listeners.MyTopicKafkaListener   : Received Promise message This is my message!

Sunday, March 10, 2019

Spring Boot and Docker Containers

With a microservices-based deployment, the first step is to dockerize your software. In our case, we want to create Docker images for each of our microservices so that we can orchestrate them better. I have decided to use the container registry provided by Google to build and upload the images.

The first thing we need to do is add a dependency on the spring-cloud-dependencies pom. Then we add a dependency on spring-cloud-config-server, followed by the dockerfile-maven-plugin. Keep in mind that the configuration for dockerfile-maven-plugin must specify the repository. Our repository starts with gcr.io/, which makes sure that the image, once created, is pushed to the container registry hosted by Google. If you want to use some other registry, you need to provide it in the form hostname:portnumber.
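
To make the naming rule concrete, the sketch below mimics, in simplified form, how Docker decides which registry an image name refers to: the part before the first slash is treated as a registry host when it contains a dot or a colon, or is `localhost`; otherwise the name resolves to Docker Hub. The class and method names are made up for illustration, and `registry.example.com:5000` is a placeholder host.

```java
/** Hypothetical helper that mirrors (in simplified form) Docker's registry-resolution rule. */
final class ImageNames {
    private ImageNames() {}

    /** Returns the registry host that an image repository name points at. */
    static String registryOf(String repository) {
        int slash = repository.indexOf('/');
        if (slash < 0) {
            return "docker.io"; // no path component, so the name refers to Docker Hub
        }
        String first = repository.substring(0, slash);
        boolean looksLikeHost = first.contains(".") || first.contains(":") || first.equals("localhost");
        return looksLikeHost ? first : "docker.io";
    }

    public static void main(String[] args) {
        System.out.println(registryOf("gcr.io/myproject/rae/customer"));
        System.out.println(registryOf("registry.example.com:5000/rae/customer"));
    }
}
```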
Now we can issue the following command, and the Docker image will be built and pushed to the Google registry.

$ mvn deploy -DskipTests
[INFO] 
[INFO] --- dockerfile-maven-plugin:1.4.10:push (default) @ customer ---
[INFO] Using Google application default credentials
[INFO] loaded credentials for user account with clientId=764086051850-6qr4p6gpi6hn506pt8ejuq83di341hur.apps.googleusercontent.com
[INFO] The push refers to repository [gcr.io/myproject/rae/customer]
[INFO] Image 967d96afcc46: Preparing
[INFO] Image 36e051842720: Preparing
[INFO] Image d1646aaa6540: Preparing
[INFO] Image 19382582b926: Preparing
[INFO] Image 41715d8d7d2b: Preparing
[INFO] Image f3a38968d075: Preparing
[INFO] Image a327787b3c73: Preparing
[INFO] Image 5bb0785f2eee: Preparing
[INFO] Image f3a38968d075: Waiting
[INFO] Image a327787b3c73: Waiting
[INFO] Image 5bb0785f2eee: Waiting
[INFO] Image 36e051842720: Layer already exists
[INFO] Image 41715d8d7d2b: Layer already exists
[INFO] Image d1646aaa6540: Layer already exists
[INFO] Image 19382582b926: Layer already exists
[INFO] Image 967d96afcc46: Pushing
[INFO] Image a327787b3c73: Layer already exists
[INFO] Image 5bb0785f2eee: Layer already exists
[INFO] Image f3a38968d075: Layer already exists
[INFO] Image 967d96afcc46: Pushed
[INFO] 0.0.1-SNAPSHOT: digest: sha256:f6bad4811f867dd75225797bee684ea43c0ddaf2b83de1b419a9f75e9a3941bc size: 2001
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 47.638 s
[INFO] Finished at: 2019-03-10T19:58:27+05:30
[INFO] Final Memory: 55M/857M
[INFO] ------------------------------------------------------------------------

The local image list below shows the image under its Google Container Registry repository name.
$ docker images
REPOSITORY                     TAG                 IMAGE ID            CREATED             SIZE
gcr.io/myproject/rae/customer   0.0.1-SNAPSHOT      a566e2f28705        19 seconds ago      518MB
gcr.io/myproject/rae/customer                       8c61d1a5aef4        13 minutes ago      518MB

Tuesday, February 12, 2019

12. Adding Git release information

In the previous post, we saw how to enable actuator endpoints on our Spring server. Once we have done that, it is a good idea to add Git release information to the server so that we can see, at runtime, which build is currently deployed.
We add the git-commit-id-plugin to our pom.xml.

   <plugin>
    <groupId>pl.project13.maven</groupId>
    <artifactId>git-commit-id-plugin</artifactId>
    <version>2.2.1</version>
   </plugin>
The next thing to do is to create a git.properties file in the resources directory of the project.

The Git commit ID is added by the Maven plugin, so it is a good idea to build the project using Maven.

$ mvn clean package -DskipTests
Now we can run the server and check the /manage/info endpoint using the curl command.
$ curl -X GET http://localhost:9091/manage/info
{"git":{"branch":"master","commit":{"id":"6fb94c0","time":1549868235.000000000}}}
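
The `time` value in this response is a Unix timestamp in seconds. As a small aside, the sketch below converts it to a readable UTC instant with `java.time`; the class and method names are made up for illustration.

```java
import java.math.BigDecimal;
import java.time.Instant;

/** Hypothetical helper for reading the commit time reported by the info endpoint. */
final class CommitTime {
    private CommitTime() {}

    /** Parses a non-negative value such as "1549868235.000000000" (seconds.nanoseconds). */
    static Instant parse(String epochSeconds) {
        BigDecimal value = new BigDecimal(epochSeconds);
        long seconds = value.longValue(); // whole seconds; the fraction is handled below
        long nanos = value.subtract(BigDecimal.valueOf(seconds)).movePointRight(9).longValue();
        return Instant.ofEpochSecond(seconds, nanos);
    }

    public static void main(String[] args) {
        System.out.println(parse("1549868235.000000000")); // prints 2019-02-11T06:57:15Z
    }
}
```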

11. Spring Actuators

Spring provides actuators, a helpful set of tools for inspecting and debugging the application at runtime. Here is how to enable them. We first add the actuator dependency to the pom.xml.
<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-actuator</artifactId>
</dependency>

Now we define a prefix for all the actuator endpoints by adding the following lines to the application.properties file. The base-path property sets the /manage prefix, and the exposure.include=* setting exposes all the actuator endpoints; to expose only specific endpoints, replace * with a comma-delimited list of endpoint names. We also serve the management endpoints on a separate port so that access to them can be blocked at something like an ELB.
management.endpoints.web.base-path=/manage
management.server.port=9091
management.endpoints.web.exposure.include=*
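
To see how the port and base path combine into the URLs queried later in this post, here is a small plain-Java sketch that reads the same three properties and builds an endpoint URL. It does not involve Spring; `ManagementUrls` and `endpointUrl` are hypothetical names, `localhost` is assumed as the host, and `/actuator` is used as the fallback because it is Spring Boot 2's default base path.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.Properties;

/** Hypothetical helper showing how the management port and base path form a URL. */
final class ManagementUrls {
    private ManagementUrls() {}

    /** Builds the URL of an actuator endpoint such as "health" or "info". */
    static String endpointUrl(Properties props, String endpoint) {
        String port = props.getProperty("management.server.port");
        String basePath = props.getProperty("management.endpoints.web.base-path", "/actuator");
        return "http://localhost:" + port + basePath + "/" + endpoint;
    }

    public static void main(String[] args) throws IOException {
        Properties props = new Properties();
        props.load(new StringReader(
                "management.endpoints.web.base-path=/manage\n"
                + "management.server.port=9091\n"
                + "management.endpoints.web.exposure.include=*\n"));
        System.out.println(endpointUrl(props, "health")); // prints http://localhost:9091/manage/health
    }
}
```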
Since we already have a security filter defined, we need to exempt the health and info endpoints from the security check. We add the following URLs in the configure method of SecurityConfiguration.
    @Override
    public void configure(WebSecurity web) throws Exception {
        // Skip the security filter chain for the actuator, error, and Swagger paths.
        web.ignoring().antMatchers(
                "/manage/health",
                "/manage/info",
                "/webjars/**",
                "/error",
                "/swagger-ui.html",
                "/v2/api-docs/**",
                "/swagger-resources/**");
    }
Here we have added paths related to error, actuator, and Swagger.
This enables the actuator endpoints for our server. We can query these endpoints; the following is a sample response.

$ curl -X GET http://localhost:9091/manage/health
{"status":"UP"}
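
For scripted checks, the same query can be made from Java. The following is a minimal sketch using the JDK's `java.net.http.HttpClient` (Java 11 or later); `HealthCheck` is a hypothetical class, and the status is pulled out with a simple regular expression rather than a JSON parser to keep the example dependency-free.

```java
import java.io.IOException;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.time.Duration;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical client-side check against the actuator health endpoint. */
final class HealthCheck {
    // Matches the first "status" field in the body, e.g. {"status":"UP"}.
    private static final Pattern STATUS = Pattern.compile("\"status\"\\s*:\\s*\"([^\"]+)\"");

    private HealthCheck() {}

    /** Returns true when the first status field in the response body is UP. */
    static boolean isUp(String body) {
        Matcher m = STATUS.matcher(body);
        return m.find() && "UP".equals(m.group(1));
    }

    /** Fetches the health endpoint and returns the raw response body. */
    static String fetch(URI healthUri) throws IOException, InterruptedException {
        HttpClient client = HttpClient.newBuilder()
                .connectTimeout(Duration.ofSeconds(2))
                .build();
        HttpRequest request = HttpRequest.newBuilder(healthUri)
                .timeout(Duration.ofSeconds(5))
                .GET()
                .build();
        return client.send(request, HttpResponse.BodyHandlers.ofString()).body();
    }
}
```

With the server from this post running, `HealthCheck.isUp(HealthCheck.fetch(URI.create("http://localhost:9091/manage/health")))` should return true for the response shown above.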