Wednesday, April 22, 2020

Apache Geode - Scaling Stateful Applications with High Availability (comparable to Kafka Consumer Groups)

Stateful Scaling

I have been working on a stateful streaming application. It listens to a stream of events from upstream systems and applies business rules to produce results. Incoming events keep updating these results, and the application publishes downstream whenever a particular result changes. There is a source, stream transformations, regrouping of results, mapping of values, and finally a sink where changed results are notified.
Our application was initially monolithic, but as load grew we needed to scale it out. That would be straightforward for a stateless application; things get trickier when each node of the application owns state. In our case, a business rule requires that related messages be owned and processed by one single node.

So, considering the stateful nature of the processing, we asked ourselves several questions when we started defining our architecture:
  1. What will the data colocation pattern be?
    This is mainly a question of partitioning the workload.
  2. Where do we maintain the state: locally, or in another resilient place?
    Scaling is relatively easy, but maintaining state in individual nodes while allowing failovers is more complicated.
  3. What happens when a node goes down?
    When we lose a node, we also lose the state it owned. How do the other nodes rebuild that lost state and take over real-time processing of events under the new colocation routing?
  4. What happens when a new node joins?
    The new node can either take over a portion of the work owned by current cluster members, or stay quiet as a standby for high availability.
Answers to these questions shape the architecture of a distributed, highly available streaming application. Many answers are already available off the shelf from streaming solutions like Kafka Streams. However, when we evaluated it against our application's needs, mainly around state handling, some of those features were not worth their cost for us. For example, making the aggregation sub-state resilient in a Kafka Streams topology carried too much serde cost in my application's case, because the intermediate state results are very heavy (for some unavoidable reasons).

We could not afford serializing state to disk and transferring it over the network to other nodes while processing a high frequency of events. My app could rebuild lost state through simple database queries or other means if a member went down. We felt the need to take full control of colocation and partitioning. Basically, we needed the distributed system to inform us about membership view changes on failovers and node additions; the rest could be handled by custom handlers.


Required processing flow:




As shown in the image above, we needed a dispatcher that colocates events per our strategy (e.g. using a combination of fields from the event) and routes all related events to one single node. In this case Node 1 gets all green events, Node 2 gets all yellow events, and so on.
On top of this, we needed failover. If, say, Node 4 goes down, the nodes need a distributed system among them to get notified of the drop from the cluster. That notification is handled so that all purple traffic is rerouted to one of the remaining nodes, again based on our custom logic: the same colocation strategy comes into play and routes the next purple event to, say, Node 5.
Another interesting part of such a failover is recovery. Once purple events are routed to Node 5 after failover, it needs to recover the entire purple state from another highly available source.
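The sticky-routing-with-failover behaviour described above can be sketched in plain Java. The names here (OwnershipTable, ownerOf) are purely illustrative, not part of any Geode API:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Illustrative sketch: a partition-ownership table that routes each event
// "color" (colocation key) to exactly one live node, and reassigns keys
// when a member drops out of the cluster view.
public class OwnershipTable {
    private final Map<String, String> ownerByKey = new HashMap<>();
    private final List<String> liveNodes = new ArrayList<>();

    public OwnershipTable(List<String> nodes) {
        liveNodes.addAll(nodes);
    }

    // Routing: the same key always goes to the same node until the view changes.
    public String ownerOf(String colocationKey) {
        return ownerByKey.computeIfAbsent(colocationKey,
                k -> liveNodes.get(Math.floorMod(k.hashCode(), liveNodes.size())));
    }

    // Failover: keys owned by the dead node are handed to surviving nodes;
    // all other routes stay untouched.
    public void onMemberCrashed(String deadNode) {
        liveNodes.remove(deadNode);
        ownerByKey.replaceAll((key, owner) ->
                owner.equals(deadNode)
                        ? liveNodes.get(Math.floorMod(key.hashCode(), liveNodes.size()))
                        : owner);
    }
}
```

Only the keys owned by the dead node move on failover; every other route remains sticky, which is exactly the property the dispatcher relies on.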

Apache Geode:


Considering this and other internal reasons, one interesting solution we attempted was Apache Geode, a distributed IMDG. It provides a way to form a distributed system of nodes around distributed maps called Regions, which are the primary elements of the data grid. These maps can be partitioned with a colocation strategy and/or replicated. The primary use case of such distributed caches is sharing highly available state, with the possibility of map-reduce style computation using distributed functions.

Issue with Geode for manually controlled stateful scaling:


It solves one problem: an in-memory, highly available event state for recovery and failover. What was missing is a way to get notified in a colocated manner, e.g. to build all green state on Node 1. For something like Kafka Streams this is natural, but events are not first-class citizens in Geode. It provides an AsyncEventQueue listener to process partitioned region updates asynchronously in micro batches; the problem is that only the primary node gets notified.

Using a colocation strategy we can make each node the primary for one or more color codes. But we faced issues with workload distribution: message frequency and volume differ across color codes, and it is difficult for Geode's built-in colocation support to understand this and partition fairly. It often resulted in imbalanced colocation in terms of primary data distribution, and secondary (replicated) data made the event distribution even harder.

After research and several POCs on the Geode IMDG, I came up with the unconventional solution below to solve this stateful, fully customizable colocation problem using Geode.

Solution with Circular Buffer type of Regions:


As mentioned earlier, we needed full control of partitioning based on a custom colocation strategy, plus an async notification mechanism on region updates so that events of one kind go to the same node every time until the cluster membership view changes.

This was solved using two types of regions. One is a conventional region storing events as an HA store for failover and, optionally, for distributed-function-based computing. The other type acts as a circular buffer, one per kind of event.


As you can see from the diagram, I built a custom Geode event dispatcher, something like a Kafka producer with custom partitioning logic. This partitioning logic is built on a global (replicated) partition ownership region. When a new event is received from outside, the dispatcher first backs it up to the HA regions, shown by the blue box on the right. After a successful backup, the dispatcher uses the global ownership map to find the destination node id for the event.
To route that event to the node, I use a combination of an Event Buffer region and an AsyncEventQueue. Each node has a dedicated Event Buffer region that acts as a circular buffer for that node's consumer, and an AsyncEventQueue where Geode queues change events for this buffer. The node is the exclusive AsyncEventListener of this AsyncEventQueue.
With this arrangement, the moment an event is added to the corresponding Event Buffer region, its node is notified asynchronously in micro batches. This is quite similar to Kafka consumer groups on a topic partitioned with your own partitioning scheme.
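A minimal stand-in for the per-node Event Buffer plus AsyncEventQueue arrangement, with hypothetical names. This does not use Geode's API; it just mimics the behaviour of an exclusive listener notified in micro batches over a bounded buffer:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Illustrative stand-in for one node's "Event Buffer region": a bounded ring
// buffer whose entries are drained to a single exclusive listener in micro
// batches, mimicking an AsyncEventQueue delivering to exactly one node.
public class NodeEventBuffer {
    private final Object[] ring;
    private int head, size;
    private final int batchSize;
    private final Consumer<List<Object>> exclusiveListener;

    public NodeEventBuffer(int capacity, int batchSize, Consumer<List<Object>> listener) {
        this.ring = new Object[capacity];
        this.batchSize = batchSize;
        this.exclusiveListener = listener;
    }

    public void offer(Object event) {
        ring[(head + size) % ring.length] = event;
        if (size < ring.length) size++;
        else head = (head + 1) % ring.length;   // full: overwrite the oldest entry
        if (size >= batchSize) drain();          // notify in micro batches
    }

    private void drain() {
        List<Object> batch = new ArrayList<>(size);
        for (int i = 0; i < size; i++) batch.add(ring[(head + i) % ring.length]);
        head = size = 0;
        exclusiveListener.accept(batch);
    }
}
```

The listener is invoked only when a full micro batch has accumulated, analogous to the batched AsyncEventListener callback described above.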

On failover, membership events are handled so that the global partition table is updated with the new membership view. Geode prefers consistency in CAP terms, so on such view changes it momentarily defers event processing while membership stabilizes. This lets us ensure the global partition table is updated per the new view and new events are routed to the new owner of that kind of event. In the worst case, we can always recover from the HA event backup cache using various techniques provided by Geode.

Implementation details:


The implementation is based on a hexagonal architecture, encapsulating core application logic in services and external inputs and outputs in ports. The service layer is built in a technology-agnostic manner: it reacts to notifications received from the underlying distributed framework, which could be Kafka Streams, Geode/GemFire, or anything else. If we abstract out the interaction with the distributed framework and build the app to react to its callbacks, we can answer the four questions above without locking into a particular framework. With this I realized we can build a distributed, highly available application that is agnostic to the framework used for distributed consensus.

With this in mind, the main parts of the implementation are:
  1. Allow the app to maintain a global partition map in the cluster. This map provides up-to-date event ownership information, giving the owner node for each incoming event. It solves the problems of partitioning and failovers.
  2. Route incoming events to nodes based on the global partition map. From each event we find its partition id or owner node id using the map, and the distributed framework routes the event to that physical node. That node is responsible for processing these types of events until the membership view changes.
  3. Inform peers when a node goes down or a new node joins. On these notifications, the ownership map is updated according to the load-distribution policy.
  4. After the framework settles on the membership view, recover lost state based on recent changes and the partition map metadata.
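Step 3 above, updating the ownership map on a membership change, can be sketched as a simple round-robin policy. The names here (PartitionPolicy, rebalance) are hypothetical; a real policy would weight partitions by observed load, as discussed earlier:

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Illustrative sketch: rebuild the partition-id -> node assignment whenever
// the membership view changes, so each node ends up owning roughly
// numPartitions / view.size() partitions.
public class PartitionPolicy {
    public static Map<Integer, String> rebalance(int numPartitions, List<String> view) {
        Map<Integer, String> assignment = new TreeMap<>();
        for (int p = 0; p < numPartitions; p++) {
            // round-robin across the current membership view
            assignment.put(p, view.get(p % view.size()));
        }
        return assignment;
    }
}
```

On a node drop or join, peers recompute this map against the new view and then route events by looking up their partition id in it.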

I hope this helps others trying to use Geode or similar IMDG solutions to build stateful applications with a flexible colocation strategy.


Saturday, April 4, 2020

Some notes on Vert.x, Netty and NIO

I have been working on an application that uses Vert.x messaging to send very heavy objects across the wire to clients. These objects are very long lists of measurements of a metric; each list keeps growing throughout the day, every few milliseconds, up to millions of elements. And there are many types of such metrics, around a thousand.
Naturally, given these huge message sizes and frequencies, we do see memory spikes. However, since this is the Vert.x framework, much of this memory is consumed via direct buffers in the non-heap area.

This short note logs details about this usage and how such non-heap usage can be monitored when using Vert.x.
It is due to the underlying Netty library used by Vert.x. Netty uses the NIO APIs to send messages to clients over non-blocking channels. When we publish or send messages over the Vert.x event bus, they are first encoded to bytes, which are then sent to the client via a NetSocket. While writing bytes to sockets, Netty uses a buffer allocator (PooledByteBufAllocator), which can use either direct byte buffers or heap buffers. To avoid GC overhead, Netty uses direct buffers by default; this behaviour can be controlled by the io.netty.noPreferDirect system property. Using the direct buffer allocator, Netty obtains a DirectByteBuffer (PooledUnsafeDirectByteBuf) of the required size. Depending on message size, frequency, and client-side delivery acknowledgement, Netty can consume more and more of these buffers from the pool, increasing non-heap memory usage.
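Netty's PooledByteBufAllocator is internal, but the JDK primitive underneath it can be demonstrated directly. The sketch below simply allocates a direct ByteBuffer, which is the kind of native (non-heap) memory the monitoring below accounts for:

```java
import java.nio.ByteBuffer;

// Minimal demo of the JDK primitive behind Netty's pooled direct buffers:
// ByteBuffer.allocateDirect reserves native memory outside the Java heap.
// Only the small DirectByteBuffer wrapper object lives on-heap.
public class DirectBufferDemo {
    public static ByteBuffer allocate(int sizeBytes) {
        return ByteBuffer.allocateDirect(sizeBytes);
    }

    public static void main(String[] args) {
        ByteBuffer buf = allocate(16 * 1024 * 1024); // 16 MB off-heap
        System.out.println("direct=" + buf.isDirect() + " capacity=" + buf.capacity());
    }
}
```

Running this with larger sizes makes the native-memory growth visible in the reports described next, while heap usage stays essentially flat.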

Such usage can be monitored in a couple of steps.
First, the process should be started with the VM argument -XX:NativeMemoryTracking=detail.
Then the jcmd tool can be used to get a summary or detailed native memory usage report:
jcmd <pid> VM.native_memory summary

In summary mode, several categories of memory usage are reported, e.g. Heap, Compiler, Thread, Class, GC. Since we are interested in direct buffer allocation here, the memory reported in the "Internal" section should be observed.
One sample output for this Internal section:
                  Internal (reserved=33458KB, committed=33458KB)
                            (malloc=33394KB #23980)
                            (mmap: reserved=64KB, committed=64KB)



I have a sample program on GitHub to demonstrate this: a Vert.x event bus publisher and a consumer main application. It is not an ideal example of a Vert.x publisher, but it shows how Vert.x uses non-heap memory. As we change the size of the published message, the increase in total non-heap memory can be seen in the jcmd summary report for the producer process.

Sunday, March 29, 2020

G1 Collector Summary

I have been reading this amazing book on Garbage collections and java performance - Java Performance Companion. It has the best explanation of how G1 collector works, at least what I have seen so far. This is just summary for quick reference from this book about G1.

G1's goal is to meet pause-time requirements for bigger heaps. All previous collectors (CMS, Parallel, and Serial) had problems as heap size grows; CMS in particular has fragmentation issues, and pause times increase with the heap because compaction happens only in a Full GC. G1 solves these problems with the idea of regions: instead of the traditional generational heap layout, it splits the heap into multiple regions of equal size. The region size is chosen so that there are roughly 2000 regions for the configured heap, e.g. a 16 GB heap / ~2000 regions ≈ 8 MB per region.
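The region-size arithmetic above can be sketched as follows. This is a rough approximation of HotSpot's heuristic (a power-of-two size near heap/2048, clamped between 1 MB and 32 MB), not a copy of its code:

```java
// Rough sketch of G1 region sizing: pick a power-of-two region size close to
// heapSize / 2048 so the heap ends up with roughly 2048 regions, clamped to
// HotSpot's documented bounds of 1 MB and 32 MB.
public class G1RegionSize {
    static final long MB = 1024 * 1024;

    public static long regionSizeFor(long heapBytes) {
        long target = heapBytes / 2048;          // aim for ~2048 regions
        long size = MB;                          // minimum region size: 1 MB
        while (size * 2 <= target && size < 32 * MB) {
            size *= 2;                           // maximum region size: 32 MB
        }
        return size;
    }
}
```

For a 16 GB heap this yields 8 MB regions, matching the division in the text; very large heaps cap out at 32 MB regions.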

Two main categories of regions:
Available regions - regions that are totally free for allocation.
CSets - collection sets: regions whose live objects are to be collected and moved to available regions in the next GC. As a result, regions in the CSet become available regions at the end of the collection cycle.

There are still two kinds of collection runs: young-generation runs and old-generation runs.
A collection is all about choosing which regions to collect: normally only young regions, but in a mixed-mode run a few regions from the old-region group are included as well.

The young-generation phase is a stop-the-world parallel collection that evacuates the entire young generation into available regions. The old-generation phase is somewhat similar to CMS, with a mix of stop-the-world and concurrent phases, but it has fewer phases than CMS.


When the young-generation phase kicks in -
The number of regions in the young generation keeps changing to meet the pause-time goal: the smaller the pause-time target, the fewer young regions. After every cycle, G1's heuristics set the number of eden regions to use. New allocations go into eden regions taken from the available set; when the total eden-region count reaches this limit, a young-generation collection starts.


When mixed-mode collection happens -
With every young-generation collection, objects get promoted from survivor to old-generation regions. A few parameters decide when these old regions need to be collected as well, so that we avoid a Full GC or possibly running out of memory.

-XX:InitiatingHeapOccupancyPercent - occupancy of old regions as a percentage of the total heap at which the old-generation phases are triggered; default 45%.
-XX:G1MixedGCCountTarget - target number of mixed-mode runs to perform; this helps decide how many old regions from the CSet to collect per mixed GC run.
-XX:G1HeapWastePercent - the percentage of reclaimable heap below which further collection is not considered worthwhile; default 5%. In other words, it is OK to stop collecting old regions once collecting them would reclaim only about 5% of the heap.

In summary: as the old generation grows and reaches the IHOP threshold, G1 schedules the old-generation phases, a mix of stop-the-world and concurrent work. At the end of these phases G1 knows where live objects are located in the old regions and which regions are the most efficient to collect. It adds those regions to the CSet and enters mixed-mode collection.
That means in the next rounds of collection, instead of collecting only young regions, G1 also collects a few old regions as indicated by their CSet entries. Based on the parameters above, G1 keeps doing mixed-mode collections for the next few cycles and, once its heuristics are satisfied, goes back to normal young-only collection.


When a Full GC happens -
While allocating humongous objects, if there are not enough consecutive available regions.
If the marking phase of an old-generation collection does not finish before G1 runs out of available regions.

At a high level, a G1 run has two parts:
Concurrent cycle - which involves initial marking, scanning, remark, and cleanup. Note that initial mark and remark are stop-the-world steps; the cleanup phase here just adds free regions back to the available-regions list.
Mixed collections - which involve freeing up CSet regions, comprising both young and old generation regions.








Wednesday, May 13, 2015

A reader's review on Clean Coding Techniques book

I have been reading "Clean Code - A Handbook of Agile Software Craftsmanship" by Robert Martin for a few days. I had read Martin Fowler's Refactoring earlier. I observed that such books and concepts are not as popular or as talked about as design patterns or algorithms. You will often hear discussions of design patterns, be it a high-level design decision or an interview. However, functions, comments, formatting, naming, and the other building blocks of writing code are not taught as often.
The most interesting thing about "Clean Code" is that it talks about how to write code. Writing code is normally learnt either by experience or through code reviews from seniors. "Clean Code" fills that gap by providing guidelines for writing clean functions, comments, formatting, names, and the other building blocks of code.

I found the guidelines below extremely useful in day-to-day coding. Though I had been following some of them already, the book brings them together, in context, in one place.
  • One level of abstraction per function
  • Output arguments
  • Don't return null

I would like to share one example following these clean-coding guidelines. A function should keep all its details at one level of abstraction; mixing detailed parts with parts abstracted behind inner functions makes reading difficult.

Let's look at the example below. As you can see, getClientInterestRate has some if-checks on client type, which are too detailed for the reader. However small this function is, one has to logically partition the code in order to understand it.


public InterestRate getClientInterestRate(String clientType, BigDecimal amount)
{
 NavigableMap<Long, InterestRate> map = this.interests.get(clientType);

 long lookupValue = amount.longValue();
 if("CORPORATE".equalsIgnoreCase(clientType))
 { 
  lookupValue = lookupValue / MILLION;
 }

 Entry<Long, InterestRate> entry = map.floorEntry(lookupValue);
 InterestRate value = entry.getValue();
 return value;
}

So why not partition the code to keep a consistent level of abstraction, as shown below? Here the top-level getClientInterestRate is clearly easier to read than the earlier version and reveals the intention of the function much more easily.

public InterestRate getClientInterestRate(String clientType, BigDecimal amount)
{
 long clientAmount = convertAmountToClientBasedUnit(clientType, amount);
 InterestRate clientInterest = getInterestRateForLessThanOrEqualToAmount(clientAmount, clientType);
 return clientInterest;
}


private InterestRate getInterestRateForLessThanOrEqualToAmount(long lookupAmount, String clientType)
{
 NavigableMap<Long, InterestRate> clientInterestRates = getAllInterestsForClient(clientType);
 Entry<Long, InterestRate> interestRateMapEntry = clientInterestRates.floorEntry(lookupAmount);
 if(interestRateMapEntry == null)
  return NO_MARKUP;

 InterestRate matchingInterestRate = interestRateMapEntry.getValue();
 return matchingInterestRate;
}


private NavigableMap<Long, InterestRate> getAllInterestsForClient(String clientType)
{
 NavigableMap<Long, InterestRate> map = getAllInterests().get(clientType);
 return map;
}

private long convertAmountToClientBasedUnit(String clientType, BigDecimal amount)
{
 long lookupValue = amount.longValue();
 if("CORPORATE".equalsIgnoreCase(clientType))
 {
  lookupValue = lookupValue / MILLION;
 }

 return lookupValue;
}

Thursday, December 25, 2014

Streaming Large (Unknown) data in Python and Java Lazily

While working with a large (or unknown-size) collection of data, we always ask ourselves a few questions: does my process have enough memory to hold this collection, am I doing repetitive loops over the data, can some independent iterations be combined, can we avoid unnecessary intermediate copies, and so on. In Java there are several collection APIs, some built in, some from open-source third-party jars like Google's Guava, to name a few. Despite their good implementations, it is still about how we as developers choose to solve our problem with them.
Also, with huge collections the main worry remains: what if we do not have enough memory to hold the collection? In this post I want to discuss the approach taken by Python generators and Java streams, which use streaming and lazy loading to improve the memory footprint of collections and their operations.
Just recently I started learning Python and came across this (now) very well-known feature: generators. Good descriptions and code examples of generators can be found here. They are similar to iterators in Python or Java, except that they don't need the entire collection created in advance. As the name suggests, a generator generates data while you are actually using it, as you loop over it. How many times do we load an entire collection, say from an external source like a DB or a file, loop over it, filter something out, and then return the result to the caller? Loading the source collection completely into memory doesn't make sense in this case if it is huge, especially if you don't need all of it.

def readNext(fileName, chunk_size=1024):
  
    # this line will be called only once irrespective of multiple calls to same generator
    fileHandle = open(fileName)

    while True:
        data = fileHandle.read(chunk_size)
        if not data:
            break

        # after yield, data is returned to the caller
        # when the caller's loop resumes the generator, execution continues
        # with the next iteration of this while loop until it yields again
        yield data

# caller code
for piece in readNext('bigFile.txt'):
    processData(piece)

readNext is a generator of file contents. On the first call, the code executes up to the yield statement; yield returns data, which is stored in the piece variable. When readNext is resumed, it continues from the while True loop, skipping the first line, which opens the file handle. Basically, the generator remembers where it last yielded a value and resumes execution from there.

This is a very concise and beautiful pattern. The file contents are never fully loaded into memory; data is streamed to the caller as and when needed. The same thing could be achieved by writing an iterator, but an iterator can't get close to a generator in terms of conciseness.
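For comparison, Java 8 offers a similarly lazy idiom: Files.lines returns a Stream backed by a BufferedReader, so the file is consumed on demand much like the Python generator above. The countNonEmptyLines helper here is just an illustrative use:

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.stream.Stream;

// Java analogue of the generator above: Files.lines is lazy, so the file is
// read on demand rather than loaded into memory up front.
public class LazyFileRead {
    public static long countNonEmptyLines(Path file) throws IOException {
        // try-with-resources closes the underlying reader when the stream is done
        try (Stream<String> lines = Files.lines(file)) {
            return lines.filter(line -> !line.isEmpty()).count();
        }
    }
}
```

Only one line (plus buffering) is held in memory at a time, so this works on files far larger than the heap.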

Now switching to Java: starting with Java 8 we have the new java.util.stream package, with classes that help you work on data collections as streams of data. This is based on the fact that a stream is not a data structure; it represents operations to be performed before returning the next value from the underlying source. So we have a stream, intermediate operations, and a terminal operation. Nothing is computed or stored until the terminal operation is called; once we invoke the terminal operation, the data is scanned through lazily, and only as much of it as we need.
// Stream that represents infinite series of integers
IntStream infiniteStream = IntStream.iterate(0, i -> i + 1);


// As stream loads collection lazily it knows it only needs to limit to 100 elements from infinite series
int result = infiniteStream.limit(100).map(i->i*2).sum();
System.out.println("Sum of finite values "+result);
As shown in the example above, we have a stream of data coming from infiniteStream. We definitely can't hold the entire collection in memory, but that doesn't stop us from performing our operations on it. We apply two intermediate operations, limit and map, over the original stream, and finally the terminal operation sum. For such processing we don't want to store the entire collection in memory anyway. Java streams are not limited to solving this kind of problem; they have many more useful features.

I like this API-level approach in Python and Java, using generators and streams to let us decide when and how deeply we want to consume our source data. It not only lets us control the memory footprint but also makes our code more readable.

Saturday, November 15, 2014

State Machine with Akka Actors using Become/UnBecome

I came across a very interesting use of Scala and Akka's become/unbecome feature to write a state machine based on the actor model.

As we know, Akka is about actors and their communication. Actors communicate by sending messages; they don't call methods on each other directly. Every actor possesses a personal mailbox: it picks new messages from the mailbox, reacts to them, and then sends messages to other actors as part of its reaction. Pretty neat!



This is how a sample actor looks, listening to messages and reacting to them:


class SampleActor extends Actor with ActorLogging{

  def receive = {

    case "Hello" => log.info("Hello")

    case _      => log.info("Unknown")

  }

}

"receive" is the method in the Actor class that gets called when a message arrives. So whenever SampleActor gets a "Hello" message, it reacts to it with log.info("Hello").

Now, what if we could change the behaviour of an actor? Changing behaviour means changing which types of messages it accepts and how it reacts to them. That's where the become/unbecome feature of actors comes in.

class SampleActor extends Actor with ActorLogging {

  import context._

  def receive = {
    case "Hello" =>
    log.info("Hello")
    become(bye)
    case _      => log.info("Unknown")
  }

  def bye: Receive = {
  case "Bye" =>
  log.info("Bye")
  //unbecome() // if unbecome now then it will fall back to default message handling behaviour as implemented in receive method
  case _      => log.info("Unknown")
  }
}

As shown above, the method def bye: Receive adds a new behaviour to SampleActor to respond to "Bye" messages. This method is a message handler of type PartialFunction[Any, Unit]. Once the actor receives "Bye", it responds with log.info("Bye"). After handling the "Bye" message, the actor can decide to go back to its original state by calling unbecome() on the context. In other words, an actor's message-handling behaviours are stacked: a newly become behaviour is pushed on top of the stack, and once the actor calls unbecome, the top behaviour is popped and we are back to the old behaviour.

It sounds like changing strategies. However, unlike the strategy pattern, we can change an actor's behaviour so that it listens to completely different types of messages, not just responds differently to the same messages.
Applied to a problem context, we can think of this as the actor changing its state: in every state it listens to the messages applicable to that state, and based on certain criteria it decides to change state and then receives the messages applicable to the new state.

A good example to really appreciate this feature is the dining philosophers problem. Briefly: we have a dining table with five philosophers. A philosopher thinks a lot; after thinking for some time he gets hungry. To eat, he first picks up the chopstick on his left, then the one on his right. Once he has both, he can start eating. Done eating, he puts the chopsticks down one at a time, right stick first, and then goes back to thinking. This example, as mentioned here with code here, is very good for understanding the power of changing behaviour with become/unbecome.
Dining philosophers is a classic example that helps us understand the problems around resource sharing in concurrent applications. With Akka, you can think about it from an event-based or reactive perspective. The diagram below shows the state transitions for the philosopher and the chopstick, and how each entity moves from one state to another based on the events it receives.


Modeled with the actor model, the problem looks like this: both actors receive a few events, either from the other actor or self-generated.

So we have two types of actors here: Philosopher and Chopstick. Each receives certain events and responds to them by sending out new events. On receiving events, an actor can move to a new state as shown in the state transition diagram above. These events and states have a logical ordering: a philosopher can never process an "Eating" event while in the "waiting for left chopstick" state, because he can only start eating once he has grabbed both chopsticks. That's where the state pattern comes in handy: it lets you identify and toggle behaviour through different states by applying rules, without any complicated if-this-then-that code.

I am taking a small portion of that code to explain how become/unbecome can be used in actors to achieve the state transitions shown above. Just to note, philosophers are called Hakkers there.

class Philosopher(name: String, left: ActorRef, right: ActorRef) extends Actor {

import context._

//All hakkers start in a non-eating state
  def receive = {
    case Think =>
      println("%s starts to think".format(name))
      startThinking(5.seconds)
  }

  private def startThinking(duration: FiniteDuration): Unit = {
    become(thinking)
    system.scheduler.scheduleOnce(duration, self, Eat)
  }


  def thinking: Receive = {
    case Eat =>
      become(hungry)
      left ! Take(self)
      right ! Take(self)
  }

  .
  .
  .
}

Philosopher takes references to the left and right chopsticks. The default behaviour is to listen for the Think event. Once he starts thinking, he becomes the "thinking" state, in which he listens for Eat events. As you can see, he responds to Eat by becoming hungry and trying to grab the chopsticks: the Philosopher actor sends a Take message to the left and right chopstick actors.


class Chopstick extends Actor {

  import context._

  //When a Chopstick is taken by a hakker
  //It will refuse to be taken by other hakkers
  //But the owning hakker can put it back
  def takenBy(hakker: ActorRef): Receive = {
    case Take(otherHakker) =>
      otherHakker ! Busy(self)
    case Put(`hakker`) =>
      become(available)
  }

  //When a Chopstick is available, it can be taken by a hakker
  def available: Receive = {
    case Take(hakker) =>
      become(takenBy(hakker))
      hakker ! Taken(self)
  }

  //A Chopstick begins its existence as available
  def receive = available
}

Chopstick actors start in the "available" state, in which they listen for Take events. On receiving a Take event while available, the chopstick simply gives the philosopher access by becoming taken and sends a Taken acknowledgement back to the requesting Philosopher actor.

This is a classic problem with quite obvious state-based behaviour. Event-based communication in the actor model, plus the become/unbecome feature, makes it very easy to implement: all state changes happen with simple become and unbecome switches. All we have to do is write message handlers (PartialFunction[Any, Unit]) and keep switching between them. Imagine writing the same state-pattern implementation in plain Java: it would take many more lines and probably not be as readable as this. And above all, we are using Akka: instead of calling methods directly we are sending and reacting to events, which means it can only get more scalable from here.

Wednesday, October 22, 2014

Why is Atomic lazySet faster?

Using synchronized blocks, the Lock API, volatile, or atomic CAS instructions, we make sure certain logic executes exclusively and atomically. Whenever such a synchronization action occurs, it acts as a memory barrier and obeys the happens-before protocol. That way we are sure that a value read from or written to a shared variable by the current thread is communicated to the rest of the world.

So what is a memory barrier?

As we know, a CPU has multiple cores. Each core has multi-level local caches plus shared caches, and there may be more than one processor. Any update done to a shared variable by a thread running on one core must somehow be made visible to the other cores and CPUs; the mechanism that does this is called cache coherence. To execute a program faster, the JIT compiler and the CPU cores can reorder statements in various ways, as long as the end result is unchanged. This reordering matters a lot for shared variables in a multiprocessor environment. Since multiple threads run in parallel and share a variable's value, it is very important that at certain points in the program execution you can force these execution units to maintain some kind of ordering.
A memory barrier makes sure that, at the barrier point and depending on the barrier type, all pending instructions queued in the store/load buffers are flushed to main memory, and any copies of the data in other processors' caches are invalidated, so those processors fetch from memory the next time.
In simple words, memory barriers make sure certain writes/reads of shared variables happen now, putting all the smart reordering aside. There are several such barriers: StoreStore, StoreLoad, LoadStore and LoadLoad. For a barrier of type XY, X-type operations issued before the barrier will not be reordered with Y-type operations issued after the barrier point.
There are better documents to learn more about these memory barriers and their different types here, here and here. We will see how reading about the different barrier types helped me appreciate and understand (to some extent) how the lazySet operation works.

So what is different about the Atomic*.lazySet way of setting a volatile? By the way, in case there is any confusion between Atomic* and volatile: Atomic* objects hold their state in a volatile variable, so any set/lazySet/get on an Atomic* object actually goes through that underlying volatile variable.
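A minimal sketch of the two write flavours on the same AtomicInteger (both are plain JDK calls; within a single thread both writes are of course immediately visible to a following get(), the difference is only in the barriers emitted):

```java
import java.util.concurrent.atomic.AtomicInteger;

public class LazySetDemo {
    public static void main(String[] args) {
        AtomicInteger counter = new AtomicInteger(0);

        counter.set(1);      // full volatile write
        counter.lazySet(2);  // ordered ("lazy") write, weaker barrier

        // Same-thread program order still holds: we see our own last write.
        System.out.println(counter.get()); // prints 2
    }
}
```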

We know we use volatile to make sure a thread never caches the value in a local register or relies solely on its local core cache. Every time we read or write a volatile variable we get its globally visible value across all CPUs. This is achieved by the StoreLoad and Load barriers emitted during/after the write and read operations on the volatile variable.

Let's say we have these non-local variables:
volatile int v;
int i = 0;

Write:
v = 5
<< StoreLoad >>


Read:
<< LoadLoad ..or some other Load barrier >>
int i = v


So, in simple words, when we write to a non-local volatile field there is a barrier after the update instruction to make sure the update is visible to all other CPUs. And when we read a volatile variable, the Load barrier before the read makes sure we read the latest value of the variable instead of using one cached at the core or CPU level.
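The effect of these barriers is what makes the classic publish-via-volatile-flag pattern safe; a small self-contained sketch:

```java
// Classic visibility sketch: payload is a plain field published safely via a
// volatile flag. The volatile write of `ready` provides the store barrier and
// the volatile read in the spin loop the load barrier, so the reader is
// guaranteed to observe the payload written before the flag was set.
public class FlagDemo {
    private static volatile boolean ready = false;
    private static int payload = 0;

    // Writer side: plain write, then the volatile write that publishes it.
    static void publish(int value) {
        payload = value; // plain store, cannot be reordered past the next line
        ready = true;    // volatile store
    }

    // Reader side: spin on the volatile flag, then read the payload.
    static int readWhenReady() {
        while (!ready) { Thread.onSpinWait(); } // volatile load each iteration
        return payload;
    }

    public static void main(String[] args) throws InterruptedException {
        Thread reader = new Thread(() -> System.out.println(readWhenReady()));
        reader.start();
        publish(42);
        reader.join(); // the reader prints 42
    }
}
```

Without volatile on `ready`, the reader could spin forever on a stale cached value or observe the flag before the payload.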

So what exactly does StoreLoad do?
StoreLoad is the memory barrier which makes sure a volatile update is followed by a flush of the store buffers to memory, so the update is visible to other CPUs and is guaranteed to have been performed before any subsequent load instruction. As per Doug Lea's cookbook: "StoreLoad barriers are needed on nearly all recent multiprocessors, and are usually the most expensive kind. Part of the reason they are expensive is that they must disable mechanisms that ordinarily bypass cache to satisfy loads from write-buffers."
StoreLoad's main intention is to let the processor read the latest value of a variable after its own write to it. This latest value is required so that this processor sees the latest updates possibly made by other processors, instead of just using its own last-written value. That sounds essential for the multiple-writer case. But let's assume there is only one writer thread for this variable. As explained very well here, if we guarantee that a volatile variable is only ever updated from one thread, then such an expensive StoreLoad memory barrier can be avoided.
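Under that single-writer assumption, lazySet fits naturally. A hypothetical sketch of a single-writer sequence counter (the class and method names here are made up for illustration):

```java
import java.util.concurrent.atomic.AtomicLong;

// Single-writer sequence: exactly one thread ever calls publish(), so the
// cheaper lazySet ordering is enough; readers still observe a monotonically
// advancing value through the volatile read in current().
public class SingleWriterSequence {
    private final AtomicLong sequence = new AtomicLong(0);

    // Called by the single writer thread only.
    public void publish() {
        sequence.lazySet(sequence.get() + 1); // no StoreLoad on the write
    }

    // Safe to call from any reader thread.
    public long current() {
        return sequence.get();
    }
}
```

This is essentially the pattern used by single-writer ring buffers such as the Disruptor, where the sequence cursor is advanced with lazySet.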

So why lazySet?
With the single-writer style of sharing a volatile variable, we can make use of lazySet. A lazySet write is preceded by a StoreStore memory barrier. The StoreStore barrier just makes sure that all stores issued before the lazySet are guaranteed to be visible before the lazySet store itself, so no two lazySet (or other) writes can be reordered with each other. With a single-writer implementation that is all we want anyway. Given that definition of StoreStore, the main reason we use it in a single-writer design over a normal volatile set is that it is much cheaper than a StoreLoad barrier. All the expensive steps that StoreLoad performs to stop reads being satisfied from local caches are not performed with StoreStore: subsequent loads can rely on the local copy of the data, because no one else is changing the value. This gives the compiler and the CPUs more freedom to optimize.

So the same code, using the lazySet API to write the volatile, looks like this:

Write:
<< StoreStore >>
v = 5


So why is lazySet faster? Because it does not add the expensive barrier on every write to the variable. A normal volatile write uses a relatively expensive barrier which forces a read that follows the write to fetch the current latest value from memory. That makes sense when there are multiple writers, but otherwise lazySet is the smarter choice.
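As a side note (an assumption about later JDKs, not something from this post's era): since Java 9 the same ordered-write semantics are exposed directly as release stores, e.g. AtomicLong.setRelease and VarHandle.setRelease, and lazySet is documented as equivalent to setRelease. A minimal sketch:

```java
import java.lang.invoke.MethodHandles;
import java.lang.invoke.VarHandle;

public class ReleaseWriteDemo {
    private volatile int v;
    private static final VarHandle V;

    static {
        try {
            V = MethodHandles.lookup()
                    .findVarHandle(ReleaseWriteDemo.class, "v", int.class);
        } catch (ReflectiveOperationException e) {
            throw new ExceptionInInitializerError(e);
        }
    }

    void plainVolatileWrite(int x) { v = x; }                // full volatile store
    void releaseWrite(int x)      { V.setRelease(this, x); } // lazySet-style store

    int read() { return v; }
}
```

Both writes leave the same value behind; the release write just skips the StoreLoad fence, exactly as lazySet does.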

References:
http://mechanical-sympathy.blogspot.sg/2011/07/memory-barriersfences.html
http://mechanical-sympathy.blogspot.sg/2011/09/single-writer-principle.html
Doug Lea Cookbook - http://g.oswego.edu/dl/jmm/cookbook.html
http://psy-lob-saw.blogspot.sg/2012/12/atomiclazyset-is-performance-win-for.html