Showing posts with label "Java Concurrency In Practice". Show all posts

Thursday, June 1, 2017

Handling Concurrent Executions of Expensive Tasks

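To handle concurrent executions of expensive tasks, Brian Goetz provides a Memoizer example in his book “Java Concurrency in Practice”. It caches a Future per argument, so concurrent callers that ask for the same argument wait for a single computation instead of each starting their own. The following is a slightly modified version of that class.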


public class ConcurrentResultCache<A, V> implements Computable<A, V> {
      private static final Logger logger = LoggerFactory.getLogger(ConcurrentResultCache.class);
      private final ConcurrentMap<A, Future<V>> cache = new ConcurrentHashMap<A, Future<V>>();
      private final Computable<A, V> c;

      public ConcurrentResultCache(final Computable<A, V> c) {
            this.c = c;
      }

      public V compute(final A arg) throws InterruptedException {
            logger.trace("entered compute...");
            // Wait for the result to become available; retry if the task was cancelled (CancellationException).
            while (true) {
                  logger.trace("before first get");
                  Future<V> f = cache.get(arg);
                  logger.trace("after first get");
                  if (f == null) {
                        logger.trace("f is null");
                        // Creating a new task wrapped in a Callable.
                        Callable<V> eval = new Callable<V>() {
                              public V call() throws InterruptedException {
                                    return c.compute(arg);
                              }
                        };
                        FutureTask<V> ft = new FutureTask<V>(eval);
                        f = cache.putIfAbsent(arg, ft);
                        logger.trace("after putIfAbsent()");
                        if (f == null) {
                              // This thread put the task into the cache first, so it runs the task in the SAME thread;
                              // other callers for the same argument block in f.get() until it finishes.
                              f = ft;
                              logger.trace("before run()");
                              ft.run();
                        }
                  }
                  try {
                        logger.trace("before second get");
                        return f.get();
                  } catch (CancellationException e) {
                        cache.remove(arg, f);
                  } catch (ExecutionException e) {
                        throw new IllegalStateException("Task execution exception caught.", e);
                  }
            }
      }
     
      // The cache is keyed by A, so the key to evict is of type A rather than String.
      public void remove(final A key) {
            this.cache.remove(key);
      }
}
     
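The class above depends on a Computable interface that is not shown in this post. In the book it is a single-method interface. The sketch below reproduces it together with a trimmed copy of the cache (logging removed) to check that concurrent callers for the same argument trigger only one computation. The names SimpleResultCache, SlowSquare and CacheDemo are hypothetical and exist only for this illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;

// Single-method interface as defined in "Java Concurrency in Practice".
interface Computable<A, V> {
    V compute(A arg) throws InterruptedException;
}

// Trimmed copy of the cache above, without logging (hypothetical name).
class SimpleResultCache<A, V> implements Computable<A, V> {
    private final ConcurrentMap<A, Future<V>> cache = new ConcurrentHashMap<A, Future<V>>();
    private final Computable<A, V> c;

    SimpleResultCache(Computable<A, V> c) {
        this.c = c;
    }

    public V compute(final A arg) throws InterruptedException {
        while (true) {
            Future<V> f = cache.get(arg);
            if (f == null) {
                FutureTask<V> ft = new FutureTask<V>(() -> c.compute(arg));
                f = cache.putIfAbsent(arg, ft); // atomic: only one thread sees null here
                if (f == null) {
                    f = ft;
                    ft.run(); // the winning thread computes; the others wait in get()
                }
            }
            try {
                return f.get();
            } catch (CancellationException e) {
                cache.remove(arg, f); // drop the cancelled task and retry
            } catch (ExecutionException e) {
                throw new IllegalStateException("Task execution exception caught.", e);
            }
        }
    }
}

// Hypothetical expensive task that counts how often it actually runs.
class SlowSquare implements Computable<Integer, Integer> {
    final AtomicInteger calls = new AtomicInteger();

    public Integer compute(Integer n) throws InterruptedException {
        calls.incrementAndGet();
        Thread.sleep(100); // simulate expensive work
        return n * n;
    }
}

class CacheDemo {
    // Starts `threads` concurrent lookups of the same key and returns
    // the number of times the underlying task really ran.
    static int run(int threads) throws Exception {
        SlowSquare task = new SlowSquare();
        SimpleResultCache<Integer, Integer> cache = new SimpleResultCache<Integer, Integer>(task);
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        try {
            Callable<Integer> lookup = () -> cache.compute(7);
            List<Future<Integer>> results = new ArrayList<Future<Integer>>();
            for (int i = 0; i < threads; i++) {
                results.add(pool.submit(lookup));
            }
            for (Future<Integer> r : results) {
                if (r.get() != 49) {
                    throw new IllegalStateException("unexpected result");
                }
            }
        } finally {
            pool.shutdown();
        }
        return task.calls.get();
    }

    public static void main(String[] args) throws Exception {
        System.out.println("computations: " + run(8)); // prints "computations: 1"
    }
}
```

Because putIfAbsent() is atomic, exactly one of the eight threads installs its FutureTask and runs it; the other seven find the existing Future and block in get() until the result is ready.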


As an example, if a Properties object needs to be loaded in a high-traffic application, the loader can implement the Computable interface so that it can be wrapped by the cache.

public class PropertyLoader implements Computable<String, Properties> {
      private static final Logger logger = LoggerFactory.getLogger(PropertyLoader.class);

      @Override
      public Properties compute(final String filePath) throws InterruptedException {
            logger.trace("entered PropertyLoader compute()");
            return readProperties(filePath);
      }

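      private Properties readProperties(final String filePath) {
            // The argument is used as the name of a system property that holds the actual file path.
            final String propFile = System.getProperty(filePath);
            final Properties properties = new Properties();
            // try-with-resources closes the reader even if load() fails.
            try (FileReader reader = new FileReader(propFile)) {
                  properties.load(reader);
            } catch (IOException e) {
                  logger.error("Failed to load properties.", e);
            }
            return properties;
      }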
}
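Note that readProperties() passes its argument to System.getProperty(), so the key given to compute() is the name of a system property (for example one set with -D on the command line) rather than the file path itself. The following is a minimal sketch of that lookup in isolation. The class name PropertyLookupDemo, the property name demo.config and the key rate.limit are assumptions made up for this illustration, and unlike the loader above it lets the IOException propagate.

```java
import java.io.FileReader;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Properties;

class PropertyLookupDemo {
    // Mirrors readProperties(): the argument names a system property
    // whose value is the path of the properties file.
    static Properties load(String systemPropertyName) throws IOException {
        String propFile = System.getProperty(systemPropertyName);
        Properties properties = new Properties();
        try (FileReader reader = new FileReader(propFile)) {
            properties.load(reader);
        }
        return properties;
    }

    // Writes a temporary properties file, registers its path under the
    // hypothetical system property "demo.config", and reads one value back.
    static String demo() throws IOException {
        Path file = Files.createTempFile("demo", ".properties");
        try {
            Files.write(file, "rate.limit=100\n".getBytes(StandardCharsets.ISO_8859_1));
            System.setProperty("demo.config", file.toString());
            return load("demo.config").getProperty("rate.limit");
        } finally {
            Files.delete(file);
        }
    }

    public static void main(String[] args) throws IOException {
        System.out.println(demo()); // prints "100"
    }
}
```

In a deployed application the same effect would come from starting the JVM with the property already set, and passing the property name as the cache key.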


The usage of the class:

private static ConcurrentResultCache<String, Properties> propertiesCache = new ConcurrentResultCache<String, Properties>(new PropertyLoader());

public Properties getClientThrottlingRateMap() {
    final Properties properties = propertiesCache.compute(propertiesLocation);