PerItemEvictionStrategyCache.java

/*
 * The contents of this file are subject to the terms of the Common Development and
 * Distribution License (the License). You may not use this file except in compliance with the
 * License.
 *
 * You can obtain a copy of the License at legal/CDDLv1.0.txt. See the License for the
 * specific language governing permission and limitations under the License.
 *
 * When distributing Covered Software, include this CDDL Header Notice in each file and include
 * the License file at legal/CDDLv1.0.txt. If applicable, add the following below the CDDL
 * Header, with the fields enclosed by brackets [] replaced by your own identifying
 * information: "Portions copyright [year] [name of copyright owner]".
 *
 * Copyright 2014-2016 ForgeRock AS.
 */

package org.forgerock.util;

import static org.forgerock.util.Reject.checkNotNull;
import static org.forgerock.util.promise.Promises.newResultPromise;

import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;

import org.forgerock.util.promise.Promise;
import org.forgerock.util.promise.ResultHandler;
import org.forgerock.util.time.Duration;

/**
 * PerItemEvictionStrategyCache is a thread-safe write-through cache.
 * <p>
 * Instead of storing directly the value in the backing Map, it requires the
 * consumer to provide a value factory (a Callable). A new FutureTask
 * encapsulates the callable, is executed and is placed inside a
 * ConcurrentHashMap if absent.
 * <p>
 * The final behavior is that, even if two concurrent Threads are borrowing an
 * object from the cache, given that they provide an equivalent value factory,
 * the first one will compute the value while the other will get the result from
 * the Future (and will wait until the result is computed or a timeout occurs).
 *
 * @param <K>
 *         Type of the key
 * @param <V>
 *         Type of the value
 */
public class PerItemEvictionStrategyCache<K, V> {

    // @Checkstyle:off (automatic formatting to 16 but Checkstyle expects 8 or 12)
    private static final Function<Exception, Duration, Exception> ON_EXCEPTION_NO_TIMEOUT =
            new Function<Exception, Duration, Exception>() {
                @Override
                public Duration apply(Exception e) throws Exception {
                    return Duration.ZERO;
                }
            };
    // @Checkstyle:on

    private final ScheduledExecutorService executorService;
    private final ConcurrentMap<K, CacheEntry<V>> cache = new ConcurrentHashMap<>();
    private final AsyncFunction<V, Duration, Exception> defaultTimeoutFunction;
    private Duration maxTimeout;

    /**
     * Build a new {@link PerItemEvictionStrategyCache} using the given scheduled executor.
     *
     * @param executorService
     *         scheduled executor for registering expiration callbacks.
     * @param defaultTimeout
     *         the default cache entry timeout
     */
    public PerItemEvictionStrategyCache(final ScheduledExecutorService executorService, final Duration defaultTimeout) {
        this(executorService, new AsyncFunction<V, Duration, Exception>() {
            @Override
            public Promise<Duration, Exception> apply(V value) {
                return newResultPromise(defaultTimeout);
            }
        });
    }

    /**
     * Build a new {@link PerItemEvictionStrategyCache} using the given scheduled executor.
     *
     * @param executorService
     *         scheduled executor for registering expiration callbacks.
     * @param defaultTimeoutFunction
     *         the function that will compute the cache entry timeout (must not be {@literal null})
     *         the default timeout to cache the entries
     */
    public PerItemEvictionStrategyCache(final ScheduledExecutorService executorService,
            final AsyncFunction<V, Duration, Exception> defaultTimeoutFunction) {
        this.executorService = checkNotNull(executorService);
        this.defaultTimeoutFunction = checkNotNull(defaultTimeoutFunction);
    }

    /**
     * Borrow (and create before hand if absent) a cache entry. If another
     * Thread has created (or the creation is undergoing) the value, this
     * method waits indefinitely for the value to be available.
     *
     * @param key
     *         entry key
     * @param callable
     *         cached value factory
     * @return the cached value
     * @throws InterruptedException
     *         if the current thread was interrupted while waiting
     * @throws ExecutionException
     *         if the cached value computation threw an exception
     */
    public V getValue(final K key, final Callable<V> callable) throws InterruptedException,
            ExecutionException {
        return getValue(key, callable, defaultTimeoutFunction);
    }

    /**
     * Borrow (and create before hand if absent) a cache entry. If another
     * Thread has created (or the creation is undergoing) the value, this
     * method waits indefinitely for the value to be available.
     *
     * @param key
     *         entry key
     * @param callable
     *         cached value factory
     * @param expire
     *         function to override the global cache's timeout
     * @return the cached value
     * @throws InterruptedException
     *         if the current thread was interrupted while waiting
     * @throws ExecutionException
     *         if the cached value computation threw an exception
     */
    public V getValue(final K key, final Callable<V> callable, final AsyncFunction<V, Duration, Exception> expire)
            throws InterruptedException, ExecutionException {
        try {
            return createIfAbsent(key, callable, expire).get();
        } catch (InterruptedException | RuntimeException | ExecutionException e) {
            evict(key);
            throw e;
        }
    }

    private Future<V> createIfAbsent(final K key, final Callable<V> callable,
            final AsyncFunction<V, Duration, Exception> timeoutFunction)
            throws InterruptedException, ExecutionException {
        // See the javadoc of the class for the intent of the Future and FutureTask.
        CacheEntry<V> cacheEntry = cache.get(key);
        if (cacheEntry == null) {
            // First call: no value cached for that key
            final FutureTask<V> futureTask = new FutureTask<>(callable);
            final CacheEntry<V> futureCacheEntry = new CacheEntry<>(futureTask);
            cacheEntry = cache.putIfAbsent(key, futureCacheEntry);
            if (cacheEntry == null) {
                // after the double check, it seems we are still the first to want to cache that value.
                cacheEntry = futureCacheEntry;

                // Compute the value
                futureTask.run();

                scheduleEviction(key, futureCacheEntry, timeoutFunction);
            }
        }
        return cacheEntry.getFutureTask();
    }

    private void scheduleEviction(final K key, final CacheEntry<V> cacheEntry,
            final AsyncFunction<V, Duration, Exception> timeoutFunction)
            throws ExecutionException, InterruptedException {
        newResultPromise(cacheEntry.getFutureTask().get())
                .thenAsync(timeoutFunction)
                .thenCatch(ON_EXCEPTION_NO_TIMEOUT)
                .thenCatchRuntimeException(ON_EXCEPTION_NO_TIMEOUT)
                .thenOnResult(new ResultHandler<Duration>() {
                    @Override
                    public void handleResult(Duration timeout) {
                        Runnable eviction = new Runnable() {
                            @Override
                            public void run() {
                                // The cache can be cleared and another entry for the same key can be created
                                // before the eviction is really scheduled : so ensure that we remove the expected
                                // cache entry
                                if (cache.remove(key, cacheEntry)) {
                                    cacheEntry.cancelExpiration();
                                }
                            }
                        };

                        if (timeout == null || timeout.isZero()) {
                            // Fast path : no need to schedule, evict it now
                            // Do not do "executorService.execute(eviction);" as we have no real guarantee that it will
                            // be executed now
                            eviction.run();
                        } else {
                            // Cap the timeout if requested
                            if (maxTimeout != null) {
                                timeout = timeout.compareTo(maxTimeout) < 0 ? timeout : maxTimeout;
                            }

                            if (!timeout.isUnlimited()) {
                                // Schedule the eviction
                                ScheduledFuture<?> scheduledFuture = executorService.schedule(eviction,
                                        timeout.getValue(), timeout.getUnit());
                                cacheEntry.setScheduledHandler(scheduledFuture);
                            }
                        }
                    }
                });
    }

    /**
     * Clean-up the cache entries.
     */
    public void clear() {
        for (K key : cache.keySet()) {
            evict(key);
        }
    }

    /**
     * Returns the number of cached values.
     *
     * @return the number of cached values
     */
    public int size() {
        return cache.size();
    }

    /**
     * Returns whether this cache is empty or not.
     *
     * @return {@literal true} if the cache does not contain any values, {@literal false} otherwise.
     */
    public boolean isEmpty() {
        return cache.isEmpty();
    }

    /**
     * Evict a cached value from the cache.
     *
     * @param key
     *         the entry key
     */
    public void evict(K key) {
        CacheEntry<V> entry = cache.remove(key);
        if (entry != null) {
            entry.cancelExpiration();
        }
    }

    /**
     * Gets the maximum timeout (can be {@literal null}).
     *
     * @return the maximum timeout
     */
    public Duration getMaxTimeout() {
        return maxTimeout;
    }

    /**
     * Sets the maximum timeout. If the timeout returned by the {@literal timeoutFunction} is greater than this
     * specified maximum timeout, then the maximum timeout is used instead of the returned one to cache the entry.
     *
     * @param maxTimeout
     *         the maximum timeout to use.
     */
    public void setMaxTimeout(Duration maxTimeout) {
        this.maxTimeout = maxTimeout;
    }

    private static class CacheEntry<V> {
        private final FutureTask<V> futureTask;
        private ScheduledFuture<?> scheduledHandler;

        CacheEntry(FutureTask<V> futureTask) {
            this.futureTask = futureTask;
        }

        void setScheduledHandler(ScheduledFuture<?> scheduledHandler) {
            this.scheduledHandler = scheduledHandler;
        }

        FutureTask<V> getFutureTask() {
            return futureTask;
        }

        void cancelExpiration() {
            if (scheduledHandler != null) {
                scheduledHandler.cancel(false);
            }
        }
    }
}