Thursday, August 24, 2017

A Concurrent Result Cache to avoid duplicating the effort of an expensive operation

This Concurrent Result Cache is adapted from the Memoizer code sample in the book Java Concurrency in Practice by Brian Goetz.

Sometimes an expensive operation needs to be run in a multithreaded environment. The following code avoids running that operation more than once for the same input.

The interface for the expensive computation:


public interface Computable<A, V> {
         public V compute(final A arg) throws InterruptedException;
}


The concurrent result cache implementation:

This implementation guarantees that, for a given argument, the expensive computation is invoked only once while its entry stays in the cache; other threads asking for the same argument wait on the Future for the result.

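import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ConcurrentResultCache<A, V> implements Computable<A, V> {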
         private static final Logger logger = LoggerFactory.getLogger(ConcurrentResultCache.class);
         private final ConcurrentMap<A, Future<V>> cache = new ConcurrentHashMap<A, Future<V>>();
         private final Computable<A, V> c;

         public ConcurrentResultCache(final Computable<A, V> c) {
                 this.c = c;
         }

         public V compute(final A arg) throws InterruptedException {
                 logger.trace("entered compute...");
                 // Loop until a result is available; retry if a CancellationException is caught.
                 while (true) {
                          logger.trace("before first get");
                          Future<V> f = cache.get(arg);
                          logger.trace("after first get");
                          if (f == null) {
                                  logger.trace("f is null");
                                  // Creating a new task wrapped in a Callable.
                                  Callable<V> eval = new Callable<V>() {
                                           public V call() throws InterruptedException {
                                                   return c.compute(arg);
                                           }
                                  };
                                  FutureTask<V> ft = new FutureTask<V>(eval);
                                  f = cache.putIfAbsent(arg, ft);
                                  logger.trace("after putIfAbsent()");
                                  if (f == null) {
                                           // If this thread was the first to put the task into the cache,
                                           // run the task in the SAME thread.
                                           f = ft;
                                           logger.trace("before run()");
                                           ft.run();
                                  }
                          }
                          try {
                                  logger.trace("before second get");
                                  return f.get();
                          } catch (CancellationException e) {
                                  cache.remove(arg, f);
                          } catch (ExecutionException e) {
                                  throw new IllegalStateException("Task execution exception caught.", e);
                          }
                 }
         }
        
         // Evicts the cached result so that the next compute() call recomputes it.
         public void remove(final A key) {
                 this.cache.remove(key);
         }
}
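
To illustrate the guarantee described above, here is a minimal, self-contained sketch. It uses a condensed copy of the cache without logging; the class name ResultCacheDemo, the method countInvocations, the key "some-key" and the 100 ms sleep are made up for this demo. Several threads ask for the same key at the same moment, and a counter records how often the slow computation actually runs.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.atomic.AtomicInteger;

class ResultCacheDemo {

    interface Computable<A, V> {
        V compute(A arg) throws InterruptedException;
    }

    /** Condensed copy of the cache from this post, without logging. */
    static class ConcurrentResultCache<A, V> implements Computable<A, V> {
        private final ConcurrentMap<A, Future<V>> cache = new ConcurrentHashMap<>();
        private final Computable<A, V> c;

        ConcurrentResultCache(final Computable<A, V> c) {
            this.c = c;
        }

        @Override
        public V compute(final A arg) throws InterruptedException {
            while (true) {
                Future<V> f = cache.get(arg);
                if (f == null) {
                    final FutureTask<V> ft = new FutureTask<V>(() -> c.compute(arg));
                    f = cache.putIfAbsent(arg, ft);
                    if (f == null) {
                        // This thread won the race, so it runs the task itself.
                        f = ft;
                        ft.run();
                    }
                }
                try {
                    return f.get();
                } catch (CancellationException e) {
                    cache.remove(arg, f);
                } catch (ExecutionException e) {
                    throw new IllegalStateException("Task execution exception caught.", e);
                }
            }
        }
    }

    /** Starts the given number of callers for one key; returns how often the slow code ran. */
    static int countInvocations(final int threads) {
        final AtomicInteger invocations = new AtomicInteger();
        // Hypothetical slow computation: counts its calls and sleeps briefly.
        final Computable<String, Integer> slow = key -> {
            invocations.incrementAndGet();
            Thread.sleep(100);
            return key.length();
        };
        final ConcurrentResultCache<String, Integer> cache = new ConcurrentResultCache<>(slow);
        final ExecutorService pool = Executors.newFixedThreadPool(threads);
        final CountDownLatch start = new CountDownLatch(1);
        try {
            final List<Future<Integer>> results = new ArrayList<>();
            for (int i = 0; i < threads; i++) {
                results.add(pool.submit(() -> {
                    start.await(); // hold all callers until they can start together
                    return cache.compute("some-key");
                }));
            }
            start.countDown();
            for (Future<Integer> r : results) {
                r.get(); // wait for every caller to finish
            }
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdownNow();
        }
        return invocations.get();
    }

    public static void main(final String[] args) {
        // prints "slow computation ran 1 time(s)"
        System.out.println("slow computation ran " + countInvocations(8) + " time(s)");
    }
}
```

Only the thread whose putIfAbsent() call returns null runs the task; every other caller finds the same Future in the map and blocks in get() until the result is ready.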


One example of an expensive operation: loading a properties file.


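import java.io.FileReader;
import java.io.IOException;
import java.util.Properties;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class PropertyLoader implements Computable<String, Properties> {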
         private static final Logger logger = LoggerFactory.getLogger(PropertyLoader.class);

         @Override
         public Properties compute(final String filePath) throws InterruptedException {
                 logger.trace("entered PropertyLoader compute()");
                 return readProperties(filePath);
         }

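         private Properties readProperties(final String filePath) {
                 // Note: filePath is the NAME of a system property whose value is the
                 // actual path of the properties file.
                 final String propFile = System.getProperty(filePath);
                 final Properties properties = new Properties();
                 if (propFile == null) {
                          logger.error("System property {} is not set.", filePath);
                          return properties;
                 }
                 // try-with-resources closes the reader even if load() fails.
                 try (FileReader reader = new FileReader(propFile)) {
                          properties.load(reader);
                 } catch (IOException e) {
                          logger.error("Failed to load properties.", e);
                 }
                 return properties;
         }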
}


A usage example:


import java.util.Properties;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class PropertyUpdateManager {
         private static final Logger logger = LoggerFactory.getLogger(PropertyUpdateManager.class);

         private static final String ENV_PROPERTIES_FILE = "envPropertiesFile";
         private static ConcurrentResultCache<String, Properties> propertiesCache = new ConcurrentResultCache<String, Properties>(
                          new PropertyLoader());

         private static PropertyUpdateConfiguration propertyUpdateConfiguration = null;
         private static boolean cacheCleared = false;

         public static Properties getPropertyMap() {
                 if (propertyUpdateConfiguration == null || isTimeToUpdate(propertyUpdateConfiguration)) {
                          // Evict the stale entry so that the compute() call below reloads the file.
                          clearPropertiesCache(ENV_PROPERTIES_FILE);
                          cacheCleared = false;
                 }
                 try {
                          // Returns the cached properties, or loads them if the entry was just evicted.
                          return propertiesCache.compute(ENV_PROPERTIES_FILE);
                 } catch (InterruptedException e) {
                          // Restore the interrupt flag for callers further up the stack.
                          Thread.currentThread().interrupt();
                          logger.warn("properties update has been interrupted.", e);
                          return new Properties();
                 }
         }

         public static synchronized void clearPropertiesCache(final String key) {
                 if (!cacheCleared) {
                          if (propertiesCache != null) {
                                  propertiesCache.remove(key);
                          }
                          cacheCleared = true;
                 }
         }

         private static boolean isTimeToUpdate(final PropertyUpdateConfiguration original) {
                 final long currentDt = System.currentTimeMillis();
                 final long timeSinceLastUpdate = currentDt - original.getLastUpdateDt();
                 logger.debug("timeSinceLastUpdate:{}, updateDurationMs:{}", timeSinceLastUpdate,
                                  original.getUpdateDurationMs());
                 return timeSinceLastUpdate > original.getUpdateDurationMs();
         }
}
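
The refresh idea in the usage example (evict the entry once it is too old, then let the cache recompute it) can also be sketched in isolation. The version below is only one possible variation and not the code above: it stores a creation timestamp next to each FutureTask and takes an injectable clock so that expiry can be shown without real waiting. The names RefreshDemo, TimedResultCache, Entry, demo, the key "env" and the 1000 ms maximum age are all hypothetical.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
import java.util.function.LongSupplier;

class RefreshDemo {

    /** Caches one result per key and reloads it once it is older than maxAgeMs. */
    static class TimedResultCache<K, V> {
        private static final class Entry<V> {
            final FutureTask<V> task;
            final long createdMs;

            Entry(final FutureTask<V> task, final long createdMs) {
                this.task = task;
                this.createdMs = createdMs;
            }
        }

        private final ConcurrentMap<K, Entry<V>> cache = new ConcurrentHashMap<>();
        private final Function<K, V> loader;
        private final long maxAgeMs;
        private final LongSupplier clock;

        TimedResultCache(final Function<K, V> loader, final long maxAgeMs, final LongSupplier clock) {
            this.loader = loader;
            this.maxAgeMs = maxAgeMs;
            this.clock = clock;
        }

        V get(final K key) {
            final long now = clock.getAsLong();
            Entry<V> e = cache.get(key);
            if (e != null && now - e.createdMs > maxAgeMs) {
                // Two-argument remove: evicts only if the stale entry is still mapped.
                cache.remove(key, e);
                e = null;
            }
            if (e == null) {
                final Entry<V> fresh = new Entry<V>(new FutureTask<V>(() -> loader.apply(key)), now);
                e = cache.putIfAbsent(key, fresh);
                if (e == null) {
                    // This thread installed the entry, so it runs the load itself.
                    e = fresh;
                    fresh.task.run();
                }
            }
            try {
                return e.task.get();
            } catch (InterruptedException ex) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("Interrupted while waiting for the value.", ex);
            } catch (ExecutionException ex) {
                throw new IllegalStateException("Loading failed.", ex.getCause());
            }
        }
    }

    /** Returns the number of loads before and after the entry expires. */
    static int[] demo() {
        final AtomicLong fakeNow = new AtomicLong(0); // fake clock in milliseconds
        final AtomicInteger loads = new AtomicInteger();
        final TimedResultCache<String, Integer> cache = new TimedResultCache<String, Integer>(
                key -> loads.incrementAndGet(), 1000L, fakeNow::get);
        cache.get("env");      // first call loads the value
        fakeNow.set(500);
        cache.get("env");      // still fresh, served from the cache
        final int beforeExpiry = loads.get();
        fakeNow.set(1501);
        cache.get("env");      // older than 1000 ms, so it is reloaded
        return new int[] {beforeExpiry, loads.get()};
    }

    public static void main(final String[] args) {
        final int[] r = demo();
        // prints "loads before expiry: 1, after expiry: 2"
        System.out.println("loads before expiry: " + r[0] + ", after expiry: " + r[1]);
    }
}
```

Keeping the timestamp inside the cache entry means no separate flag or configuration object has to be kept in sync with the cache, at the cost of a small wrapper object per key.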