Tuesday, June 19, 2012

Custom ReaderWriterLock using Atomic and synchronized

This post has been due for quite a long time. The idea came to mind as soon as I learned about the Java 5 concurrency improvements, and I was very curious to find out exactly how they were achieved, because the improvements in Java 5 are not language features. They are implementations and APIs built on a few basic things: the improved Java memory model, volatile variables, atomic compare-and-set operations, and the LockSupport park/unpark utilities. I will write a separate post on how exactly these basics help you build something like a ReentrantLock or a ReadWriteLock.
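As a small taste of one of these building blocks, here is a minimal sketch of a compare-and-set retry loop. It assumes nothing beyond java.util.concurrent.atomic.AtomicInteger; the class CasCounterSketch and its methods are made up for illustration, and in real code AtomicInteger.incrementAndGet() already does this job.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical illustration class, not part of the JDK or of the lock in this post.
final class CasCounterSketch {
    private final AtomicInteger value = new AtomicInteger(0);

    // Lock-free increment: re-read and retry until our compareAndSet wins.
    int incrementWithCas() {
        while (true) {
            int current = value.get();
            if (value.compareAndSet(current, current + 1)) {
                return current + 1;
            }
        }
    }

    int get() {
        return value.get();
    }

    // Starts `threads` threads that each increment `perThread` times; returns the final count.
    static int runDemo(int threads, int perThread) {
        final CasCounterSketch counter = new CasCounterSketch();
        Thread[] workers = new Thread[threads];
        for (int t = 0; t < threads; t++) {
            workers[t] = new Thread(() -> {
                for (int i = 0; i < perThread; i++) {
                    counter.incrementWithCas();
                }
            });
            workers[t].start();
        }
        for (Thread w : workers) {
            try {
                w.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return counter.get();
    }

    public static void main(String[] args) {
        System.out.println(runDemo(4, 1000)); // prints 4000: no increment is lost
    }
}
```

No synchronized block is involved: a thread that loses the race simply re-reads the value and tries again.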


The intent of this article is to show how easily we can build our own API for a shareable reader lock and an exclusive writer lock by mixing the new concurrency features with old-style Java synchronization, ending up with a solution that takes advantage of both.



So when we use a ReaderWriterLock, this is what we expect:

i)   Read operations are frequent compared to write operations

ii)  Read operations can take place concurrently

iii) A write operation is exclusive

iv)  A write has to wait for in-progress readers to finish, but without being starved
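For comparison, expectations ii) and iii) can be observed on the JDK's own java.util.concurrent.locks.ReentrantReadWriteLock. The sketch below is only a probe under simple assumptions (one extra thread, non-blocking tryLock calls); the class StandardReadWriteLockSketch and its helper canAcquire are hypothetical names, and the probe says nothing about starvation.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Hypothetical probe class; only ReentrantReadWriteLock and the executor are JDK APIs.
final class StandardReadWriteLockSketch {

    // Tries to take `lock` on the worker thread without blocking; releases it if taken.
    private static boolean canAcquire(ExecutorService worker, Lock lock) {
        try {
            return worker.submit(() -> {
                boolean acquired = lock.tryLock();
                if (acquired) {
                    lock.unlock();
                }
                return acquired;
            }).get();
        } catch (Exception e) {
            throw new IllegalStateException(e);
        }
    }

    // Returns {second reader admitted, writer admitted during read, writer admitted after read}.
    static boolean[] probe() {
        ReentrantReadWriteLock rw = new ReentrantReadWriteLock();
        ExecutorService worker = Executors.newSingleThreadExecutor();
        try {
            rw.readLock().lock(); // the calling thread is the first reader
            boolean secondReader;
            boolean writerDuringRead;
            try {
                secondReader = canAcquire(worker, rw.readLock());
                writerDuringRead = canAcquire(worker, rw.writeLock());
            } finally {
                rw.readLock().unlock();
            }
            boolean writerAfterRead = canAcquire(worker, rw.writeLock());
            return new boolean[] {secondReader, writerDuringRead, writerAfterRead};
        } finally {
            worker.shutdown();
        }
    }

    public static void main(String[] args) {
        boolean[] r = probe();
        System.out.println(r[0] + " " + r[1] + " " + r[2]); // prints: true false true
    }
}
```

A second reader gets in while the first one is still reading, a writer does not, and the writer succeeds once the reader has released the lock.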



The code snippet below shows one way of doing that. It declares two separate lock objects, one for readers and one for writers, and two atomic integers. The basic policy of a read-write lock is that readers can share the critical section while a write is exclusive. That means that while a write is going on, no read is possible, and while reads are in progress, a writer has to wait for them to finish before taking the exclusive lock.


To achieve this, a writer needs to know whether any reader is running before it starts, and a reader needs to know whether a write is in progress before it enters its critical section. The two atomic integers are used for exactly this. The lock also handles the case where a writer is waiting for readers while many new readers keep arriving in the read method.
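The code in this post tests each counter with compareAndSet(0, 0). A minimal sketch of what that call does, assuming only AtomicInteger (the class ZeroCheckSketch and the method isIdle are hypothetical names): it succeeds only when the counter is 0 and never changes it, so it answers the same question as get() == 0.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical helper class showing the check used with the two counters.
final class ZeroCheckSketch {

    // True only when the counter is currently 0; the value is never changed,
    // because the "new" value passed to compareAndSet is also 0.
    static boolean isIdle(AtomicInteger counter) {
        return counter.compareAndSet(0, 0);
    }

    public static void main(String[] args) {
        AtomicInteger readingInProgress = new AtomicInteger(0);
        System.out.println(isIdle(readingInProgress)); // true: no reader registered

        readingInProgress.incrementAndGet();           // a reader arrives
        System.out.println(isIdle(readingInProgress)); // false: a writer would have to wait
        System.out.println(readingInProgress.get());   // 1: the check did not modify the count

        readingInProgress.decrementAndGet();           // the reader leaves
        System.out.println(isIdle(readingInProgress)); // true again
    }
}
```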



This could stretch the writer's wait indefinitely, since reads are the frequent operation in the context of a ReaderWriterLock. The implementation handles this case by calling writeRequested.incrementAndGet() before the writer starts waiting in the write method. The next reader entering the read method sees this update immediately and stands by, giving the writer preference. Of course, the assumption is that writes are infrequent, which makes sense when we talk about a ReaderWriterLock.
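The writer-preference handshake described above can be walked through step by step on a single thread. This is a simplified, non-blocking sketch, not the lock itself: the counter names readingInProgress and writeRequested come from this post, while the class WriterPreferenceSketch and its methods (tryBeginRead, requestWrite and so on) are hypothetical, and the waiting and notification are left out.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical, non-blocking model of the two-counter handshake.
final class WriterPreferenceSketch {
    private final AtomicInteger readingInProgress = new AtomicInteger(0);
    private final AtomicInteger writeRequested = new AtomicInteger(0);

    // Reader entry check: returns false when the reader has to stand by.
    boolean tryBeginRead() {
        readingInProgress.incrementAndGet();     // register the reader first
        if (writeRequested.get() != 0) {         // a writer is waiting or writing
            readingInProgress.decrementAndGet(); // back off so the writer can proceed
            return false;
        }
        return true;
    }

    void endRead() {
        readingInProgress.decrementAndGet();
    }

    // Writer announces itself before it starts waiting for readers.
    void requestWrite() {
        writeRequested.incrementAndGet();
    }

    // Writer may enter only when no reader is registered.
    boolean canWriteNow() {
        return readingInProgress.get() == 0;
    }

    void endWrite() {
        writeRequested.decrementAndGet();
    }

    public static void main(String[] args) {
        WriterPreferenceSketch s = new WriterPreferenceSketch();
        System.out.println(s.tryBeginRead()); // true: reader A enters
        s.requestWrite();                     // a writer announces itself
        System.out.println(s.canWriteNow());  // false: reader A is still reading
        System.out.println(s.tryBeginRead()); // false: reader B stands by
        s.endRead();                          // reader A finishes
        System.out.println(s.canWriteNow());  // true: the writer can go ahead
        s.endWrite();                         // the writer is done
        System.out.println(s.tryBeginRead()); // true: readers are admitted again
    }
}
```

Because the writer increments writeRequested before it waits, reader B is turned away even though reader A is still inside, so a steady stream of readers cannot keep the writer out.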





package readerWriterLock;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

public class ReadWriteLock {
    AtomicInteger readingInProgress = new AtomicInteger(0);
    AtomicInteger writeRequested = new AtomicInteger(0);
    Object writerLock = new Object();
    Object readerLock = new Object();

    public void write(String msg) {
        writeRequested.incrementAndGet();
        while (!readingInProgress.compareAndSet(0, 0)) {
            synchronized (writerLock) {
                try {
                    System.out.println("Waiting for reader to finish");
                    writerLock.wait(1000);
                } catch (InterruptedException ie) {
                    ie.printStackTrace();
                }
            }
        }
        // synchronize to make sure only one writer from all contending writers gets a chance
        // if writes are very rare then this locking is not even required
        // This is the EXCLUSIVE area for (multiple) writers
        // ordering of the set flags will make sure there is no other read in progress
        synchronized (writerLock) {
            assert readingInProgress.compareAndSet(0, 0);
            System.out.println("No readers are here " + readingInProgress.get());
            // EXCLUSIVE WRITE HERE
            System.out.println("Performing write");
        }
        // writer will only decrement when done
        writeRequested.decrementAndGet();
        // notify readers about it
        // as write operations are rare, it is unlikely that another write will start
        // before the notification is sent out to readers
        // even if that happens, the reader logic will take care of it
        synchronized (readerLock) {
            readerLock.notify();
        }
    }

    public void read() {
        readingInProgress.incrementAndGet();
        // while a write is happening, wait before we read
        while (!writeRequested.compareAndSet(0, 0)) {
            // give up the read if a write is already requested
            // this prevents a deadlock between a simultaneous read and write
            // where both are waiting for each other's count to decrement
            readingInProgress.decrementAndGet();
            synchronized (writerLock) {
                System.out.println("notifying writers to continue");
                writerLock.notify();
            }
            synchronized (readerLock) {
                try {
                    // writer will notify reader once it is done
                    System.out.println("waiting for writers to finish");
                    readerLock.wait(1000);
                } catch (InterruptedException ie) {
                    ie.printStackTrace();
                }
            }
            // this makes sure the reading count is already updated when we leave this loop
            readingInProgress.incrementAndGet();
        }
        assert writeRequested.compareAndSet(0, 0);
        System.out.println("Reading in progress");
        synchronized (writerLock) {
            readingInProgress.decrementAndGet();
            System.out.println("notifying writers to continue");
            writerLock.notify();
        }
    }

    public static void main(String[] args) {
        int threadPoolSize = 50;
        int writerFactor = 5;
        ExecutorService executor = Executors.newFixedThreadPool(threadPoolSize);
        List<Callable<String>> tasks = new ArrayList<Callable<String>>();
        final ReadWriteLock lock = new ReadWriteLock();
        for (int i = 0; ; i++) {
            if (i % writerFactor == 0)
                tasks.add(new Callable<String>() {
                    @Override
                    public String call() throws Exception { lock.write("Message"); return null; }
                });
            else
                tasks.add(new Callable<String>() {
                    @Override
                    public String call() throws Exception { lock.read(); return null; }
                });
            if (i == threadPoolSize) break;
        }
        try {
            executor.invokeAll(tasks);
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            executor.shutdown();
        }
    }
}