Sunday, June 4, 2017

Java Concurrency Topics

Issues with Collections

  • ArrayIndexOutofBoundsException with syncronized collections
  • fail-fast with ConcurrentModificationException
  • Hidden Interators: Collector's toString, containsAll, removeAll, retrainAll methods.

Atomic variables


Operations

compareAndSet
getAndSet
lazySet

addAndGet
getAndAdd

incrementAndGet
getAndIncrement

decrementAndGet
getAndDecrement

accumulateAndGet(int x, IntBinaryOperator accumulatorFunction)
getAndAccumulate

getAndUpdate(IntUnaryOperator updateFunction)
getAndUpdate

Concurrent Collections

addIfAbsent(E e)
compareAndSet


Map Operations

putIfAbsent
replace(K key, V oldValue, V newbvalue)
compute(K key, BiFunction<? super K,? super V,? extends V> remappingFunction)
computeIfAbsent(K key, Function<? super K,? extends V> mappingFunction)
computeIfPresent(K key, BiFunction<? super K,? super V,? extendsV> remappingFunction)

ConcurrentHashMap 

finer-grained locking mechanism: lock striping
Weak consistent interator
size and isEmpty are just estimations

Thread States

Interruptible Methods

1. The most common practice: propagating the InterruptedException. Either not catch the InterruptedException, or throwing the exception after capturing it.
2. Restoring interrupt. When a code is part of a Runnable, it is not able to throw an InterruptedException. The only choice is catching the exception, then calling the interrupt on the current thread.

Sychronizers

Examples from Java Documentation

CountDownLatch


class Driver { // ...
   void main() throws InterruptedException {
     CountDownLatch startSignal = new CountDownLatch(1);
     CountDownLatch doneSignal = new CountDownLatch(N);

     for (int i = 0; i < N; ++i) // create and start threads
       new Thread(new Worker(startSignal, doneSignal)).start();

     doSomethingElse();            // don't let run yet
     startSignal.countDown();     // let all threads proceed
     doSomethingElse();
     doneSignal.await();           // wait for all to finish
   }
 }

 class Worker implements Runnable {
   private final CountDownLatch startSignal;
   private final CountDownLatch doneSignal;
   Worker(CountDownLatch startSignal, CountDownLatch doneSignal) {
     this.startSignal = startSignal;
     this.doneSignal = doneSignal;
   }
   public void run() {
     try {
       startSignal.await();
       doWork();
       doneSignal.countDown();
     } catch (InterruptedException ex) {} // return;
   }

   void doWork() { ... }
 }

Semaphore


 class Pool {
   private static final int MAX_AVAILABLE = 100;
   private final Semaphore available = new Semaphore(MAX_AVAILABLE, true);

   public Object getItem() throws InterruptedException {
     available.acquire();
     return getNextAvailableItem();
   }

   public void putItem(Object x) {
     if (markAsUnused(x)) {
       available.release();
     }
   }

   // Not a particularly efficient data structure; just for demo

   protected Object[] items = ... whatever kinds of items being managed
   protected boolean[] used = new boolean[MAX_AVAILABLE];

   protected synchronized Object getNextAvailableItem() {
     for (int i = 0; i < MAX_AVAILABLE; ++i) {
       if (!used[i]) {
          used[i] = true;
          return items[i];
       }
     }
     return null; // not reached
   }

   protected synchronized boolean markAsUnused(Object item) {
     for (int i = 0; i < MAX_AVAILABLE; ++i) {
       if (item == items[i]) {
          if (used[i]) {
            used[i] = false;
            return true;
          } else
            return false;
       }
     }
     return false;
   }
 }

CyclicBarrier


 class Solver {
   final int N;
   final float[][] data;
   final CyclicBarrier barrier;

   class Worker implements Runnable {
     int myRow;
     Worker(int row) { myRow = row; }
     public void run() {
       while (!done()) {
         processRow(myRow);

         try {
           barrier.await();
         } catch (InterruptedException ex) {
           return;
         } catch (BrokenBarrierException ex) {
           return;
         }
       }
     }
   }

   public Solver(float[][] matrix) {
     data = matrix;
     N = matrix.length;
     Runnable barrierAction =
       new Runnable() { public void run() { mergeRows(...); }};
     barrier = new CyclicBarrier(N, barrierAction);

     List<Thread> threads = new ArrayList<Thread>(N);
     for (int i = 0; i < N; i++) {
       Thread thread = new Thread(new Worker(i));
       threads.add(thread);
       thread.start();
     }

     // wait until done
     for (Thread thread : threads)
       thread.join();
   }
 }

Phaser


 void runTasks(List<Runnable> tasks) {
   final Phaser phaser = new Phaser(1); // "1" to register self
   // create and start threads
   for (final Runnable task : tasks) {
     phaser.register();
     new Thread() {
       public void run() {
         phaser.arriveAndAwaitAdvance(); // await all creation
         task.run();
       }
     }.start();
   }

   // allow threads to start and deregister self
   phaser.arriveAndDeregister();
 }

 void startTasks(List<Runnable> tasks, final int iterations) {
   final Phaser phaser = new Phaser() {
     protected boolean onAdvance(int phase, int registeredParties) {
       return phase >= iterations || registeredParties == 0;
     }
   };
   phaser.register();
   for (final Runnable task : tasks) {
     phaser.register();
     new Thread() {
       public void run() {
         do {
           task.run();
           phaser.arriveAndAwaitAdvance();
         } while (!phaser.isTerminated());
       }
     }.start();
   }
   phaser.arriveAndDeregister(); // deregister self, don't wait
 }


Thursday, June 1, 2017

Handling Concurrently Executions of Expensive tasks

In order to handle concurrently executions of expensive tasks, Brian Goetz provided an example of Memorizer in his book: “Java Concurrency In Practice”. The following is a slightly modified version of the class.


public class ConcurrentResultCache<A, V> implements Computable<A, V> {
      private static final Logger logger = LoggerFactory.getLogger(ConcurrentResultCache.class);
      private final ConcurrentMap<A, Future<V>> cache = new ConcurrentHashMap<A, Future<V>>();
      private final Computable<A, V> c;

      public ConcurrentResultCache(final Computable<A, V> c) {
            this.c = c;
      }

      public V compute(final A arg) throws InterruptedException {
            logger.trace("entered compute...");
            // waiting result to be available. intercept any CacellationException.
            while (true) {
                  logger.trace("before first get");
                  Future<V> f = cache.get(arg);
                  logger.trace("after first get");
                  if (f == null) {
                        logger.trace("f is null");
                        // Creating a new task wrapped in a Callable.
                        Callable<V> eval = new Callable<V>() {
                              public V call() throws InterruptedException {
                                    return c.compute(arg);
                              }
                        };
                        FutureTask<V> ft = new FutureTask<V>(eval);
                        f = cache.putIfAbsent(arg, ft);
                        logger.trace("after putIfAbsent()");
                        if (f == null) {
                              // if the task is put into the cache first time, run the task in the SAME thread.
                              f = ft;
                              logger.trace("before run()");
                              ft.run();
                        }
                  }
                  try {
                        logger.trace("before second get");
                        return f.get();
                  } catch (CancellationException e) {
                        cache.remove(arg, f);
                  } catch (ExecutionException e) {
                        throw new IllegalStateException("Task execution exception caught.", e);
                  }
            }
      }
     
      public void remove(final String key) {
            this.cache.remove(key);
      }
}
     
      public void remove(final String key) {
            this.cache.remove(key);
      }
}


As an example, if a Properties need to be loaded in a high traffic application, it could implement the Computable interface.

public class PropertyLoader implements Computable<String, Properties> {
      private static final Logger logger = LoggerFactory.getLogger(PropertyLoader.class);

      @Override
      public Properties compute(final String filePath) throws InterruptedException {
            logger.trace("entered PropertyLoader compute()");
            return readProperties(filePath);
      }

      private Properties readProperties(final String filePath) {
            final String propFile = System.getProperty(filePath);
            final Properties properties = new Properties();
            try {
                  properties.load(new FileReader(propFile));
            } catch (IOException e) {
                  logger.error("Failed to load properties.", e);
            }
            return properties;
      }
}


The usage of the class:

private static ConcurrentResultCache<String, Properties> propertiesCache = new ConcurrentResultCache<String, Properties>(new PropertyLoader());

public Properties getClientThrottlingRateMap() {
    final Properties properties = propertiesCache.compute(propertiesLocation);