The challenge is to ensure the counter consistently yields the correct results, even when many threads access and modify counter's intrinsic value.
ThreadUnsafeCounter.java represents a naive implementation which fails to handle concurrent access. The failure is proved by a multi-threaded test:
public class ThreadUnsafeCounterMultiThreadedTest {
..
Counter counter = new ThreadUnsafeCounter(INITIAL_VALUE);
..
// Note that a test failure is expected
@Test(expected = AssertionError.class)
public void incrementAndGet() {
testExecutor.runTest(incrementAndGetCommand);
assertEquals(getExpectedIncrementedValue(), counter.get());
}
..
}
On the other hand ThreadSafeCounter.java relies on the Active Object pattern when handling concurrent requests:
public class ThreadSafeCounter implements Counter {
// The internal state, subject to race conditions.
private long value;
// Activation List: incoming requests (tasks)
// are put into a queue
private BlockingQueue<Callable<Long>> taskQueue =
new LinkedBlockingQueue<>();
// Callback: provides access to the calculated results
// (incrementAndGet, etc.)
private BlockingQueue<Long> resultQueue =
new LinkedBlockingQueue<>();
// Scheduler: a dedicated thread created and started
// when the counter is instantiated
public ThreadSafeCounter(long value) {
..
new Thread(new Runnable() {
@Override
public void run() {
while (true) {
// Constantly watching for incoming requests
..
}
}
}).start();
}
..
// Proxy: allows the clients to submit new tasks
private long enqueueTask(Callable<Long> task) {..}
}
The implementation offloads the actual task scheduling to the Executor framework. The execution results are handled asynchronously via futures. For simplicity, I chose to block the clients until the results become available. Still in the ThreadSafeCounter.java:
// This is the actual task scheduler. It only allows for a single task at a time.
ExecutorService executorService = Executors.newSingleThreadExecutor();
..
// At some point in the future the counter's new value will be available
Future<Long> future = executorService.submit(taskQueue.take());
..
// Meanwhile, the client is blocked until the result is ready
while (true) {
Long result = resultQueue.poll(500, TimeUnit.MILLISECONDS);
if (result != null) break;
}
..
Source Code
Resources