Promises.java

/*
 * The contents of this file are subject to the terms of the Common Development and
 * Distribution License (the License). You may not use this file except in compliance with the
 * License.
 *
 * You can obtain a copy of the License at legal/CDDLv1.0.txt. See the License for the
 * specific language governing permission and limitations under the License.
 *
 * When distributing Covered Software, include this CDDL Header Notice in each file and include
 * the License file at legal/CDDLv1.0.txt. If applicable, add the following below the CDDL
 * Header, with the fields enclosed by brackets [] replaced by your own identifying
 * information: "Portions copyright [year] [name of copyright owner]".
 *
 * Copyright 2015-2016 ForgeRock AS.
 */

package org.forgerock.util.promise;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import org.forgerock.util.AsyncFunction;
import org.forgerock.util.Function;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Utility methods for creating and composing {@link Promise}s.
 */
public final class Promises {
    // TODO: n-of, etc.

    /**
     * These completed promise implementations provide an optimization for
     * synchronous processing, which have been found to provide small but
     * significant benefit:
     *
     * <ul>
     *     <li>A small reduction in GC overhead (less frequent GCs = more
     *     deterministic response times)</li>
     *     <li>A reduction of the cost associated with calling then() on a
     *     pre-completed promise versus the implementation in
     *     org.forgerock.util.promise.PromiseImpl#then(Function, Function).
     *     Specifically, a reduction in two volatile accesses as well as memory
     *     again.</li>
     * </ul>
     *
     * @param <V> {@inheritDoc}
     * @param <E> {@inheritDoc}
     */
    private static abstract class CompletedPromise<V, E extends Exception> implements Promise<V, E> {

        private static final Logger LOGGER = LoggerFactory.getLogger(CompletedPromise.class);

        @Override
        public final boolean cancel(final boolean mayInterruptIfRunning) {
            return false;
        }

        @Override
        public final V get() throws ExecutionException {
            if (hasResult()) {
                return getResult();
            } else if (hasException()) {
                throw new ExecutionException(getException());
            } else {
                throw new ExecutionException(getRuntimeException());
            }
        }

        @Override
        public final V get(final long timeout, final TimeUnit unit) throws ExecutionException {
            return get();
        }

        @Override
        public final V getOrThrow() throws E {
            if (hasResult()) {
                return getResult();
            } else if (hasException()) {
                throw getException();
            } else {
                throw getRuntimeException();
            }
        }

        @Override
        public final V getOrThrow(final long timeout, final TimeUnit unit) throws E {
            return getOrThrow();
        }

        @Override
        public final V getOrThrowUninterruptibly() throws E {
            return getOrThrow();
        }

        @Override
        public final V getOrThrowUninterruptibly(final long timeout, final TimeUnit unit) throws E {
            return getOrThrow();
        }

        @Override
        public final boolean isCancelled() {
            return false;
        }

        @Override
        public final boolean isDone() {
            return true;
        }

        @Override
        public final Promise<V, E> thenOnException(final ExceptionHandler<? super E> onException) {
            if (hasException()) {
                try {
                    onException.handleException(getException());
                } catch (RuntimeException e) {
                    LOGGER.error("Ignored unexpected exception thrown by ExceptionHandler", e);
                }
            }
            return this;
        }

        @Override
        public final Promise<V, E> thenOnResult(final ResultHandler<? super V> onResult) {
            if (hasResult()) {
                try {
                    onResult.handleResult(getResult());
                } catch (RuntimeException e) {
                    LOGGER.error("Ignored unexpected exception thrown by ResultHandler", e);
                }
            }
            return this;
        }

        @Override
        public final Promise<V, E> thenOnResultOrException(final ResultHandler<? super V> onResult,
                final ExceptionHandler<? super E> onException) {
            return thenOnResult(onResult).thenOnException(onException);
        }

        @Override
        public final Promise<V, E> thenOnResultOrException(final Runnable onResultOrException) {
            if (hasResult() || hasException()) {
                try {
                    onResultOrException.run();
                } catch (RuntimeException e) {
                    LOGGER.error("Ignored unexpected exception thrown by Runnable", e);
                }
            }
            return this;
        }

        @Override
        public final <VOUT> Promise<VOUT, E> then(final Function<? super V, VOUT, E> onResult) {
            return then(onResult, Promises.<VOUT, E>exceptionIdempotentFunction());
        }

        @Override
        public <EOUT extends Exception> Promise<V, EOUT> thenCatch(Function<? super E, V, EOUT> onException) {
            return then(Promises.<V, EOUT>resultIdempotentFunction(), onException);
        }

        @Override
        public Promise<V, E> thenCatchRuntimeException(
                Function<? super RuntimeException, V, E> onRuntimeException) {
            return then(Promises.<V, E>resultIdempotentFunction(), Promises.<V, E>exceptionIdempotentFunction(),
                    onRuntimeException);
        }

        @Override
        public final <VOUT, EOUT extends Exception> Promise<VOUT, EOUT> then(
                final Function<? super V, VOUT, EOUT> onResult,
                final Function<? super E, VOUT, EOUT> onException) {
            return then(onResult, onException, Promises.<VOUT, EOUT>runtimeExceptionIdempotentFunction());
        }

        @Override
        @SuppressWarnings("unchecked")
        public final <VOUT, EOUT extends Exception> Promise<VOUT, EOUT> then(
                final Function<? super V, VOUT, EOUT> onResult,
                final Function<? super E, VOUT, EOUT> onException,
                final Function<? super RuntimeException, VOUT, EOUT> onRuntimeException) {
            try {
                if (hasResult()) {
                    return newResultPromise(onResult.apply(getResult()));
                } else if (hasException()) {
                    return newResultPromise(onException.apply(getException()));
                } else if (hasRuntimeException()) {
                    return newResultPromise(onRuntimeException.apply(getRuntimeException()));
                } else {
                    throw new IllegalStateException("Unexpected state");
                }
            } catch (final RuntimeException e) {
                return new RuntimeExceptionPromise<>(e);
            } catch (final Exception e) {
                return newExceptionPromise((EOUT) e);
            }
        }

        @Override
        public final Promise<V, E> thenAlways(final Runnable always) {
            try {
                always.run();
            } catch (RuntimeException e) {
                LOGGER.error("Ignored unexpected exception thrown by Runnable", e);
            }
            return this;
        }

        @Override
        public Promise<V, E> thenFinally(Runnable onFinally) {
            return thenAlways(onFinally);
        }

        @Override
        public final <VOUT> Promise<VOUT, E> thenAsync(
                final AsyncFunction<? super V, VOUT, E> onResult) {
            return thenAsync(onResult, Promises.<VOUT, E>exceptionIdempotentAsyncFunction());
        }

        @Override
        public final <EOUT extends Exception> Promise<V, EOUT> thenCatchAsync(
                final AsyncFunction<? super E, V, EOUT> onException) {
            return thenAsync(Promises.<V, EOUT>resultIdempotentAsyncFunction(), onException);
        }

        @Override
        public Promise<V, E> thenCatchRuntimeExceptionAsync(
                AsyncFunction<? super RuntimeException, V, E> onRuntimeException) {
            return thenAsync(Promises.<V, E>resultIdempotentAsyncFunction(),
                    Promises.<V, E>exceptionIdempotentAsyncFunction(),
                    onRuntimeException);
        }

        @Override
        public final <VOUT, EOUT extends Exception> Promise<VOUT, EOUT> thenAsync(
                final AsyncFunction<? super V, VOUT, EOUT> onResult,
                final AsyncFunction<? super E, VOUT, EOUT> onException) {
            return thenAsync(onResult, onException, Promises.<VOUT, EOUT>runtimeExceptionIdempotentAsyncFunction());
        }

        @Override
        @SuppressWarnings("unchecked")
        public final <VOUT, EOUT extends Exception> Promise<VOUT, EOUT> thenAsync(
                final AsyncFunction<? super V, VOUT, EOUT> onResult,
                final AsyncFunction<? super E, VOUT, EOUT> onException,
                final AsyncFunction<? super RuntimeException, VOUT, EOUT> onRuntimeException) {
            try {
                if (hasResult()) {
                    return (Promise<VOUT, EOUT>) onResult.apply(getResult());
                } else if (hasException()) {
                    return (Promise<VOUT, EOUT>) onException.apply(getException());
                } else if (hasRuntimeException()) {
                    return (Promise<VOUT, EOUT>) onRuntimeException.apply(getRuntimeException());
                } else {
                    throw new IllegalStateException("Unexpected state");
                }
            } catch (final RuntimeException e) {
                return new RuntimeExceptionPromise<>(e);
            } catch (final Exception e) {
                return newExceptionPromise((EOUT) e);
            }
        }

        @Override
        public Promise<V, E> thenOnRuntimeException(RuntimeExceptionHandler onRuntimeException) {
            if (getRuntimeException() != null) {
                try {
                    onRuntimeException.handleRuntimeException(getRuntimeException());
                } catch (RuntimeException e) {
                    LOGGER.error("Ignored unexpected exception thrown by RuntimeExceptionHandler", e);
                }
            }
            return this;
        }

        abstract RuntimeException getRuntimeException();

        abstract E getException();

        abstract V getResult();

        abstract boolean hasRuntimeException();

        abstract boolean hasException();

        abstract boolean hasResult();
    }

    private static final class RuntimeExceptionPromise<V, E extends Exception> extends CompletedPromise<V, E> {
        private final RuntimeException runtimeException;

        private RuntimeExceptionPromise(RuntimeException runtimeException) {
            this.runtimeException = runtimeException;
        }

        @Override
        RuntimeException getRuntimeException() {
            return runtimeException;
        }

        @Override
        E getException() {
            return null;
        }

        @Override
        V getResult() {
            return null;
        }

        @Override
        boolean hasRuntimeException() {
            return true;
        }

        @Override
        boolean hasException() {
            return false;
        }

        @Override
        boolean hasResult() {
            return false;
        }
    }

    private static final class ExceptionPromise<V, E extends Exception> extends CompletedPromise<V, E> {
        private final E exception;

        private ExceptionPromise(final E exception) {
            this.exception = exception;
        }

        @Override
        RuntimeException getRuntimeException() {
            return null;
        }

        @Override
        E getException() {
            return exception;
        }

        @Override
        V getResult() {
            throw new IllegalStateException();
        }

        @Override
        boolean hasRuntimeException() {
            return false;
        }

        @Override
        boolean hasException() {
            return true;
        }

        @Override
        boolean hasResult() {
            return false;
        }
    }

    private static final class ResultPromise<V, E extends Exception> extends
            CompletedPromise<V, E> {
        private final V value;

        private ResultPromise(final V value) {
            this.value = value;
        }

        @Override
        RuntimeException getRuntimeException() {
            return null;
        }

        @Override
        E getException() {
            throw new IllegalStateException();
        }

        @Override
        V getResult() {
            return value;
        }

        @Override
        boolean hasRuntimeException() {
            return false;
        }

        @Override
        boolean hasException() {
            return false;
        }

        @Override
        boolean hasResult() {
            return true;
        }
    }

    private static final AsyncFunction<Exception, Object, Exception> EXCEPTION_IDEM_ASYNC_FUNC =
        new AsyncFunction<Exception, Object, Exception>() {
            @Override
            public Promise<Object, Exception> apply(final Exception exception) throws Exception {
                return newExceptionPromise(exception);
            }
        };

    private static final Function<Exception, Object, Exception> EXCEPTION_IDEM_FUNC =
        new Function<Exception, Object, Exception>() {
            @Override
            public Object apply(final Exception exception) throws Exception {
                throw exception;
            }
        };

    private static final AsyncFunction<RuntimeException, Object, Exception> RUNTIME_EXCEPTION_IDEM_ASYNC_FUNC =
        new AsyncFunction<RuntimeException, Object, Exception>() {
            @Override
            public Promise<Object, Exception> apply(final RuntimeException runtimeException) throws Exception {
                return newRuntimeExceptionPromise(runtimeException);
            }
        };

    private static final Function<RuntimeException, Object, Exception> RUNTIME_EXCEPTION_IDEM_FUNC =
        new Function<RuntimeException, Object, Exception>() {
            @Override
            public Object apply(final RuntimeException runtimeException) {
                throw runtimeException;
            }
        };

    private static final AsyncFunction<Object, Object, Exception> RESULT_IDEM_ASYNC_FUNC =
        new AsyncFunction<Object, Object, Exception>() {
            @Override
            public Promise<Object, Exception> apply(final Object object) throws Exception {
                return newResultPromise(object);
            }
        };

    private static final Function<Object, Object, Exception> RESULT_IDEM_FUNC =
        new Function<Object, Object, Exception>() {
            @Override
            public Object apply(final Object value) throws Exception {
                return value;
            }
        };

    /**
     * Returns a {@link Promise} representing an asynchronous task which has
     * already failed with the provided runtime exception. Attempts to get the result will
     * immediately fail, and any listeners registered against the returned
     * promise will be immediately invoked in the same thread as the caller.
     *
     * @param <V>
     *            The type of the task's result, or {@link Void} if the task
     *            does not return anything (i.e. it only has side-effects).
     * @param <E>
     *            The type of the exception thrown by the task if it fails, or
     *            {@link NeverThrowsException} if the task cannot fail.
     * @param exception
     *            The exception indicating why the asynchronous task has failed.
     * @return A {@link Promise} representing an asynchronous task which has
     *         already failed with the provided exception.
     */
    public static <V, E extends Exception> Promise<V, E> newRuntimeExceptionPromise(final RuntimeException exception) {
        return new RuntimeExceptionPromise<>(exception);
    }

    /**
     * Returns a {@link Promise} representing an asynchronous task which has
     * already failed with the provided exception. Attempts to get the result will
     * immediately fail, and any listeners registered against the returned
     * promise will be immediately invoked in the same thread as the caller.
     *
     * @param <V>
     *            The type of the task's result, or {@link Void} if the task
     *            does not return anything (i.e. it only has side-effects).
     * @param <E>
     *            The type of the exception thrown by the task if it fails, or
     *            {@link NeverThrowsException} if the task cannot fail.
     * @param exception
     *            The exception indicating why the asynchronous task has failed.
     * @return A {@link Promise} representing an asynchronous task which has
     *         already failed with the provided exception.
     */
    public static <V, E extends Exception> Promise<V, E> newExceptionPromise(final E exception) {
        return new ExceptionPromise<>(exception);
    }

    /**
     * Returns a {@link Promise} representing an asynchronous task which has
     * already succeeded with the provided result. Attempts to get the result
     * will immediately return the result, and any listeners registered against
     * the returned promise will be immediately invoked in the same thread as
     * the caller.
     *
     * @param <V>
     *            The type of the task's result, or {@link Void} if the task
     *            does not return anything (i.e. it only has side-effects).
     * @param <E>
     *            The type of the exception thrown by the task if it fails, or
     *            {@link NeverThrowsException} if the task cannot fail.
     * @param result
     *            The result of the asynchronous task.
     * @return A {@link Promise} representing an asynchronous task which has
     *         already succeeded with the provided result.
     */
    public static <V, E extends Exception> Promise<V, E> newResultPromise(final V result) {
        return new ResultPromise<>(result);
    }

    /**
     * Returns a {@link Promise} which will be completed once all of the
     * provided promises have succeeded, or as soon as one of them fails.
     *
     * @param <V>
     *            The type of the tasks' result, or {@link Void} if the tasks do
     *            not return anything (i.e. they only has side-effects).
     * @param <E>
     *            The type of the exception thrown by the tasks if they fail, or
     *            {@link NeverThrowsException} if the tasks cannot fail.
     * @param promises
     *            The list of tasks to be combined.
     * @return A {@link Promise} which will be completed once all of the
     *         provided promises have succeeded, or as soon as one of them
     *         fails.
     */
    public static <V, E extends Exception> Promise<List<V>, E> when(
            final List<Promise<V, E>> promises) {
        final int size = promises.size();
        final AtomicInteger remaining = new AtomicInteger(size);
        final List<V> results = new ArrayList<>(size);
        final PromiseImpl<List<V>, E> composite = PromiseImpl.create();
        for (final Promise<V, E> promise : promises) {
            promise.thenOnResult(new ResultHandler<V>() {
                @Override
                public void handleResult(final V value) {
                    synchronized (results) {
                        results.add(value);
                    }
                    if (remaining.decrementAndGet() == 0) {
                        composite.handleResult(results);
                    }
                }
            })  .thenOnException(new ExceptionHandler<E>() {
                @Override
                public void handleException(final E exception) {
                    composite.handleException(exception);
                }
            })  .thenOnRuntimeException(new RuntimeExceptionHandler() {
                @Override
                public void handleRuntimeException(RuntimeException exception) {
                    composite.handleRuntimeException(exception);
                }
            });
        }
        if (promises.isEmpty()) {
            composite.handleResult(results);
        }
        return composite;
    }

    /**
     * Returns a {@link Promise} which will be completed once all of the
     * provided promises have succeeded, or as soon as one of them fails.
     *
     * @param <V>
     *            The type of the tasks' result, or {@link Void} if the tasks do
     *            not return anything (i.e. they only has side-effects).
     * @param <E>
     *            The type of the exception thrown by the tasks if they fail, or
     *            {@link NeverThrowsException} if the tasks cannot fail.
     * @param promises
     *            The list of tasks to be combined.
     * @return A {@link Promise} which will be completed once all of the
     *         provided promises have succeeded, or as soon as one of them
     *         has thrown an exception.
     */
    @SafeVarargs
    public static <V, E extends Exception> Promise<List<V>, E> when(final Promise<V, E>... promises) {
        return when(Arrays.asList(promises));
    }

    @SuppressWarnings("unchecked")
    static <VOUT, E extends Exception> AsyncFunction<E, VOUT, E> exceptionIdempotentAsyncFunction() {
        return (AsyncFunction<E, VOUT, E>) EXCEPTION_IDEM_ASYNC_FUNC;
    }

    @SuppressWarnings("unchecked")
    static <VOUT, E extends Exception> Function<E, VOUT, E> exceptionIdempotentFunction() {
        return (Function<E, VOUT, E>) EXCEPTION_IDEM_FUNC;
    }

    @SuppressWarnings("unchecked")
    static <VOUT, E extends Exception> AsyncFunction<RuntimeException, VOUT, E>
        runtimeExceptionIdempotentAsyncFunction() {
        return (AsyncFunction<RuntimeException, VOUT, E>) RUNTIME_EXCEPTION_IDEM_ASYNC_FUNC;
    }

    @SuppressWarnings("unchecked")
    static <VOUT, E extends Exception> Function<RuntimeException, VOUT, E> runtimeExceptionIdempotentFunction() {
        return (Function<RuntimeException, VOUT, E>) RUNTIME_EXCEPTION_IDEM_FUNC;
    }

    @SuppressWarnings("unchecked")
    static <V, E extends Exception> AsyncFunction<V, V, E> resultIdempotentAsyncFunction() {
        return (AsyncFunction<V, V, E>) RESULT_IDEM_ASYNC_FUNC;
    }

    @SuppressWarnings("unchecked")
    static <V, E extends Exception> Function<V, V, E> resultIdempotentFunction() {
        return (Function<V, V, E>) RESULT_IDEM_FUNC;
    }

    private Promises() {
        // Prevent instantiation.
    }
}