001/*
002 * The contents of this file are subject to the terms of the Common Development and
003 * Distribution License (the License). You may not use this file except in compliance with the
004 * License.
005 *
006 * You can obtain a copy of the License at legal/CDDLv1.0.txt. See the License for the
007 * specific language governing permission and limitations under the License.
008 *
009 * When distributing Covered Software, include this CDDL Header Notice in each file and include
010 * the License file at legal/CDDLv1.0.txt. If applicable, add the following below the CDDL
011 * Header, with the fields enclosed by brackets [] replaced by your own identifying
012 * information: "Portions copyright [year] [name of copyright owner]".
013 *
014 * Copyright 2015-2016 ForgeRock AS.
015 */
016
017package org.forgerock.util.promise;
018
019import java.util.ArrayList;
020import java.util.Arrays;
021import java.util.List;
022import java.util.concurrent.ExecutionException;
023import java.util.concurrent.TimeUnit;
024import java.util.concurrent.atomic.AtomicInteger;
025
026import org.forgerock.util.AsyncFunction;
027import org.forgerock.util.Function;
028import org.slf4j.Logger;
029import org.slf4j.LoggerFactory;
030
031/**
032 * Utility methods for creating and composing {@link Promise}s.
033 */
034public final class Promises {
035    // TODO: n-of, etc.
036
037    /**
038     * These completed promise implementations provide an optimization for
039     * synchronous processing, which have been found to provide small but
040     * significant benefit:
041     *
042     * <ul>
043     *     <li>A small reduction in GC overhead (less frequent GCs = more
044     *     deterministic response times)</li>
045     *     <li>A reduction of the cost associated with calling then() on a
046     *     pre-completed promise versus the implementation in
047     *     org.forgerock.util.promise.PromiseImpl#then(Function, Function).
048     *     Specifically, a reduction in two volatile accesses as well as memory
049     *     again.</li>
050     * </ul>
051     *
052     * @param <V> {@inheritDoc}
053     * @param <E> {@inheritDoc}
054     */
055    private static abstract class CompletedPromise<V, E extends Exception> implements Promise<V, E> {
056
057        private static final Logger LOGGER = LoggerFactory.getLogger(CompletedPromise.class);
058
059        @Override
060        public final boolean cancel(final boolean mayInterruptIfRunning) {
061            return false;
062        }
063
064        @Override
065        public final V get() throws ExecutionException {
066            if (hasResult()) {
067                return getResult();
068            } else if (hasException()) {
069                throw new ExecutionException(getException());
070            } else {
071                throw new ExecutionException(getRuntimeException());
072            }
073        }
074
075        @Override
076        public final V get(final long timeout, final TimeUnit unit) throws ExecutionException {
077            return get();
078        }
079
080        @Override
081        public final V getOrThrow() throws E {
082            if (hasResult()) {
083                return getResult();
084            } else if (hasException()) {
085                throw getException();
086            } else {
087                throw getRuntimeException();
088            }
089        }
090
091        @Override
092        public final V getOrThrow(final long timeout, final TimeUnit unit) throws E {
093            return getOrThrow();
094        }
095
096        @Override
097        public final V getOrThrowUninterruptibly() throws E {
098            return getOrThrow();
099        }
100
101        @Override
102        public final V getOrThrowUninterruptibly(final long timeout, final TimeUnit unit) throws E {
103            return getOrThrow();
104        }
105
106        @Override
107        public final boolean isCancelled() {
108            return false;
109        }
110
111        @Override
112        public final boolean isDone() {
113            return true;
114        }
115
116        @Override
117        public final Promise<V, E> thenOnException(final ExceptionHandler<? super E> onException) {
118            if (hasException()) {
119                try {
120                    onException.handleException(getException());
121                } catch (RuntimeException e) {
122                    LOGGER.error("Ignored unexpected exception thrown by ExceptionHandler", e);
123                }
124            }
125            return this;
126        }
127
128        @Override
129        public final Promise<V, E> thenOnResult(final ResultHandler<? super V> onResult) {
130            if (hasResult()) {
131                try {
132                    onResult.handleResult(getResult());
133                } catch (RuntimeException e) {
134                    LOGGER.error("Ignored unexpected exception thrown by ResultHandler", e);
135                }
136            }
137            return this;
138        }
139
140        @Override
141        public final Promise<V, E> thenOnResultOrException(final ResultHandler<? super V> onResult,
142                final ExceptionHandler<? super E> onException) {
143            return thenOnResult(onResult).thenOnException(onException);
144        }
145
146        @Override
147        public final Promise<V, E> thenOnResultOrException(final Runnable onResultOrException) {
148            if (hasResult() || hasException()) {
149                try {
150                    onResultOrException.run();
151                } catch (RuntimeException e) {
152                    LOGGER.error("Ignored unexpected exception thrown by Runnable", e);
153                }
154            }
155            return this;
156        }
157
158        @Override
159        public final <VOUT> Promise<VOUT, E> then(final Function<? super V, VOUT, E> onResult) {
160            return then(onResult, Promises.<VOUT, E>exceptionIdempotentFunction());
161        }
162
163        @Override
164        public <EOUT extends Exception> Promise<V, EOUT> thenCatch(Function<? super E, V, EOUT> onException) {
165            return then(Promises.<V, EOUT>resultIdempotentFunction(), onException);
166        }
167
168        @Override
169        public Promise<V, E> thenCatchRuntimeException(
170                Function<? super RuntimeException, V, E> onRuntimeException) {
171            return then(Promises.<V, E>resultIdempotentFunction(), Promises.<V, E>exceptionIdempotentFunction(),
172                    onRuntimeException);
173        }
174
175        @Override
176        public final <VOUT, EOUT extends Exception> Promise<VOUT, EOUT> then(
177                final Function<? super V, VOUT, EOUT> onResult,
178                final Function<? super E, VOUT, EOUT> onException) {
179            return then(onResult, onException, Promises.<VOUT, EOUT>runtimeExceptionIdempotentFunction());
180        }
181
182        @Override
183        @SuppressWarnings("unchecked")
184        public final <VOUT, EOUT extends Exception> Promise<VOUT, EOUT> then(
185                final Function<? super V, VOUT, EOUT> onResult,
186                final Function<? super E, VOUT, EOUT> onException,
187                final Function<? super RuntimeException, VOUT, EOUT> onRuntimeException) {
188            try {
189                if (hasResult()) {
190                    return newResultPromise(onResult.apply(getResult()));
191                } else if (hasException()) {
192                    return newResultPromise(onException.apply(getException()));
193                } else if (hasRuntimeException()) {
194                    return newResultPromise(onRuntimeException.apply(getRuntimeException()));
195                } else {
196                    throw new IllegalStateException("Unexpected state");
197                }
198            } catch (final RuntimeException e) {
199                return new RuntimeExceptionPromise<>(e);
200            } catch (final Exception e) {
201                return newExceptionPromise((EOUT) e);
202            }
203        }
204
205        @Override
206        public final Promise<V, E> thenAlways(final Runnable always) {
207            try {
208                always.run();
209            } catch (RuntimeException e) {
210                LOGGER.error("Ignored unexpected exception thrown by Runnable", e);
211            }
212            return this;
213        }
214
215        @Override
216        public Promise<V, E> thenFinally(Runnable onFinally) {
217            return thenAlways(onFinally);
218        }
219
220        @Override
221        public final <VOUT> Promise<VOUT, E> thenAsync(
222                final AsyncFunction<? super V, VOUT, E> onResult) {
223            return thenAsync(onResult, Promises.<VOUT, E>exceptionIdempotentAsyncFunction());
224        }
225
226        @Override
227        public final <EOUT extends Exception> Promise<V, EOUT> thenCatchAsync(
228                final AsyncFunction<? super E, V, EOUT> onException) {
229            return thenAsync(Promises.<V, EOUT>resultIdempotentAsyncFunction(), onException);
230        }
231
232        @Override
233        public Promise<V, E> thenCatchRuntimeExceptionAsync(
234                AsyncFunction<? super RuntimeException, V, E> onRuntimeException) {
235            return thenAsync(Promises.<V, E>resultIdempotentAsyncFunction(),
236                    Promises.<V, E>exceptionIdempotentAsyncFunction(),
237                    onRuntimeException);
238        }
239
240        @Override
241        public final <VOUT, EOUT extends Exception> Promise<VOUT, EOUT> thenAsync(
242                final AsyncFunction<? super V, VOUT, EOUT> onResult,
243                final AsyncFunction<? super E, VOUT, EOUT> onException) {
244            return thenAsync(onResult, onException, Promises.<VOUT, EOUT>runtimeExceptionIdempotentAsyncFunction());
245        }
246
247        @Override
248        @SuppressWarnings("unchecked")
249        public final <VOUT, EOUT extends Exception> Promise<VOUT, EOUT> thenAsync(
250                final AsyncFunction<? super V, VOUT, EOUT> onResult,
251                final AsyncFunction<? super E, VOUT, EOUT> onException,
252                final AsyncFunction<? super RuntimeException, VOUT, EOUT> onRuntimeException) {
253            try {
254                if (hasResult()) {
255                    return (Promise<VOUT, EOUT>) onResult.apply(getResult());
256                } else if (hasException()) {
257                    return (Promise<VOUT, EOUT>) onException.apply(getException());
258                } else if (hasRuntimeException()) {
259                    return (Promise<VOUT, EOUT>) onRuntimeException.apply(getRuntimeException());
260                } else {
261                    throw new IllegalStateException("Unexpected state");
262                }
263            } catch (final RuntimeException e) {
264                return new RuntimeExceptionPromise<>(e);
265            } catch (final Exception e) {
266                return newExceptionPromise((EOUT) e);
267            }
268        }
269
270        @Override
271        public Promise<V, E> thenOnRuntimeException(RuntimeExceptionHandler onRuntimeException) {
272            if (getRuntimeException() != null) {
273                try {
274                    onRuntimeException.handleRuntimeException(getRuntimeException());
275                } catch (RuntimeException e) {
276                    LOGGER.error("Ignored unexpected exception thrown by RuntimeExceptionHandler", e);
277                }
278            }
279            return this;
280        }
281
282        abstract RuntimeException getRuntimeException();
283
284        abstract E getException();
285
286        abstract V getResult();
287
288        abstract boolean hasRuntimeException();
289
290        abstract boolean hasException();
291
292        abstract boolean hasResult();
293    }
294
295    private static final class RuntimeExceptionPromise<V, E extends Exception> extends CompletedPromise<V, E> {
296        private final RuntimeException runtimeException;
297
298        private RuntimeExceptionPromise(RuntimeException runtimeException) {
299            this.runtimeException = runtimeException;
300        }
301
302        @Override
303        RuntimeException getRuntimeException() {
304            return runtimeException;
305        }
306
307        @Override
308        E getException() {
309            return null;
310        }
311
312        @Override
313        V getResult() {
314            return null;
315        }
316
317        @Override
318        boolean hasRuntimeException() {
319            return true;
320        }
321
322        @Override
323        boolean hasException() {
324            return false;
325        }
326
327        @Override
328        boolean hasResult() {
329            return false;
330        }
331    }
332
333    private static final class ExceptionPromise<V, E extends Exception> extends CompletedPromise<V, E> {
334        private final E exception;
335
336        private ExceptionPromise(final E exception) {
337            this.exception = exception;
338        }
339
340        @Override
341        RuntimeException getRuntimeException() {
342            return null;
343        }
344
345        @Override
346        E getException() {
347            return exception;
348        }
349
350        @Override
351        V getResult() {
352            throw new IllegalStateException();
353        }
354
355        @Override
356        boolean hasRuntimeException() {
357            return false;
358        }
359
360        @Override
361        boolean hasException() {
362            return true;
363        }
364
365        @Override
366        boolean hasResult() {
367            return false;
368        }
369    }
370
371    private static final class ResultPromise<V, E extends Exception> extends
372            CompletedPromise<V, E> {
373        private final V value;
374
375        private ResultPromise(final V value) {
376            this.value = value;
377        }
378
379        @Override
380        RuntimeException getRuntimeException() {
381            return null;
382        }
383
384        @Override
385        E getException() {
386            throw new IllegalStateException();
387        }
388
389        @Override
390        V getResult() {
391            return value;
392        }
393
394        @Override
395        boolean hasRuntimeException() {
396            return false;
397        }
398
399        @Override
400        boolean hasException() {
401            return false;
402        }
403
404        @Override
405        boolean hasResult() {
406            return true;
407        }
408    }
409
410    private static final AsyncFunction<Exception, Object, Exception> EXCEPTION_IDEM_ASYNC_FUNC =
411        new AsyncFunction<Exception, Object, Exception>() {
412            @Override
413            public Promise<Object, Exception> apply(final Exception exception) throws Exception {
414                return newExceptionPromise(exception);
415            }
416        };
417
418    private static final Function<Exception, Object, Exception> EXCEPTION_IDEM_FUNC =
419        new Function<Exception, Object, Exception>() {
420            @Override
421            public Object apply(final Exception exception) throws Exception {
422                throw exception;
423            }
424        };
425
426    private static final AsyncFunction<RuntimeException, Object, Exception> RUNTIME_EXCEPTION_IDEM_ASYNC_FUNC =
427        new AsyncFunction<RuntimeException, Object, Exception>() {
428            @Override
429            public Promise<Object, Exception> apply(final RuntimeException runtimeException) throws Exception {
430                return newRuntimeExceptionPromise(runtimeException);
431            }
432        };
433
434    private static final Function<RuntimeException, Object, Exception> RUNTIME_EXCEPTION_IDEM_FUNC =
435        new Function<RuntimeException, Object, Exception>() {
436            @Override
437            public Object apply(final RuntimeException runtimeException) {
438                throw runtimeException;
439            }
440        };
441
442    private static final AsyncFunction<Object, Object, Exception> RESULT_IDEM_ASYNC_FUNC =
443        new AsyncFunction<Object, Object, Exception>() {
444            @Override
445            public Promise<Object, Exception> apply(final Object object) throws Exception {
446                return newResultPromise(object);
447            }
448        };
449
450    private static final Function<Object, Object, Exception> RESULT_IDEM_FUNC =
451        new Function<Object, Object, Exception>() {
452            @Override
453            public Object apply(final Object value) throws Exception {
454                return value;
455            }
456        };
457
458    /**
459     * Returns a {@link Promise} representing an asynchronous task which has
460     * already failed with the provided runtime exception. Attempts to get the result will
461     * immediately fail, and any listeners registered against the returned
462     * promise will be immediately invoked in the same thread as the caller.
463     *
464     * @param <V>
465     *            The type of the task's result, or {@link Void} if the task
466     *            does not return anything (i.e. it only has side-effects).
467     * @param <E>
468     *            The type of the exception thrown by the task if it fails, or
469     *            {@link NeverThrowsException} if the task cannot fail.
470     * @param exception
471     *            The exception indicating why the asynchronous task has failed.
472     * @return A {@link Promise} representing an asynchronous task which has
473     *         already failed with the provided exception.
474     */
475    public static <V, E extends Exception> Promise<V, E> newRuntimeExceptionPromise(final RuntimeException exception) {
476        return new RuntimeExceptionPromise<>(exception);
477    }
478
479    /**
480     * Returns a {@link Promise} representing an asynchronous task which has
481     * already failed with the provided exception. Attempts to get the result will
482     * immediately fail, and any listeners registered against the returned
483     * promise will be immediately invoked in the same thread as the caller.
484     *
485     * @param <V>
486     *            The type of the task's result, or {@link Void} if the task
487     *            does not return anything (i.e. it only has side-effects).
488     * @param <E>
489     *            The type of the exception thrown by the task if it fails, or
490     *            {@link NeverThrowsException} if the task cannot fail.
491     * @param exception
492     *            The exception indicating why the asynchronous task has failed.
493     * @return A {@link Promise} representing an asynchronous task which has
494     *         already failed with the provided exception.
495     */
496    public static <V, E extends Exception> Promise<V, E> newExceptionPromise(final E exception) {
497        return new ExceptionPromise<>(exception);
498    }
499
500    /**
501     * Returns a {@link Promise} representing an asynchronous task which has
502     * already succeeded with the provided result. Attempts to get the result
503     * will immediately return the result, and any listeners registered against
504     * the returned promise will be immediately invoked in the same thread as
505     * the caller.
506     *
507     * @param <V>
508     *            The type of the task's result, or {@link Void} if the task
509     *            does not return anything (i.e. it only has side-effects).
510     * @param <E>
511     *            The type of the exception thrown by the task if it fails, or
512     *            {@link NeverThrowsException} if the task cannot fail.
513     * @param result
514     *            The result of the asynchronous task.
515     * @return A {@link Promise} representing an asynchronous task which has
516     *         already succeeded with the provided result.
517     */
518    public static <V, E extends Exception> Promise<V, E> newResultPromise(final V result) {
519        return new ResultPromise<>(result);
520    }
521
522    /**
523     * Returns a {@link Promise} which will be completed once all of the
524     * provided promises have succeeded, or as soon as one of them fails.
525     *
526     * @param <V>
527     *            The type of the tasks' result, or {@link Void} if the tasks do
528     *            not return anything (i.e. they only has side-effects).
529     * @param <E>
530     *            The type of the exception thrown by the tasks if they fail, or
531     *            {@link NeverThrowsException} if the tasks cannot fail.
532     * @param promises
533     *            The list of tasks to be combined.
534     * @return A {@link Promise} which will be completed once all of the
535     *         provided promises have succeeded, or as soon as one of them
536     *         fails.
537     */
538    public static <V, E extends Exception> Promise<List<V>, E> when(
539            final List<Promise<V, E>> promises) {
540        final int size = promises.size();
541        final AtomicInteger remaining = new AtomicInteger(size);
542        final List<V> results = new ArrayList<>(size);
543        final PromiseImpl<List<V>, E> composite = PromiseImpl.create();
544        for (final Promise<V, E> promise : promises) {
545            promise.thenOnResult(new ResultHandler<V>() {
546                @Override
547                public void handleResult(final V value) {
548                    synchronized (results) {
549                        results.add(value);
550                    }
551                    if (remaining.decrementAndGet() == 0) {
552                        composite.handleResult(results);
553                    }
554                }
555            })  .thenOnException(new ExceptionHandler<E>() {
556                @Override
557                public void handleException(final E exception) {
558                    composite.handleException(exception);
559                }
560            })  .thenOnRuntimeException(new RuntimeExceptionHandler() {
561                @Override
562                public void handleRuntimeException(RuntimeException exception) {
563                    composite.handleRuntimeException(exception);
564                }
565            });
566        }
567        if (promises.isEmpty()) {
568            composite.handleResult(results);
569        }
570        return composite;
571    }
572
573    /**
574     * Returns a {@link Promise} which will be completed once all of the
575     * provided promises have succeeded, or as soon as one of them fails.
576     *
577     * @param <V>
578     *            The type of the tasks' result, or {@link Void} if the tasks do
579     *            not return anything (i.e. they only has side-effects).
580     * @param <E>
581     *            The type of the exception thrown by the tasks if they fail, or
582     *            {@link NeverThrowsException} if the tasks cannot fail.
583     * @param promises
584     *            The list of tasks to be combined.
585     * @return A {@link Promise} which will be completed once all of the
586     *         provided promises have succeeded, or as soon as one of them
587     *         has thrown an exception.
588     */
589    @SafeVarargs
590    public static <V, E extends Exception> Promise<List<V>, E> when(final Promise<V, E>... promises) {
591        return when(Arrays.asList(promises));
592    }
593
594    @SuppressWarnings("unchecked")
595    static <VOUT, E extends Exception> AsyncFunction<E, VOUT, E> exceptionIdempotentAsyncFunction() {
596        return (AsyncFunction<E, VOUT, E>) EXCEPTION_IDEM_ASYNC_FUNC;
597    }
598
599    @SuppressWarnings("unchecked")
600    static <VOUT, E extends Exception> Function<E, VOUT, E> exceptionIdempotentFunction() {
601        return (Function<E, VOUT, E>) EXCEPTION_IDEM_FUNC;
602    }
603
604    @SuppressWarnings("unchecked")
605    static <VOUT, E extends Exception> AsyncFunction<RuntimeException, VOUT, E>
606        runtimeExceptionIdempotentAsyncFunction() {
607        return (AsyncFunction<RuntimeException, VOUT, E>) RUNTIME_EXCEPTION_IDEM_ASYNC_FUNC;
608    }
609
610    @SuppressWarnings("unchecked")
611    static <VOUT, E extends Exception> Function<RuntimeException, VOUT, E> runtimeExceptionIdempotentFunction() {
612        return (Function<RuntimeException, VOUT, E>) RUNTIME_EXCEPTION_IDEM_FUNC;
613    }
614
615    @SuppressWarnings("unchecked")
616    static <V, E extends Exception> AsyncFunction<V, V, E> resultIdempotentAsyncFunction() {
617        return (AsyncFunction<V, V, E>) RESULT_IDEM_ASYNC_FUNC;
618    }
619
620    @SuppressWarnings("unchecked")
621    static <V, E extends Exception> Function<V, V, E> resultIdempotentFunction() {
622        return (Function<V, V, E>) RESULT_IDEM_FUNC;
623    }
624
625    private Promises() {
626        // Prevent instantiation.
627    }
628}