Issues with Collections
- ArrayIndexOutofBoundsException with syncronized collections
- fail-fast with ConcurrentModificationException
- Hidden Interators: Collector's toString, containsAll, removeAll, retrainAll methods.
Atomic variables
- AtomicBoolean
- AtomicInteger
- AtomicIntegerArray
- AtomicIntegerFieldUpdater
- AtomicLong
- AtomicLongArray
- AtomicLongFieldUpdater
- AtomicMarkableReference
- boolean compareAndSet(V expectedReference, V newReference, boolean expectedMark, boolean newMark)
- AtomicReference
- AtomicReferenceArray
- AtomicReferenceFieldUpdater
- AtomicStampedReference
- boolean compareAndSet(V expectedReference, V newReference, int expectedStamp, int newStamp)
- DoubleAccumulator
- DoubleAdder
- LongAccumulator
- LongAdder
Operations
compareAndSetgetAndSet
lazySet
addAndGet
getAndAdd
incrementAndGet
getAndIncrement
decrementAndGet
getAndDecrement
accumulateAndGet(int x, IntBinaryOperator accumulatorFunction) |
getAndUpdate(IntUnaryOperator updateFunction) |
Concurrent Collections
List Operations
addIfAbsent(E e)
compareAndSet
Map Operations
putIfAbsent
replace(K key, V oldValue, V newbvalue)
V compute(K key, BiFunction<? super K,? super V,? extends V> remappingFunction)
V computeIfAbsent(K key, Function<? super K,? extends V> mappingFunction)
V computeIfPresent(K key, BiFunction<? super K,? super V,? extendsV> remappingFunction)
V compute(K key, BiFunction<? super K,? super V,? extends V> remappingFunction)
V computeIfPresent(K key, BiFunction<? super K,? super V,? extendsV> remappingFunction)
ConcurrentHashMap
finer-grained locking mechanism: lock striping
Weak consistent interator
size and isEmpty are just estimations
Thread States
NEW
RUNNABLE
BLOCKED
WAITING
TIMED_WAITING
- A thread that is waiting for another thread to perform an action for up to a specified waiting time is in this state.
TERMINATED
Interruptible Methods
1. The most common practice: propagating the InterruptedException. Either not catch the InterruptedException, or throwing the exception after capturing it.
2. Restoring interrupt. When a code is part of a Runnable, it is not able to throw an InterruptedException. The only choice is catching the exception, then calling the interrupt on the current thread.
Sychronizers
Examples from Java DocumentationCountDownLatch
class Driver { // ...
void main() throws InterruptedException {
CountDownLatch startSignal = new CountDownLatch(1);
CountDownLatch doneSignal = new CountDownLatch(N);
for (int i = 0; i < N; ++i) // create and start threads
new Thread(new Worker(startSignal, doneSignal)).start();
doSomethingElse(); // don't let run yet
startSignal.countDown(); // let all threads proceed
doSomethingElse();
doneSignal.await(); // wait for all to finish
}
}
class Worker implements Runnable {
private final CountDownLatch startSignal;
private final CountDownLatch doneSignal;
Worker(CountDownLatch startSignal, CountDownLatch doneSignal) {
this.startSignal = startSignal;
this.doneSignal = doneSignal;
}
public void run() {
try {
startSignal.await();
doWork();
doneSignal.countDown();
} catch (InterruptedException ex) {} // return;
}
void doWork() { ... }
}
|
Semaphore
class Pool {
private static final int MAX_AVAILABLE = 100;
private final Semaphore available = new Semaphore(MAX_AVAILABLE, true);
public Object getItem() throws InterruptedException {
available.acquire();
return getNextAvailableItem();
}
public void putItem(Object x) {
if (markAsUnused(x)) {
available.release();
}
}
// Not a particularly efficient data structure; just for demo
protected Object[] items = ... whatever kinds of items being managed
protected boolean[] used = new boolean[MAX_AVAILABLE];
protected synchronized Object getNextAvailableItem() {
for (int i = 0; i < MAX_AVAILABLE; ++i) {
if (!used[i]) {
used[i] = true;
return items[i];
}
}
return null; // not reached
}
protected synchronized boolean markAsUnused(Object item) {
for (int i = 0; i < MAX_AVAILABLE; ++i) {
if (item == items[i]) {
if (used[i]) {
used[i] = false;
return true;
} else
return false;
}
}
return false;
}
}
|
CyclicBarrier
class Solver {
final int N;
final float[][] data;
final CyclicBarrier barrier;
class Worker implements Runnable {
int myRow;
Worker(int row) { myRow = row; }
public void run() {
while (!done()) {
processRow(myRow);
try {
barrier.await();
} catch (InterruptedException ex) {
return;
} catch (BrokenBarrierException ex) {
return;
}
}
}
}
public Solver(float[][] matrix) {
data = matrix;
N = matrix.length;
Runnable barrierAction =
new Runnable() { public void run() { mergeRows(...); }};
barrier = new CyclicBarrier(N, barrierAction);
List<Thread> threads = new ArrayList<Thread>(N);
for (int i = 0; i < N; i++) {
Thread thread = new Thread(new Worker(i));
threads.add(thread);
thread.start();
}
// wait until done
for (Thread thread : threads)
thread.join();
}
}
|
Phaser
void runTasks(List<Runnable> tasks) {
final Phaser phaser = new Phaser(1); // "1" to register self
// create and start threads
for (final Runnable task : tasks) {
phaser.register();
new Thread() {
public void run() {
phaser.arriveAndAwaitAdvance(); // await all creation
task.run();
}
}.start();
}
// allow threads to start and deregister self
phaser.arriveAndDeregister();
}
void startTasks(List<Runnable> tasks, final int iterations) {
final Phaser phaser = new Phaser() {
protected boolean onAdvance(int phase, int registeredParties) {
return phase >= iterations || registeredParties == 0;
}
};
phaser.register();
for (final Runnable task : tasks) {
phaser.register();
new Thread() {
public void run() {
do {
task.run();
phaser.arriveAndAwaitAdvance();
} while (!phaser.isTerminated());
}
}.start();
}
phaser.arriveAndDeregister(); // deregister self, don't wait
}
|