Thursday, June 1, 2017

Handling Concurrently Executions of Expensive tasks

In order to handle concurrently executions of expensive tasks, Brian Goetz provided an example of Memorizer in his book: “Java Concurrency In Practice”. The following is a slightly modified version of the class.


public class ConcurrentResultCache<A, V> implements Computable<A, V> {
      private static final Logger logger = LoggerFactory.getLogger(ConcurrentResultCache.class);
      private final ConcurrentMap<A, Future<V>> cache = new ConcurrentHashMap<A, Future<V>>();
      private final Computable<A, V> c;

      public ConcurrentResultCache(final Computable<A, V> c) {
            this.c = c;
      }

      public V compute(final A arg) throws InterruptedException {
            logger.trace("entered compute...");
            // waiting result to be available. intercept any CacellationException.
            while (true) {
                  logger.trace("before first get");
                  Future<V> f = cache.get(arg);
                  logger.trace("after first get");
                  if (f == null) {
                        logger.trace("f is null");
                        // Creating a new task wrapped in a Callable.
                        Callable<V> eval = new Callable<V>() {
                              public V call() throws InterruptedException {
                                    return c.compute(arg);
                              }
                        };
                        FutureTask<V> ft = new FutureTask<V>(eval);
                        f = cache.putIfAbsent(arg, ft);
                        logger.trace("after putIfAbsent()");
                        if (f == null) {
                              // if the task is put into the cache first time, run the task in the SAME thread.
                              f = ft;
                              logger.trace("before run()");
                              ft.run();
                        }
                  }
                  try {
                        logger.trace("before second get");
                        return f.get();
                  } catch (CancellationException e) {
                        cache.remove(arg, f);
                  } catch (ExecutionException e) {
                        throw new IllegalStateException("Task execution exception caught.", e);
                  }
            }
      }
     
      public void remove(final String key) {
            this.cache.remove(key);
      }
}
     
      public void remove(final String key) {
            this.cache.remove(key);
      }
}


As an example, if a Properties need to be loaded in a high traffic application, it could implement the Computable interface.

public class PropertyLoader implements Computable<String, Properties> {
      private static final Logger logger = LoggerFactory.getLogger(PropertyLoader.class);

      @Override
      public Properties compute(final String filePath) throws InterruptedException {
            logger.trace("entered PropertyLoader compute()");
            return readProperties(filePath);
      }

      private Properties readProperties(final String filePath) {
            final String propFile = System.getProperty(filePath);
            final Properties properties = new Properties();
            try {
                  properties.load(new FileReader(propFile));
            } catch (IOException e) {
                  logger.error("Failed to load properties.", e);
            }
            return properties;
      }
}


The usage of the class:

private static ConcurrentResultCache<String, Properties> propertiesCache = new ConcurrentResultCache<String, Properties>(new PropertyLoader());

public Properties getClientThrottlingRateMap() {
    final Properties properties = propertiesCache.compute(propertiesLocation);


No comments:

Post a Comment