Showing posts with label Atomic Variables. Show all posts
Showing posts with label Atomic Variables. Show all posts

Sunday, June 4, 2017

Java Concurrency Topics

Issues with Collections

  • ArrayIndexOutOfBoundsException with synchronized collections
  • fail-fast with ConcurrentModificationException
  • Hidden iterators: a collection's toString, containsAll, removeAll, and retainAll methods.

Atomic variables


Operations

compareAndSet
getAndSet
lazySet

addAndGet
getAndAdd

incrementAndGet
getAndIncrement

decrementAndGet
getAndDecrement

accumulateAndGet(int x, IntBinaryOperator accumulatorFunction)
getAndAccumulate

getAndUpdate(IntUnaryOperator updateFunction)
updateAndGet

Concurrent Collections

addIfAbsent(E e)
compareAndSet


Map Operations

putIfAbsent
replace(K key, V oldValue, V newValue)
compute(K key, BiFunction<? super K,? super V,? extends V> remappingFunction)
computeIfAbsent(K key, Function<? super K,? extends V> mappingFunction)
computeIfPresent(K key, BiFunction<? super K,? super V,? extends V> remappingFunction)

ConcurrentHashMap 

finer-grained locking mechanism: lock striping
Weakly consistent iterator
size and isEmpty are just estimations

Thread States

Interruptible Methods

1. The most common practice: propagate the InterruptedException, either by not catching it at all or by rethrowing it after catching it.
2. Restore the interrupt. When the code is part of a Runnable, it cannot throw an InterruptedException; the only choice is to catch the exception and then call interrupt() on the current thread.

Synchronizers

Examples from Java Documentation

CountDownLatch


class Driver { // ...
   void main() throws InterruptedException {
     CountDownLatch startSignal = new CountDownLatch(1);
     CountDownLatch doneSignal = new CountDownLatch(N);

     for (int i = 0; i < N; ++i) // create and start threads
       new Thread(new Worker(startSignal, doneSignal)).start();

     doSomethingElse();            // don't let run yet
     startSignal.countDown();     // let all threads proceed
     doSomethingElse();
     doneSignal.await();           // wait for all to finish
   }
 }

 class Worker implements Runnable {
   private final CountDownLatch startSignal;
   private final CountDownLatch doneSignal;
   Worker(CountDownLatch startSignal, CountDownLatch doneSignal) {
     this.startSignal = startSignal;
     this.doneSignal = doneSignal;
   }
   public void run() {
     try {
       startSignal.await();
       doWork();
       doneSignal.countDown();
     } catch (InterruptedException ex) {} // return;
   }

   void doWork() { ... }
 }

Semaphore


 class Pool {
   private static final int MAX_AVAILABLE = 100;
   private final Semaphore available = new Semaphore(MAX_AVAILABLE, true);

   public Object getItem() throws InterruptedException {
     available.acquire();
     return getNextAvailableItem();
   }

   public void putItem(Object x) {
     if (markAsUnused(x)) {
       available.release();
     }
   }

   // Not a particularly efficient data structure; just for demo

   protected Object[] items = ... whatever kinds of items being managed
   protected boolean[] used = new boolean[MAX_AVAILABLE];

   protected synchronized Object getNextAvailableItem() {
     for (int i = 0; i < MAX_AVAILABLE; ++i) {
       if (!used[i]) {
          used[i] = true;
          return items[i];
       }
     }
     return null; // not reached
   }

   protected synchronized boolean markAsUnused(Object item) {
     for (int i = 0; i < MAX_AVAILABLE; ++i) {
       if (item == items[i]) {
          if (used[i]) {
            used[i] = false;
            return true;
          } else
            return false;
       }
     }
     return false;
   }
 }

CyclicBarrier


 class Solver {
   final int N;
   final float[][] data;
   final CyclicBarrier barrier;

   class Worker implements Runnable {
     int myRow;
     Worker(int row) { myRow = row; }
     public void run() {
       while (!done()) {
         processRow(myRow);

         try {
           barrier.await();
         } catch (InterruptedException ex) {
           return;
         } catch (BrokenBarrierException ex) {
           return;
         }
       }
     }
   }

   public Solver(float[][] matrix) {
     data = matrix;
     N = matrix.length;
     Runnable barrierAction =
       new Runnable() { public void run() { mergeRows(...); }};
     barrier = new CyclicBarrier(N, barrierAction);

     List<Thread> threads = new ArrayList<Thread>(N);
     for (int i = 0; i < N; i++) {
       Thread thread = new Thread(new Worker(i));
       threads.add(thread);
       thread.start();
     }

     // wait until done
     for (Thread thread : threads)
       thread.join();
   }
 }

Phaser


 void runTasks(List<Runnable> tasks) {
   final Phaser phaser = new Phaser(1); // "1" to register self
   // create and start threads
   for (final Runnable task : tasks) {
     phaser.register();
     new Thread() {
       public void run() {
         phaser.arriveAndAwaitAdvance(); // await all creation
         task.run();
       }
     }.start();
   }

   // allow threads to start and deregister self
   phaser.arriveAndDeregister();
 }

 void startTasks(List<Runnable> tasks, final int iterations) {
   final Phaser phaser = new Phaser() {
     protected boolean onAdvance(int phase, int registeredParties) {
       return phase >= iterations || registeredParties == 0;
     }
   };
   phaser.register();
   for (final Runnable task : tasks) {
     phaser.register();
     new Thread() {
       public void run() {
         do {
           task.run();
           phaser.arriveAndAwaitAdvance();
         } while (!phaser.isTerminated());
       }
     }.start();
   }
   phaser.arriveAndDeregister(); // deregister self, don't wait
 }