Showing posts with label concurrency. Show all posts
Showing posts with label concurrency. Show all posts

Tuesday, October 24, 2017

Java Concurrency Part 1 - Thread Synchronization with ReentrantLock and Condition

Examples of thread synchronization between wait()/notifyAll() and reentrantLock/condition (await/signal).

1. Resource sharing:

ReentrantLock/Condition/await/signal


public class Exchanger {
       private String message = null;
       private boolean empty = true;
       private Object lock = new Object();

       public void push(String message) {
             synchronized (lock) {
                    try {
                           while (!empty) {
                                 try {
                                        lock.wait();
                                 } catch (InterruptedException e) {
                                        e.printStackTrace();
                                 }
                           }
                           this.message = message;
                           this.empty = false;
                    } finally {
                           lock.notifyAll();
                    }
             }
       }

       public String pull() {
             synchronized (lock) {
                    try {
                           while (empty) {
                                 try {
                                        lock.wait();
                                 } catch (InterruptedException e) {
                                        e.printStackTrace();
                                 }
                           }
                           final String result = message;
                           this.message = null;
                           this.empty = true;
                           return result;
                    } finally {
                           lock.notifyAll();
                    }
             }
       }

       public static void main(String[] args) {
             final Exchanger exchanger = new Exchanger();
             final String[] stringArray = new String[] { "a", "b", "c", "d", "e", "f" };
             new Thread(() -> {
                    for (String str : stringArray) {
                           exchanger.push(str);
                    }
                    System.out.println("push EXIT");
             }).start();

             new Thread(() -> {
                    int i = 0;
                    final int size = stringArray.length;
                    do {
                           i++;
                           String result = exchanger.pull();
                           System.out.println("" + i + ":" + result);
                    } while (i < size);
                    System.out.println("pull EXIT");
             }).start();
       }
}



object wait/notifyAll


public class Exchanger2 {
       private String message = null;
       private boolean empty = true;
       private Lock lock = new ReentrantLock();
       private Condition messageReady = lock.newCondition();

       public void push(String message) {
             lock.lock();
             try {
                    while (!empty) {
                           try {
                                 messageReady.await();
                           } catch (InterruptedException e) {
                                 e.printStackTrace();
                           }
                    }
                    this.message = message;
                    this.empty = false;
                    messageReady.signal();
             } finally {
                    lock.unlock();
             }

       }

       public String pull() {
             lock.lock();
             try {
                    while (empty) {
                           try {
                                 messageReady.await();
                           } catch (InterruptedException e) {
                                 e.printStackTrace();
                           }
                    }
                    final String result = message;
                    this.message = null;
                    this.empty = true;
                    messageReady.signal();
                    return result;
             } finally {
                    lock.unlock();
             }
       }

       public static void main(String[] args) {
             final Exchanger2 exchanger = new Exchanger2();
             final String[] stringArray = new String[] { "a", "b", "c", "d", "e", "f" };
             new Thread(() -> {
                    for (String str : stringArray) {
                           exchanger.push(str);
                    }
                    System.out.println("push EXIT");
             }).start();

             new Thread(() -> {
                    int i = 0;
                    final int size = stringArray.length;
                    do {
                           i++;
                           String result = exchanger.pull();
                           System.out.println("" + i + ":" + result);
                    } while (i < size);
                    System.out.println("pull EXIT");
             }).start();
       }
}


2. Synchronization between two actions


public class JavaReentrantLock {
       private static final Lock lock1 = new ReentrantLock();
       private static final Lock lock2 = new ReentrantLock();
       private static int counter = 0;
       private static Random random = new Random();

       private static final CountDownLatch latchStartLatch = new CountDownLatch(1);
       private static final CountDownLatch latchGroupLatch = new CountDownLatch(2);

       public static void main(String[] args) {
             final Thread thread1 = new Thread(() -> {
                    System.out.println("action1 starting...");
                    action1();
             });
             final Thread thread2 = new Thread(() -> {
                    System.out.println("action2 starting...");
                    action2();
             });
             latchStartLatch.countDown();
             thread1.start();
             thread2.start();
             while (true) {
                    try {
                           latchGroupLatch.await();
                           break;
                    } catch (InterruptedException e) {
                           e.printStackTrace();
                    }
             }
       }

       private static void println(final String value) {
             counter++;
             System.out.println("value(" + counter + "):" + value);
             if (counter >= 100) {
                    System.exit(0);
             }
       }

       private static void action1() {
             while (true) {
                    try {
                           latchStartLatch.await();
                           break;
                    } catch (InterruptedException e) {
                           e.printStackTrace();
                    }
             }
             latchGroupLatch.countDown();
             while (true) {
                    lock1.lock();
                    try {
                           lock2.lock();
                           try {
                                 println("action1");
                           } finally {
                                 lock2.unlock();
                           }
                           try {
                                  TimeUnit.MICROSECONDS.sleep(random.nextInt(1000));
                           } catch (InterruptedException e) {
                                 e.printStackTrace();
                           }
                    } finally {
                           lock1.unlock();
                    }
                    try {
                           TimeUnit.MICROSECONDS.sleep(random.nextInt(1000));
                    } catch (InterruptedException e) {
                           e.printStackTrace();
                    }
             }
       }

       private static void action2() {
             while (true) {
                    try {
                           latchStartLatch.await();
                           break;
                    } catch (InterruptedException e) {
                           e.printStackTrace();
                    }
             }
             while (true) {
                    lock2.lock();
                    try {
                           lock1.lock();
                           try {
                                 println("action2");
                           } finally {
                                 lock1.unlock();
                           }
                           try {
                                  TimeUnit.MICROSECONDS.sleep(random.nextInt(1000));
                           } catch (InterruptedException e) {
                                 e.printStackTrace();
                           }
                    } finally {
                           lock2.unlock();
                    }
                    try {
                           TimeUnit.MICROSECONDS.sleep(random.nextInt(1000));
                    } catch (InterruptedException e) {
                           e.printStackTrace();
                    }
             }
       }
}