Wednesday, April 22, 2020

Apache Geode - Scaling Stateful Applications with High Availability (comparable to Kafka Consumer Groups)

Stateful Scaling

I have been working on a stateful streaming application. It listens to a stream of events from upstream systems and applies business rules to them. Applying these rules produces results, and the incoming event stream keeps updating those results. Whenever a particular result changes, the application publishes it downstream. So there is a source, a stream transformation, regrouping of results, mapping of values, and finally a sink where changed results are notified.
Our application was initially monolithic, but as expected, we had to scale it out to handle more load. This would be straightforward for a stateless application. Things get trickier when each node of the application owns state. In our case, a business rule requires that related messages be owned and processed by one single node only.

So, considering the stateful nature of the processing, there were several questions we asked ourselves when we started defining our architecture:
  1. What will the data colocation pattern be?
    This is mainly a question of partitioning the workload.
  2. Where do we maintain the state? Locally, or in another resilient place?
    Scaling is relatively easy, but maintaining state on individual nodes while allowing failovers is a bit complicated.
  3. What happens when a node goes down?
    When we lose a node, we also lose the state owned by that node. How do the other nodes rebuild that lost state and take over real-time processing of events as per the new colocation route?
  4. What happens when a new node joins?
    The new node can either take over a portion of the work owned by current cluster members, or stay quiet as a standby for high-availability purposes.
Answers to these questions shape the architecture of a distributed, highly available streaming application. Many of the answers are already available off the shelf from streaming solutions like Kafka Streams. However, when we evaluated it for our application's needs, mainly in terms of state handling, we felt some of those features were not worth their cost. For example, the resiliency of aggregation sub-state in a Kafka Streams topology carried too much serde cost for my application's case, as the intermediary state results are very heavy (for some unavoidable reasons).

We could not afford serializing and de-serializing these results to disk and transferring them over the network to other nodes while processing a high frequency of events. If a member goes down, my app can rebuild the lost state through simple database queries or some other means. We felt the need to take full control of colocation and partitioning. Basically, we needed a distributed system that informs us about membership view changes on failovers and new node additions; the rest can be handled by custom handlers.


Required processing flow:




As shown in the image above, we needed a dispatcher that colocates events as per our strategy (e.g. using a combination of fields from the event) and routes all related events to one single node. In this example, Node 1 gets all green events, Node 2 gets all yellow events, and so on.
On top of this, we needed failover. If, say, Node 4 goes down, we need the distributed system formed by these nodes to notify the remaining members of the drop from the cluster. That event is handled so that all purple traffic is now routed to one of the remaining four nodes, again based on our custom logic. Basically, the same colocation strategy comes into play and routes the next purple event to, say, Node 5.
Another interesting aspect of such a failover is recovery. Once purple events are routed to Node 5 after failover, Node 5 needs to recover the entire purple state from another highly available source.
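To make the routing idea concrete, here is a minimal, framework-agnostic sketch of the dispatcher logic (class, field, and node names are illustrative, not from the actual application): a colocation key derived from the event resolves to an owner node through an ownership table, so events of one kind always land on the same node until the table changes.

```java
import java.util.Map;

// Minimal sketch of the dispatcher's colocation idea: events with the same
// colocation key (the "color" above) always resolve to the same node,
// until the ownership table itself is updated on a membership change.
class ColocationRouter {

    // Hypothetical event; in practice the key would combine several fields.
    static final class Event {
        final String colocationKey;
        final String payload;
        Event(String colocationKey, String payload) {
            this.colocationKey = colocationKey;
            this.payload = payload;
        }
    }

    private final Map<String, String> ownershipTable; // colocation key -> node id

    ColocationRouter(Map<String, String> ownershipTable) {
        this.ownershipTable = ownershipTable;
    }

    String routeTo(Event event) {
        return ownershipTable.get(event.colocationKey);
    }
}
```

The interesting part is not the lookup itself but who maintains the ownership table; that is what the rest of this post is about.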

Apache Geode:


Considering this and other internal reasons, one interesting solution we attempted was Apache Geode, a distributed in-memory data grid (IMDG). It provides a way to form a distributed system of nodes with the help of distributed maps called Regions. Regions are the primary elements of this data grid; they can be partitioned with a colocation strategy and/or replicated. The primary use case of such distributed caches is to share highly available state, with the possibility of map-reduce style computation using distributed Functions.
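For illustration, a partitioned region with a redundant copy of each bucket can be declared in Geode's cache.xml roughly like this (the region name and redundancy choice are assumptions, not our actual configuration):

```xml
<cache xmlns="http://geode.apache.org/schema/cache"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="http://geode.apache.org/schema/cache
                           http://geode.apache.org/schema/cache/cache-1.0.xsd"
       version="1.0">
  <!-- Partitioned region keeping one redundant copy of every bucket,
       so entries survive the loss of a single member. -->
  <region name="EventStore" refid="PARTITION_REDUNDANT"/>
</cache>
```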

Issue with Geode for manually controlled stateful scaling:


Geode solves one problem: an in-memory, highly available event store for recovery and failover. What was missing was a way to get notified in a colocated manner, i.e. to build all the green state on Node 1, and so on. For something like Kafka Streams this is quite natural, but events are not first-class citizens in Geode. Geode does provide the AsyncEventQueue listener to process partitioned region updates asynchronously in micro-batches; the problem is that only the primary node gets notified.

Using a colocation strategy, we can make each node the primary for one or more color codes. But we faced issues with the distribution of workloads: the message frequency and volume differ across color codes. Geode's built-in colocation support cannot understand this and do a fair partitioning, so it often resulted in imbalanced colocation in terms of primary data distribution. Secondary data (replicated events) made the event distribution even more skewed.

After some research and several POCs on the Geode IMDG, I came up with the unconventional solution below to solve this stateful, fully customizable colocation problem using Geode.

Solution with Circular Buffer type of Regions:


As mentioned earlier, we needed full control of partitioning based on a custom colocation strategy, plus an async notification mechanism on region updates such that events of one kind go to the same node every time, until the cluster membership view changes.

This was solved using two types of regions. The first is a conventional region storing events as an HA store, used for failover and optionally for distributed-function-based computing. The second type of region acts as a circular buffer, one per consumer node.


As you can see from the diagram, I built a custom Geode events dispatcher, something like a Kafka producer with custom partitioning logic. This partitioning logic is driven by a global (replicated) partition ownership region. When a new event is received from outside, the event dispatcher first backs it up to the HA regions, shown by the blue box on the right side. After a successful backup, the dispatcher consults the global ownership map to determine the destination node id for the current event.
To route the event to that node, I use a combination of an Event Buffer region and an AsyncEventQueue. Each node has a dedicated Event Buffer region, which acts as a circular buffer for that node's consumer, and an AsyncEventQueue where Geode queues up change events for this buffer. The node is the exclusive AsyncEventListener of this AsyncEventQueue.
With this arrangement, the moment an event is added to the corresponding Event Buffer region, its node is notified asynchronously in micro-batches. This is quite similar to Kafka consumer groups on a topic partitioned with your own partitioning scheme.
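That wiring can be sketched in cache.xml along these lines (the queue id, batch settings, and listener class name are all illustrative): each node's buffer region is attached to its own AsyncEventQueue, whose listener is that node's consumer.

```xml
<cache xmlns="http://geode.apache.org/schema/cache"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="http://geode.apache.org/schema/cache
                           http://geode.apache.org/schema/cache/cache-1.0.xsd"
       version="1.0">
  <!-- Drained only on the member hosting the primary; the listener receives
       micro-batches of change events for this node's buffer. -->
  <async-event-queue id="node1-buffer-queue" batch-size="100" batch-time-interval="50">
    <async-event-listener>
      <class-name>com.example.NodeEventConsumer</class-name>
    </async-event-listener>
  </async-event-queue>

  <!-- Node 1's Event Buffer region, wired to its queue. -->
  <region name="EventBuffer-node1">
    <region-attributes refid="PARTITION" async-event-queue-ids="node1-buffer-queue"/>
  </region>
</cache>
```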

On failover, membership events are handled such that the global partition table gets updated with the new membership view. From the CAP theorem's perspective, Geode prefers consistency; on such membership view changes it momentarily defers event processing while membership stabilizes. This lets us ensure that the global partition table is updated as per the new view, and new events get routed to the new owner of that kind of event. In the worst case, we can always recover from the HA event backup cache using the various techniques Geode provides.
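The view-change handling can be sketched framework-agnostically (the names and the simple round-robin policy are illustrative): when a membership change is reported, the ownership table is recomputed over the surviving members, so the next event of an orphaned kind routes to its new owner.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Sketch of recomputing the global partition table on a membership view
// change: every event kind is reassigned round-robin over the live members.
class PartitionTable {

    static Map<String, String> rebalance(List<String> eventKinds, List<String> liveMembers) {
        Map<String, String> ownership = new LinkedHashMap<>();
        for (int i = 0; i < eventKinds.size(); i++) {
            ownership.put(eventKinds.get(i), liveMembers.get(i % liveMembers.size()));
        }
        return ownership;
    }
}
```

A real policy would also weigh per-kind message frequency and volume, which is exactly what Geode's built-in colocation could not do for us.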

Implementation details:


The implementation is based on a hexagonal architecture, encapsulating core application logic in services and external inputs and outputs in ports. The service layer is built in a technology-agnostic manner: it reacts to notifications received from the underlying distributed framework, which could be Kafka Streams, Geode/GemFire, or anything else. If we abstract out the interaction required with the distributed framework and build the app to react to its callbacks, we can answer the above four questions without locking ourselves to a particular framework. With this, I realized we can build a distributed, highly available application that is agnostic of the technical framework used for distributed consensus.

With this in mind, the main parts of the implementation are:
  1. Allow the app to maintain a global partition map in the cluster. This partition map provides up-to-date event ownership information, giving the owner node for each incoming event. It solves the problems of partitioning and failover.
  2. Route incoming events to nodes based on the global partition map. For each event we can find its partition id or owner node id using the partition map. The distributed framework should route the event to that physical node, which is responsible for processing events of that type until the membership view changes.
  3. Inform peers when a node goes down or a new node joins. On these notifications, the ownership map is updated based on the load distribution policy.
  4. After the framework settles on the membership view, recover the lost state based on recent changes and the metadata of the partition map.
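These four responsibilities map naturally onto a small technology-agnostic port; a sketch follows (the interface and method names are illustrative, not from the actual code base):

```java
import java.util.List;

// The service layer reacts only to these callbacks; a Geode, Kafka Streams,
// or other adapter implements them behind the port.
interface ClusterCallbacks<E> {

    // 1. Resolve the owner node for an event from the global partition map.
    String ownerOf(E event);

    // 2. Invoked on the owner node with a micro-batch of colocated events.
    void onEvents(List<E> colocatedBatch);

    // 3. Invoked when a node joins or leaves; update the ownership map here.
    void onMembershipChange(List<String> liveMembers);

    // 4. Invoked after the view settles; rebuild the lost state.
    void recoverState(String partitionId);
}
```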

I hope this helps others trying to use Geode or similar IMDG solutions to build stateful applications with a flexible colocation strategy.


Saturday, April 4, 2020

Some notes on Vert.x, Netty and NIO

I have been working on an application that uses Vert.x messaging to send very heavy objects over the wire to clients. These objects are very long lists of measurements of a metric; each list keeps growing throughout the day, every few milliseconds, up to millions of elements. And there are around a thousand such metric types.
Naturally, with such huge message sizes and frequencies, we do see memory spikes. However, since this is the Vert.x framework, a lot of this memory is consumed via direct buffers in the non-heap area.

This short note just logs the details of this usage, and how such non-heap usage can be monitored while using Vert.x.
It comes from the underlying Netty library used by Vert.x. Netty uses the NIO APIs to send messages to clients over non-blocking channels. When we publish or send messages over the Vert.x event bus, they are first encoded to bytes, and the bytes are sent to the client via a NetSocket. While writing bytes onto sockets, Netty uses buffer allocators (PooledByteBufAllocator), which can allocate either direct ByteBuffers or heap buffers. To avoid GC overhead, Netty uses direct buffers by default; this behaviour can be controlled with the io.netty.noPreferDirect system property. Using the direct buffer allocator, Netty obtains a DirectByteBuffer (PooledUnsafeDirectByteBuf) of the required size. Depending on message size, frequency, and client-side acknowledgement of delivery, Netty can consume more and more of these buffers from the pool, increasing non-heap memory usage.
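Besides native memory tracking, the JDK itself exposes NIO direct-buffer usage programmatically through the standard BufferPoolMXBean API. One caveat: this "direct" pool only accounts buffers allocated via ByteBuffer.allocateDirect, so depending on Netty's allocator settings (e.g. its no-cleaner Unsafe allocation path), Netty's direct memory may or may not be fully visible here.

```java
import java.lang.management.BufferPoolMXBean;
import java.lang.management.ManagementFactory;

// Reads the bytes currently held by NIO direct buffers, as accounted by
// the JDK's "direct" buffer pool (standard java.lang.management API).
class DirectBufferUsage {

    static long directBytesUsed() {
        for (BufferPoolMXBean pool : ManagementFactory.getPlatformMXBeans(BufferPoolMXBean.class)) {
            if (pool.getName().equals("direct")) {
                return pool.getMemoryUsed();
            }
        }
        return -1; // "direct" pool not found (should not happen on HotSpot)
    }
}
```

This is handy for logging direct-buffer growth from inside the application, alongside the external jcmd-based approach below.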

Such usage can be monitored in a couple of steps.
First, the process should be started with the VM argument -XX:NativeMemoryTracking=detail.
Then the jcmd tool can be used to get a summary or detailed native memory usage report:
jcmd <pid> VM.native_memory summary   (or: VM.native_memory detail)

In summary mode, the output reports different types of memory usage, e.g. Heap, Compiler, Thread, Class, GC, etc. In this case, as we are interested in direct buffer allocations, the memory reported in the "Internal" section should be observed.
One sample output for this Internal section:
                  Internal (reserved=33458KB, committed=33458KB)
                            (malloc=33394KB #23980)
                            (mmap: reserved=64KB, committed=64KB)



I have a sample program on GitHub to demonstrate this, with a Vert.x event bus publisher and a consumer as main applications. It is not an ideal example of a Vert.x publisher, but it shows how Vert.x uses non-heap memory. As we change the size of the published message, the increase in total non-heap memory can be seen in the jcmd summary report for the producer process.