Saturday, 30 August 2014

Concurrency patterns - Active Object

Active Object pattern decouples method execution from its invocation. Think asynchronous method invocation, callbacks etc. To avoid race conditions, incoming client requests are queued and handled by a scheduler. The scheduler picks a queued object and makes it run its logic. It is object's responsibility to know what to do when it gets invoked, hence the Active Object.


Application

  • Android - asynchronous background service sending messages to the UI thread
  • Any kind of a queue / message delivery system
An important aspect is that each of the invoked objects resides in its own thread of control. The scheduler guarantees a serialised access to the queued tasks.

Key Components

  • Proxy: provides interface the clients can use to submit their requests
  • Activation List: a queue of pending client requests
  • Scheduler: decides which request to execute next
  • Active Object: implements the core business logic
  • Callback: contains execution result (i.e. a promise or a future)


Advantages

  • Reduced code complexity: Once pattern's mechanics are in place, the code can be treated as single-threaded.
  • No need for additional synchronization: Concurrent requests are serialized and handled by a single internal thread


Drawbacks

  • Performance overhead: Sophisticated scheduling, spinning and request handling can be expensive in terms of memory and can lead to non-trivial context switching.
  • Programming overhead: Active Object essentially requires you to create a small framework. It can definitely be kept self-contained enough, but it boils down to a simple the fact that you need to be aware of multiple components:
    • Activation List - the queue of incoming requests
    • Callback - yields the results
    • Scheduler thread - watches for incoming requests
    • Scheduler implementation - enqueues requests
    • Proxy - client interface allowing to submit requests
    • Future - an asynchronous response


Example Implementation

A simple counter implementing a sub-set of the AtomicLong. The counter keeps its internal state which is then a subject to race conditions:
 public class ThreadSafeCounter implements Counter {  
   private long value;  
   ..  
 }  
The challenge is to ensure the counter consistently yields the correct results, even when many threads access and modify counter's intrinsic value.

ThreadUnsafeCounter.java represents a naive implementation which fails to handle concurrent access. The failure is proved by a multi-threaded test:
 public class ThreadUnsafeCounterMultiThreadedTest {  
   ..  
   Counter counter = new ThreadUnsafeCounter(INITIAL_VALUE);  
   ..  
   // Note that a test failure is expected  
   @Test(expected = AssertionError.class)  
   public void incrementAndGet() {  
     testExecutor.runTest(incrementAndGetCommand);  
     assertEquals(getExpectedIncrementedValue(), counter.get());  
   }  
   ..  
 }  
On the other hand ThreadSafeCounter.java relies on the Active Object pattern when handling concurrent requests:
 public class ThreadSafeCounter implements Counter {  
   // The internal state, subject to race conditions.  
   private long value;  
   // Activation List: incoming requests (tasks)   
   // are put into a queue  
   private BlockingQueue<Callable<Long>> taskQueue =   
                 new LinkedBlockingQueue<>();  
   // Callback: provides access to the calculated results   
   // (incrementAndGet, etc.)  
   private BlockingQueue<Long> resultQueue =   
                 new LinkedBlockingQueue<>();  
   // Scheduler: a dedicated thread created and started   
   // when the counter is instantiated  
   public ThreadSafeCounter(long value) {  
     ..  
     new Thread(new Runnable() {  
       @Override  
       public void run() {  
         while (true) {  
           // Constantly watching for incoming requests  
           ..  
         }  
       }  
     }).start();  
   }  
   ..  
   // Proxy: allows the clients to submit new tasks  
   private long enqueueTask(Callable<Long> task) {..}  
 }  
The implementation offloads the actual task scheduling to the Executor framework. The execution results are handled asynchronously via futures. For simplicity, I chose to block the clients until the results become available. Still in the ThreadSafeCounter.java:
   // This is the actual task scheduler. It only allows for a single task at a time.  
   ExecutorService executorService = Executors.newSingleThreadExecutor();  
   ..  
   // At some point in the future the counter's new value will be available  
   Future<Long> future = executorService.submit(taskQueue.take());  
   ..  
   // Meanwhile, the client is blocked until the result is ready  
   while (true) {  
     Long result = resultQueue.poll(500, TimeUnit.MILLISECONDS);  
     if (result != null) break;  
   }  
   ..  

Source Code


Resources


2 comments:

  1. Hi Tom,

    Nice post. In your "Drawbacks" section you mention, "...six specific components...", but you don't state which components.

    I think it would improve this article, if you would enumerate which six components.

    ReplyDelete
  2. Hi Android Japan,
    Thank you very much for your comment. I updated the article, hope it makes sense. Now I realise that the exact number of specific components is arguable. Thanks again and have a nice day.

    ReplyDelete