Wednesday, April 22, 2020

Apache Geode - Scaling Stateful Applications with High Availability (comparable to Kafka Consumer Groups)

Stateful Scaling

I have been working on a stateful streaming application. It listens to a stream of events from upstream systems and applies business rules. Applying these rules produces results, which the incoming event stream keeps updating. The application publishes downstream whenever a particular result changes. There is a source, stream transformations, regrouping of results, mapping of values and finally a sink where results get notified.
Our application was initially monolithic; however, as expected, to handle more load we were required to scale it. This would be straightforward for a stateless application. Things get trickier when each node of the application owns a state. Meaning, there is a business rule which requires that related messages be owned and processed by one single node only.

So, considering the stateful nature of processing, there were several questions we asked ourselves when we started defining our architecture:
  1. What will be the data colocation pattern?
    This is mainly a question of partitioning the workload.
  2. Where to maintain the state? Locally, or in another resilient place?
    Scaling is relatively easy, but maintaining state in individual nodes while allowing failovers is a bit complicated.
  3. What happens when a node goes down?
    When we lose a node, we also lose the state it owned. How do the other nodes rebuild that lost state and take over real-time processing of events as per the new colocation route?
  4. What happens when a new node joins?
    The new node can either be given a portion of the work owned by current cluster members, or it can stay quiet for high availability purposes.
Answers to these questions shape the required architecture of a distributed, highly available streaming application. Many of the answers are available off the shelf from streaming solutions like Kafka Streams. However, when we evaluated it for our application's needs, mainly in terms of state handling, we felt some of those features were not worth their cost. For example, the resiliency of aggregation sub-state in a Kafka Streams topology carried too much serde cost for my application's case, as the intermediary state results are very heavy (due to some unavoidable reasons).

We could not afford serializing and deserializing state to disk and transferring it over the network to other nodes while processing events at high frequency. My app could rebuild a lost state through simple database queries, or some other way, if a member goes down. We felt the need to take full control of colocation and partitioning. Basically, we needed a distributed system to inform us about membership view changes on failovers and new node additions; the rest can be controlled by custom handlers.
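These requirements can be distilled into a small, framework-agnostic callback contract. The sketch below is purely illustrative - the names are mine, not from Geode or Kafka:

```java
import java.util.Set;

// Hypothetical contract distilled from the four questions above; any
// distributed framework (Geode, Kafka Streams, ...) could sit behind it.
interface ClusterStateHandler {
    // Q1: which node owns a given event key (the colocation pattern)?
    String ownerOf(String eventKey, Set<String> members);

    // Q3: a member left; the state it owned must be rebuilt by survivors.
    void onMemberLeft(String memberId, Set<String> remainingMembers);

    // Q4: a member joined; optionally hand over part of the workload.
    void onMemberJoined(String memberId, Set<String> allMembers);
}
```

Q2 (where the state actually lives) then becomes a concern of whichever adapter implements this contract.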


Required processing flow:




As shown in the image above, we needed a dispatcher which can colocate events as per our strategy (e.g. using a combination of fields from the event) and route all related events to one single node. So in this case Node 1 gets all green events, Node 2 gets all yellow events, etc.
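A minimal sketch of such a dispatcher, assuming a stable hash over chosen event fields (this is my illustration, not Geode's API):

```java
import java.util.List;

// All events sharing a colocation key always land on the same node,
// as long as the membership view (the node list) is unchanged.
class ColocationDispatcher {
    private final List<String> nodes; // current membership view, in stable order

    ColocationDispatcher(List<String> nodes) { this.nodes = List.copyOf(nodes); }

    // A combination of fields from the event forms the colocation key.
    static String colocationKey(String metricType, String sourceId) {
        return metricType + "|" + sourceId;
    }

    // Stable hash of the key picks exactly one node.
    String route(String colocationKey) {
        int idx = Math.floorMod(colocationKey.hashCode(), nodes.size());
        return nodes.get(idx);
    }
}
```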
On top of this, we needed failover. If, say, Node 4 goes down, we needed the distributed system formed by all these nodes to notify the remaining members of the drop from the cluster. This event is handled such that all purple traffic is now routed to one of the remaining nodes, based again on our custom logic. Basically, the same colocation strategy comes into play and routes the next purple event to, say, Node 5.
Another interesting aspect of such a failover is recovery. Once purple events are routed to Node 5 after the failover, Node 5 needs to recover the entire purple state from another highly available source.
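The failover step can be sketched as an ownership table update; the reassignment rule and all names here are hypothetical:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// When a member drops, every key it owned is reassigned to a survivor,
// and the new owner must rebuild that key's state from the HA source
// before it starts consuming new events.
class OwnershipTable {
    private final Map<String, String> ownerByKey = new HashMap<>();

    void assign(String key, String node) { ownerByKey.put(key, node); }

    String ownerOf(String key) { return ownerByKey.get(key); }

    // Returns the keys that moved, i.e. the states new owners must recover.
    List<String> onMemberLeft(String deadNode, List<String> survivors) {
        List<String> moved = new ArrayList<>();
        for (Map.Entry<String, String> e : ownerByKey.entrySet()) {
            if (e.getValue().equals(deadNode)) {
                // Pick a survivor deterministically (custom logic goes here).
                String newOwner = survivors.get(
                        Math.floorMod(e.getKey().hashCode(), survivors.size()));
                e.setValue(newOwner);
                moved.add(e.getKey());
            }
        }
        return moved;
    }
}
```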

Apache Geode:


Considering this and other internal reasons, one interesting solution we attempted was Apache Geode, a distributed IMDG. It provides a way to form a distributed system of nodes with the help of distributed maps called Regions. Regions are the primary elements of this data grid. These maps can be partitioned with a colocation strategy and/or replicated. The primary use case of such distributed caches is to share highly available state, with the possibility of map-reduce style operations using distributed functions.

Issue with Geode for manually controlled stateful scaling:


Geode solves one problem: an in-memory, highly available event state for recovery and failover. What was missing, however, is a way to get notified in a colocated manner, e.g. to build all the green state on Node 1. For something like Kafka Streams this is quite natural, but events are not Geode's first-class citizens. Geode provides the AsyncEventQueue listener to process partitioned region updates asynchronously, in micro-batches. The problem is that only the primary node gets notified.

Using a colocation strategy, we can make each node the primary node for one or more color codes. But we faced issues with the distribution of workloads. The message frequency and volume of the different color codes differ, and it is difficult for Geode's built-in colocation support to understand this and do fair partitioning. It often resulted in imbalanced colocation in terms of primary data distribution. Secondary data (replicated events) made the event distribution even more skewed.
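What we wanted is closer to a volume-aware assignment. A greedy least-loaded sketch of the idea (my own illustration, not Geode's rebalancing logic):

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Assign each color code to the currently least loaded node, heaviest
// colors first, so high-volume colors do not pile up on one primary.
class WeightedAssigner {
    static Map<String, String> assign(Map<String, Long> volumeByColor, List<String> nodes) {
        Map<String, Long> load = new HashMap<>();
        nodes.forEach(n -> load.put(n, 0L));
        Map<String, String> owner = new LinkedHashMap<>();
        volumeByColor.entrySet().stream()
            .sorted(Map.Entry.<String, Long>comparingByValue().reversed())
            .forEach(e -> {
                String least = Collections.min(
                        load.entrySet(), Map.Entry.<String, Long>comparingByValue()).getKey();
                owner.put(e.getKey(), least);
                load.merge(least, e.getValue(), Long::sum);
            });
        return owner;
    }
}
```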

After several rounds of research and POCs on the Geode IMDG, I came up with the unconventional solution below to solve this stateful, fully customizable colocation problem using Geode.

Solution with Circular Buffer type of Regions:


As mentioned earlier, we needed full control of partitioning based on a custom colocation strategy, and an async notification mechanism on region updates such that events of one kind go to the same node every time, until the cluster membership view changes.

This was solved by using two types of regions. One is a conventional region storing events as an HA store, for failover and optionally for distributed function based computing. The other type of region acts as a circular buffer, one per consuming node.


As you can see from the diagram, I built a custom Geode events dispatcher, something like a Kafka producer with custom partitioning logic. This partitioning logic is built on a global, replicated partition ownership region. When a new event is received from outside, the event dispatcher first backs it up to the HA regions, shown by the blue box on the right side. After a successful backup, the dispatcher uses the global ownership map to determine the destination node id for the current event.
To route the event to that node, I use a combination of an Event Buffer region and an AsyncEventQueue. For each node there is a dedicated Event Buffer region, which acts as a circular buffer for that node's consumer, and an AsyncEventQueue where Geode queues up change events for this buffer. The node is the exclusive AsyncEventListener of this AsyncEventQueue.
With this arrangement, the moment an event is added to the corresponding Event Buffer region, its node is notified asynchronously in micro-batches. This is quite similar to Kafka consumer groups on a topic partitioned with your own partitioning scheme.
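The circular buffer aspect boils down to the keying scheme used for puts into a node's Event Buffer region. A minimal sketch, assuming keys wrap at a fixed capacity so the oldest entries get overwritten:

```java
// A monotonically increasing sequence wrapped by the buffer capacity.
// Re-putting at a wrapped key overwrites the oldest entry, which is what
// makes the region behave like a circular buffer, while each put still
// fires a change event into the region's AsyncEventQueue.
class CircularBufferKeys {
    private final int capacity;
    private long seq = 0;

    CircularBufferKeys(int capacity) { this.capacity = capacity; }

    // Key to use for region.put(key, event) on the node's Event Buffer region.
    long nextKey() { return seq++ % capacity; }
}
```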

On failover, membership events are handled such that the global partition table gets updated with the new membership view. From the CAP theorem's perspective, Geode prefers consistency; on such membership view changes, Geode momentarily defers event processing while the membership stabilizes. This allows us to ensure the global partition table is updated as per the new view, so new events get routed to the new owner of that kind of event. In the worst case, we can always recover from the HA event backup cache using various techniques provided by Geode.

Implementation details:


The implementation is based on a hexagonal architecture, encapsulating core application logic in services and external inputs and outputs in ports. The service layer is built in a technology-agnostic manner, reacting to notifications received from the underlying distributed framework, which could be Kafka Streams, Geode/GemFire or anything else. If we abstract out the interactions required with distributed frameworks and build the app to react to callbacks from the framework, we can answer the above four questions without locking ourselves into a particular framework. With this, I realized we can build a distributed, highly available application which is agnostic to the technical framework used for distributed consensus.

With this in mind, the main parts of the implementation are:
  1. Allow the app to maintain a global partition map in the cluster. This partition map provides up-to-date event ownership information, giving the owner node for each incoming event. It solves the problems of partitioning and failovers.
  2. Route incoming events to nodes based on the global partition map. From each event we can find its partition id, and from the partition map its owner node id. The distributed framework should route the event to the corresponding physical node using these ids. That node is responsible for processing these types of events until the membership view changes.
  3. Inform peers when a node goes down or a new node joins. On these notifications, the ownership map is updated based on the load distribution policy.
  4. After the framework settles on the membership view, recover the lost state based on recent changes and the metadata of the partition map.

I hope this helps others trying to use Geode or similar IMDG solutions to build stateful applications with a flexible colocation strategy.


Saturday, April 4, 2020

Some notes on Vert.x, Netty and NIO

I have been working on an application that uses Vert.x messaging to send very heavy objects across the wire to clients. These objects are very long lists of measurements of a metric. Each list keeps growing throughout the day, every few milliseconds, up to millions of elements. And there are many types of such metrics - around a thousand.
Naturally, due to such huge message sizes and the frequency of these messages, we do see memory spikes. However, as this is the Vert.x framework, a lot of this memory is consumed via direct buffers in the non-heap area.

This short note just logs some details about this usage, and how such non-heap usage can be monitored while using Vert.x.
It comes from the underlying Netty library used by Vert.x. Netty uses the NIO APIs to send messages to clients over non-blocking channels. When we publish or send messages over the Vert.x event bus, they are first encoded to bytes. These bytes are sent to the client via a NetSocket. While writing bytes onto sockets, Netty uses buffer allocators (PooledByteBufAllocator), which can use either direct ByteBuffers or heap buffers. To avoid GC overhead, Netty uses direct buffers by default; this behaviour can be controlled with the io.netty.noPreferDirect system property. Using the direct buffer allocator, Netty gets hold of a direct buffer (PooledUnsafeDirectByteBuf) of the required size. Depending on message size, frequency and client-side acknowledgement of delivery, Netty can consume more and more of these buffers from the pool, thereby increasing non-heap memory usage.
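The heap vs. direct distinction itself is plain java.nio; a tiny demo of the two kinds of buffer Netty's allocator chooses between:

```java
import java.nio.ByteBuffer;

// allocateDirect reserves native (non-heap) memory - exactly the usage
// that shows up under NativeMemoryTracking rather than in the Java heap.
class DirectBufferDemo {
    static boolean[] compare(int size) {
        ByteBuffer heap = ByteBuffer.allocate(size);         // backed by a byte[] on the heap
        ByteBuffer direct = ByteBuffer.allocateDirect(size); // backed by native memory
        return new boolean[] { heap.isDirect(), direct.isDirect() };
    }
}
```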

Such usage can be monitored in a couple of steps.
First, the process should be started with the VM argument -XX:NativeMemoryTracking=detail.
Then the jcmd tool can be used to get a summary or detailed native memory usage report, as below:
jcmd <pid> VM.native_memory summary   (or detail)

In the summary mode output, different types of memory usage are reported, e.g. Heap, Compiler, Thread, Class, GC etc. In this case, as we are interested in direct buffer allocation, the memory reported in the "Internal" section should be observed.
One sample output for this Internal section:
                  Internal (reserved=33458KB, committed=33458KB)
                            (malloc=33394KB #23980)
                            (mmap: reserved=64KB, committed=64KB)



I have a sample program on GitHub to demonstrate this. It contains a Vert.x event bus publisher and a consumer as main applications. It is not an ideal example of a Vert.x publisher, but it goes to show how Vert.x uses non-heap memory. As we change the size of the published message, the increase in total non-heap memory can be seen in the jcmd summary report for the producer process.

Sunday, March 29, 2020

G1 Collector Summary

I have been reading an amazing book on garbage collection and Java performance - Java Performance Companion. It has the best explanation of how the G1 collector works, at least among what I have seen so far. This is just a quick-reference summary of G1 from the book.

The goal of the G1 collector is to meet pause time requirements for bigger heaps. All the previous collectors - CMS, Parallel and Serial - had problems as heap size grows. CMS especially has fragmentation issues, and pause times increase with heap size; compaction can be done only with a Full GC. G1 solves these problems with the idea of regions. Instead of the traditional generational heap structure, it splits the heap into multiple regions. The region size is chosen from the configured heap size such that there are roughly 2000 regions of equal size, e.g. a 16 GB heap / 2000 regions ≈ 8 MB per region.
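The sizing rule can be sketched as a small calculation. This is a rough approximation of the idea, not HotSpot's exact heuristic; region sizes are powers of two clamped to the 1 MB..32 MB range:

```java
// Pick a power-of-two region size so the heap divides into ~2000 regions.
class G1RegionSize {
    static long regionSizeBytes(long heapBytes) {
        long target = heapBytes / 2048;                // aim for about 2000 regions
        long size = Long.highestOneBit(Math.max(target, 1)); // round down to power of two
        long min = 1L << 20, max = 32L << 20;          // 1 MB and 32 MB bounds
        return Math.min(Math.max(size, min), max);
    }
}
```

So a 16 GB heap gives 8 MB regions, while very large heaps cap out at 32 MB regions (and hence end up with more than 2000 regions).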

Two main categories of regions:
Available regions - regions which are totally free for allocations.
CSets (Collection Sets) - regions whose live objects are to be collected and moved to available regions in the next GC. As a result, regions in a CSet become available regions at the end of the collection cycle.

There are still 2 phase runs: a young generation phase and an old generation phase.
A collection is all about which regions to collect. Normally G1 only collects young regions, but in a mixed mode run it also takes a few regions from the old region group.

The young generation phase is a stop-the-world, parallel collection phase. It collects the entire young generation into available regions. The old generation phase is somewhat similar to CMS, with a mix of stop-the-world and concurrent phases, but it has fewer phases than CMS.


When the young generation phase kicks in -
The number of regions in the young generation keeps changing, in order to meet the pause time goal: the smaller the pause time target, the fewer the young generation regions. After every cycle, based on these heuristics, G1 sets the number of eden regions to use. Any new memory allocation goes into an eden region taken from the available regions. When the total number of eden regions reaches this limit, a young generation collection starts.


When mixed mode collection happens -
As part of every young generation collection, objects get promoted from survivor to old generation regions. A few parameters decide when these old generation regions also need to be collected, so that we can avoid a Full GC or possibly going out of memory.

-XX:InitiatingHeapOccupancyPercent - old generation occupancy, as a percentage of total heap, at which the concurrent old generation phases start. Default 45%.
-XX:G1MixedGCCountTarget - how many mixed mode runs to perform; this helps decide the number of old regions from the CSet collected per mixed mode GC run.
-XX:G1HeapWastePercent - percentage of the heap that G1 is willing to leave uncollected. Default 5%, meaning it is OK to stop mixed collections once the reclaimable space drops to 5% of the heap.
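For illustration, this is how these knobs would appear on the command line together with G1. The values shown are just the defaults restated, not a tuning recommendation:

```shell
java -XX:+UseG1GC \
     -XX:InitiatingHeapOccupancyPercent=45 \
     -XX:G1MixedGCCountTarget=8 \
     -XX:G1HeapWastePercent=5 \
     -jar app.jar
```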

In summary, what happens is this: as the old generation grows and reaches the IHOP threshold, G1 schedules the old generation phases, a mix of stop-the-world and concurrent steps. At the end of these phases, G1 knows where the live objects are located in the old regions and which are the top regions in terms of GC efficiency. It adds these regions to the CSet, and G1 enters mixed mode collections.
Meaning, in the next rounds of collection, instead of just collecting young regions, G1 also collects a few regions from the old region group, as indicated by their CSet entries. Based on the above parameters, G1 keeps doing mixed mode collections for the next few cycles and eventually, when its heuristics are met, goes back to normal young collection mode.


When a Full GC happens -
While allocating humongous objects, if there are not enough consecutive available regions.
If the marking phase of the old generation collection does not finish before G1 runs out of available regions.

At a high level, there are 2 parts of a G1 run:
Concurrent cycle - which involves initial mark, scanning, remark and cleanup. Please note that initial mark and remark are stop-the-world steps. The cleanup phase here just adds free regions to the available regions list.
Mixed collections - which involve freeing up CSet regions, comprising both young and old generation regions.