Thursday, December 25, 2014

Streaming Large (or Unknown-Sized) Data Lazily in Python and Java

While working with a large (or unknown-sized) collection of data, we always ask ourselves a few questions: does my process have enough memory to hold this collection, am I doing repetitive loops over the data, can some independent iterations be combined, can we avoid unnecessary intermediate copies, and so on. In Java, when it comes to collections, there are several collection APIs: some built in, some from open-source third-party libraries such as Google's Guava. But however good those API implementations are, it is still up to us as developers to choose how to solve our problem with them.
Also, with huge collections the main worry remains: what if we do not have enough memory to hold the collection? In this post I want to discuss the approach taken by Python generators and Java 8 streams, which use streaming and lazy loading to improve the memory footprint of collections and their operations.
Just recently I started learning Python and came across this (now) very well-known feature: generators. Good descriptions and code examples of what generators are can be found here. They are similar to iterators in Python or Java, except that they don't need the entire collection created in advance. As the name suggests, a generator generates data only when you actually use it, as you loop over it. How often do we load an entire collection, say from an external source like a database or a file, loop over it, filter out something, and then return the result to the caller? Loading the source collection completely into memory doesn't make sense in this case if it is huge, and especially if you don't need all of it.

def readNext(fileName, chunk_size=1024):

    # the body does not run until the first value is requested;
    # the with-statement also closes the file once the generator is exhausted
    with open(fileName) as fileHandle:
        while True:
            data = fileHandle.read(chunk_size)
            if not data:
                break

            # yield hands data back to the caller; when the caller's loop
            # asks for the next value, execution resumes right here and the
            # while loop continues until it yields again
            yield data

# caller code
for piece in readNext('bigFile.txt'):
    processData(piece)

readNext is a generator, a generator of file contents. On the first call, the code executes up to the yield statement. Yield returns data, which is stored in the piece variable. When the generator is advanced again, it resumes from the top of the while True loop, skipping the first line which opens the file handle. In other words, the generator remembers where it last yielded a value and resumes execution from there.

This is a very concise and beautiful pattern. The file contents are never loaded into memory all at once; data is streamed to the caller as and when needed. The same thing could have been achieved by writing an iterator, but an iterator can't come close to a generator in terms of conciseness.
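For comparison, here is a rough sketch of what the same chunked reading looks like as a hand-written Java iterator (the class and names are mine, purely illustrative). Note how the "where was I?" state that a generator tracks for free must be managed explicitly:

```java
import java.io.IOException;
import java.io.Reader;
import java.util.Iterator;
import java.util.NoSuchElementException;

// Illustrative chunked-file iterator: reads chunkSize characters at a time
public class ChunkIterator implements Iterator<String> {
    private final Reader reader;
    private final char[] buffer;
    private String next; // the chunk we will hand out on the next call

    public ChunkIterator(Reader reader, int chunkSize) {
        this.reader = reader;
        this.buffer = new char[chunkSize];
        advance(); // pre-read the first chunk
    }

    // explicit bookkeeping that the generator's yield does implicitly
    private void advance() {
        try {
            int n = reader.read(buffer);
            next = (n == -1) ? null : new String(buffer, 0, n);
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    @Override
    public boolean hasNext() {
        return next != null;
    }

    @Override
    public String next() {
        if (next == null) throw new NoSuchElementException();
        String chunk = next;
        advance();
        return chunk;
    }
}
```

The logic is the same, but the chunk buffering and look-ahead must be spelled out by hand, which is exactly the boilerplate the generator version avoids.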

Now switching to Java: starting from Java 8 we have the new java.util.stream package. It has various classes that help you work on data collections as streams. This notion is based on the fact that a stream is not a data structure; it represents operations to be performed before returning the next value from the underlying source. So we have a stream, intermediate operations, and a terminal operation. Nothing is computed or stored until the terminal operation is called; once we invoke it, the data is scanned through lazily, and only as much as we need.
// Stream that represents an infinite series of integers
IntStream infiniteStream = IntStream.iterate(0, i -> i + 1);

// Because the stream is evaluated lazily, only the first 100 elements
// of the infinite series are ever produced
int result = infiniteStream.limit(100).map(i -> i * 2).sum();
System.out.println("Sum of finite values " + result);
As shown in the example above, we have a stream of data coming from infiniteStream. We definitely can't hold the entire collection in memory, yet that doesn't stop us from performing our operations on it. We apply two intermediate operations, limit and map, over the original stream, and finally the terminal operation sum. For such processing we never need the entire collection in memory. Java streams are not limited to solving this kind of problem; they have many more useful features.
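We can make the laziness visible with a small experiment using peek, which observes each element as it actually flows through the pipeline (the class name and structure here are mine, a sketch rather than anything from the post's source):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.IntStream;

public class LazyDemo {
    // Returns "<elements seen before terminal op>:<sum>:<elements seen after>"
    public static String run() {
        List<Integer> seen = new ArrayList<>();
        IntStream pipeline = IntStream.iterate(0, i -> i + 1)
                .peek(seen::add)   // record each element that is actually pulled
                .limit(3)
                .map(i -> i * 2);
        int before = seen.size();  // still 0: building the pipeline processed nothing
        int sum = pipeline.sum();  // the terminal operation pulls exactly 3 elements
        return before + ":" + sum + ":" + seen.size();
    }

    public static void main(String[] args) {
        System.out.println(run()); // 0:6:3
    }
}
```

Even though the source is infinite, peek fires only three times, and only once the terminal operation sum starts pulling values.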

I like this approach, provided at the API level by Python generators and Java streams, of letting us decide when and how much of our source data to consume. It not only allows us to control the memory footprint but also makes our code more readable.

Saturday, November 15, 2014

State Machine with Akka Actors using Become/UnBecome

I came across a very interesting use of Scala and Akka's become/unbecome feature to write a state machine based on the actor model.

As we know, Akka is about actors and their communication. Actors communicate by sending messages; they don't call methods on each other directly. Every actor has a personal mailbox: it picks new messages from the mailbox, reacts to them, and sends messages to other actors as part of its reaction. Pretty neat!



This is what a sample actor looks like, listening to messages and reacting to them:


class SampleActor extends Actor with ActorLogging {

  def receive = {
    case "Hello" => log.info("Hello")
    case _       => log.info("Unknown")
  }
}

"receive" is the method in the Actor class that gets called when the actor receives a message. So whenever SampleActor gets the message "Hello", it reacts to it with log.info("Hello").

Now what if we could change the behaviour of an actor? Changing behaviour means changing which types of messages it will accept and how it will react to them. That's where the become/unbecome feature of actors comes in.

class SampleActor extends Actor with ActorLogging {

  import context._

  def receive = {
    case "Hello" =>
      log.info("Hello")
      become(bye)
    case _ => log.info("Unknown")
  }

  def bye: Receive = {
    case "Bye" =>
      log.info("Bye")
      //unbecome() // if we unbecome here, the actor falls back to the default behaviour implemented in receive
    case _ => log.info("Unknown")
  }
}

As shown above, the method def bye: Receive adds a new behaviour to SampleActor that responds to "Bye" messages. This method is a message handler of type PartialFunction[Any, Unit]. Once the actor receives "Bye" it responds with log.info("Bye"), and after handling the message it can decide to go back to its original state by calling unbecome() on the context. In other words, an actor's message-handling behaviours are stacked: become pushes a new behaviour onto the top of the stack, and unbecome pops the top behaviour off, so we are back to the old behaviour.
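The stacking can be understood without any Akka at all. Here is a toy plain-Java model of the same idea (this is my own illustration, not Akka's implementation): behaviours are functions kept on a stack, tell dispatches to whichever behaviour is on top, and become/unbecome push and pop.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.function.Consumer;

// Toy model of become/unbecome: a stack of message handlers
public class BehaviorStack {
    private final Deque<Consumer<String>> behaviors = new ArrayDeque<>();
    private final StringBuilder log = new StringBuilder();

    public BehaviorStack() {
        behaviors.push(this::receive); // default behaviour at the bottom
    }

    private void receive(String msg) {
        if (msg.equals("Hello")) {
            log.append("Hello;");
            become(this::bye); // switch to a different message protocol
        } else {
            log.append("Unknown;");
        }
    }

    private void bye(String msg) {
        if (msg.equals("Bye")) {
            log.append("Bye;");
            unbecome(); // pop back to the default receive behaviour
        } else {
            log.append("Unknown;");
        }
    }

    public void become(Consumer<String> b) { behaviors.push(b); }
    public void unbecome() { if (behaviors.size() > 1) behaviors.pop(); }
    public void tell(String msg) { behaviors.peek().accept(msg); }
    public String log() { return log.toString(); }
}
```

Sending "Hello", then "Bye", then "Hello" again produces the log "Hello;Bye;Hello;": the second "Hello" is handled by the default behaviour again because the "bye" handler popped itself off the stack.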

This sounds a lot like swapping strategies. However, unlike the strategy pattern, we can change an actor's behaviour so that it listens to completely different types of messages, not just responds differently to the same type of messages as in the strategy pattern.
If we apply this concept to a problem context, we can think of it as the actor changing its state: in every state it listens only to the messages applicable to that state, and based on certain criteria it decides to change state, after which it accepts the messages applicable to the new state.

A good example for really appreciating this feature is the Dining Philosophers problem. To recap: we have a dining table with five philosophers. A philosopher thinks a lot; after thinking for some time he gets hungry. To eat, he first picks up the chopstick on his left, then the one on his right. Once he has both, he can start eating. Done eating, he puts the chopsticks down one at a time, right one first, and goes back to thinking. This example, as described here with code here, is very good for understanding the power of changing behaviour using become/unbecome.
Dining Philosophers is a very old-school example for understanding the problems around resource sharing in concurrent applications. With Akka, you can think about it from an event-based, reactive perspective. The diagram below shows the state transitions for the philosopher and the chopstick: how each entity moves from one state to another based on the events it receives.


When the problem is modeled using the actor model, this is how it looks: both actors receive events, either from the other actor or self-generated.

So we have two types of actors here: Philosopher and Chopstick. Each receives certain events and responds by sending out new events. On receiving an event, an actor can move to a new state, as shown in the state transition diagram above. These events and states have a logical ordering: a philosopher can never process an Eating event while in the "waiting for left chopstick" state, because he can only start eating once he has grabbed both chopsticks. That's where the state pattern comes in handy. It lets you identify and toggle behaviour through different states by applying a few rules, without any complicated if-this-then-that code.

I'll take a small portion of that code to explain how become/unbecome is used to achieve the state transitions shown above. Note that philosophers are called hakkers there.

class Philosopher(name: String, left: ActorRef, right: ActorRef) extends Actor {

  import context._

  // All hakkers start in a non-eating state
  def receive = {
    case Think =>
      println("%s starts to think".format(name))
      startThinking(5.seconds)
  }

  private def startThinking(duration: FiniteDuration): Unit = {
    become(thinking)
    system.scheduler.scheduleOnce(duration, self, Eat)
  }

  def thinking: Receive = {
    case Eat =>
      become(hungry)
      left ! Take(self)
      right ! Take(self)
  }

  ...
}

The philosopher takes references to the left and right chopsticks. The default behaviour is to listen for the Think event. Once he starts thinking, he moves to the "thinking" state, in which he listens for Eat events. As you can see, he responds to the Eat event by becoming hungry and trying to grab both chopsticks: the philosopher actor sends a Take message to the left and right chopstick actors.


class Chopstick extends Actor {

  import context._

  //When a Chopstick is taken by a hakker
  //It will refuse to be taken by other hakkers
  //But the owning hakker can put it back
  def takenBy(hakker: ActorRef): Receive = {
    case Take(otherHakker) =>
      otherHakker ! Busy(self)
    case Put(`hakker`) =>
      become(available)
  }

  //When a Chopstick is available, it can be taken by a hakker
  def available: Receive = {
    case Take(hakker) =>
      become(takenBy(hakker))
      hakker ! Taken(self)
  }

  //A Chopstick begins its existence as available
  def receive = available
}

Chopstick actors are in the "available" state by default, which means they listen for Take events. Once a chopstick receives a Take event in the "available" state, it simply gives the philosopher access and sends a Taken acknowledgement back to the requesting philosopher actor.

This is a very classical problem with quite obvious state-based behaviour. Event-based communication in the actor model, together with the become/unbecome feature, makes it very easy to implement: all state changes happen with simple become and unbecome switches. All we have to do is write message handlers (PartialFunction[Any, Unit]) and keep switching between them. Imagine writing the same state-pattern implementation in plain Java: there would definitely be more lines of code, and it is doubtful it would be as readable as this. And above all we are using Akka: instead of calling methods directly we are sending and reacting to events, which means it can only get more scalable from here.

Wednesday, October 22, 2014

Why is Atomic lazySet faster?

Using synchronized blocks, the Lock API, volatile, or atomic CAS instructions, we make sure that certain logic executes exclusively and atomically. Whenever such a synchronization action occurs, it acts as a memory barrier and obeys the happens-before protocol. That way we are sure that a value read or written to a shared variable by the current thread is communicated to the rest of the world.

So what is a memory barrier?

As we know, a CPU has multiple cores. Each core has multi-level local caches as well as shared caches, and there may be more than one processor. Any update to a shared variable by a thread running on one core must somehow be made visible to the other cores and CPUs; the mechanism that does this is called cache coherence. To execute a program faster, the JIT compiler and CPU cores can reorder statements as long as the end result is unchanged. This reordering matters a lot for shared variables in a multiprocessor environment: with multiple threads running in parallel and sharing a variable, it is important that at certain points in program execution you can force these execution units to maintain some kind of ordering.
A memory barrier makes sure that, at the barrier point and depending on the barrier type, the instructions queued in store/load buffers are flushed to main memory, and any copies of the data in other processors' caches are invalidated, so that they fetch from memory next time.
In simple words, memory barriers make sure certain writes and reads of shared variables happen now, setting all the clever reordering aside. There are various barrier types: StoreStore, StoreLoad, LoadStore, LoadLoad. For a barrier of type XY, X-type operations issued before the barrier will not be reordered with Y-type operations issued after it.
There are better documents for learning more about memory barriers and their different types here, here and here. Reading about these barrier types helped me appreciate and understand (to some extent) how the lazySet operation works.

So what is different about setting a volatile via Atomic*.lazySet? By the way, in case there is any confusion between Atomic* and volatile: Atomic* objects hold their state in volatile variables, so any set/lazySet/get on an Atomic* object actually uses the underlying volatile variable to provide those values.

We use volatile to make sure a thread never caches the value in a local register or relies on local core caches. Every time we read or write a volatile variable, we get its globally visible value across all CPUs. This is achieved by StoreLoad and Load barriers during/after write/read operations on the volatile variable.

Lets say we have these non-local variables.
volatile int v;
int i = 0;

Write:
v = 5
<< StoreLoad >>


Read
<< LoadLoad ..or some other Load barrier >>
int i = v


So, in simple words, when we write to a non-local volatile field, a barrier after the update instruction makes sure the update is visible to all other CPUs. And when we read a volatile variable, a Load barrier before the read makes sure we read its latest value instead of using one cached at the core or CPU level.
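In Java source, those barriers are implied by the volatile keyword itself. A minimal sketch of the classic publish-a-value-via-a-flag pattern (class and method names are mine, for illustration):

```java
public class VolatileFlag {
    private volatile boolean ready = false; // volatile write/read carries the barriers
    private int payload = -1;               // a plain field published via the flag

    // intended to be called by the writer thread
    public void publish() {
        payload = 42;  // plain store
        ready = true;  // volatile store: payload's write cannot be reordered past it
    }

    // intended to be called by a reader thread
    public int tryRead() {
        if (ready) {       // volatile load: if we see true, we also see payload = 42
            return payload;
        }
        return -1;         // not published yet
    }
}
```

The happens-before edge from the volatile write of ready to a later volatile read of it is what guarantees the reader never sees ready == true with a stale payload.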

So what exactly does StoreLoad do?
StoreLoad is a memory barrier which makes sure that a volatile update is followed by a flush of the store buffers to memory, so that the update is visible to other CPUs and is guaranteed to have been performed before any subsequent load instruction. As per Doug Lea's cookbook: "StoreLoad barriers are needed on nearly all recent multiprocessors, and are usually the most expensive kind. Part of the reason they are expensive is that they must disable mechanisms that ordinarily bypass cache to satisfy loads from write-buffers."
StoreLoad's main intention is that a load of the variable after this processor's own store sees the latest value, ensuring the processor sees the latest updates made by other processors instead of just its own last written value. That is clearly essential in the multiple-writer case. But suppose there is only one writer thread for this variable: then, as explained very well here, if we guarantee to update a volatile variable from only one thread, this expensive StoreLoad memory barrier can be avoided.

So why lazySet?
With the single-writer style of sharing a volatile variable, we can make use of lazySet. A lazySet write is followed by a StoreStore memory barrier. The StoreStore barrier just guarantees that the lazySet store happens before any store issued after the barrier point, and that no two writes can be reordered. With a single-writer implementation, that is all we want anyway. Given that this is the definition of StoreStore, the main reason to prefer it over a normal volatile set in the single-writer case is that it is much cheaper than a StoreLoad barrier: all the expensive steps StoreLoad performs to stop loads from being satisfied out of local caches and write buffers are skipped. Subsequent loads can rely on the local copy of the data because no one else is changing the value. This gives the compiler and the CPUs more freedom to optimize.

So the same write, using the lazySet API, looks like this:

Write:
v = 5
<< StoreStore >>


So why is lazySet faster? Because it does not add an expensive barrier to the write. A normal volatile write uses a relatively expensive barrier which forces a read that follows the write to fetch the current latest value from memory. That makes sense when there are multiple writers, but otherwise lazySet is the smarter choice.
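A typical single-writer use of lazySet looks something like this sketch (the class is my own illustration; AtomicLong.lazySet itself is a real JDK method):

```java
import java.util.concurrent.atomic.AtomicLong;

// Single-writer counter: only one thread ever calls increment()
public class SingleWriterCounter {
    private final AtomicLong counter = new AtomicLong();

    public void increment() {
        // lazySet issues only the cheap StoreStore barrier instead of a
        // full StoreLoad fence; safe because there is exactly one writer,
        // so counter.get() here always sees this thread's own last write
        counter.lazySet(counter.get() + 1);
    }

    // Reader threads still see a recent (though possibly slightly stale) value
    public long get() {
        return counter.get();
    }
}
```

The trade-off is that a reader may briefly observe a stale count, which is usually acceptable for progress counters, sequence numbers in ring buffers, and similar single-writer structures.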

References:
http://mechanical-sympathy.blogspot.sg/2011/07/memory-barriersfences.html
http://mechanical-sympathy.blogspot.sg/2011/09/single-writer-principle.html
Doug Lea Cookbook - http://g.oswego.edu/dl/jmm/cookbook.html
http://psy-lob-saw.blogspot.sg/2012/12/atomiclazyset-is-performance-win-for.html