001/*
002 * The contents of this file are subject to the terms of the Common Development and
003 * Distribution License (the License). You may not use this file except in compliance with the
004 * License.
005 *
006 * You can obtain a copy of the License at legal/CDDLv1.0.txt. See the License for the
007 * specific language governing permission and limitations under the License.
008 *
009 * When distributing Covered Software, include this CDDL Header Notice in each file and include
010 * the License file at legal/CDDLv1.0.txt. If applicable, add the following below the CDDL
011 * Header, with the fields enclosed by brackets [] replaced by your own identifying
012 * information: "Portions copyright [year] [name of copyright owner]".
013 *
014 * Copyright 2015-2016 ForgeRock AS.
015 */
016package org.forgerock.util.promise;
017
018import java.util.Queue;
019import java.util.concurrent.ConcurrentLinkedQueue;
020import java.util.concurrent.ExecutionException;
021import java.util.concurrent.TimeUnit;
022import java.util.concurrent.TimeoutException;
023
024import org.forgerock.util.AsyncFunction;
025import org.forgerock.util.Function;
026import org.slf4j.Logger;
027import org.slf4j.LoggerFactory;
028
029/**
030 * An implementation of {@link Promise} which can be used as is, or as the basis
031 * for more complex asynchronous behavior. A {@code PromiseImpl} must be
032 * completed by invoking one of:
033 * <ul>
034 * <li>{@link #handleResult} - marks the promise as having succeeded with the
035 * provide result
036 * <li>{@link #handleException} - marks the promise as having failed with the
037 * provided exception
038 * <li>{@link #cancel} - requests cancellation of the asynchronous task
039 * represented by the promise. Cancellation is only supported if the
040 * {@link #tryCancel(boolean)} is overridden and returns an exception.
041 * </ul>
042 *
043 * @param <V>
044 *            The type of the task's result, or {@link Void} if the task does
045 *            not return anything (i.e. it only has side-effects).
046 * @param <E>
047 *            The type of the exception thrown by the task if it fails, or
048 *            {@link NeverThrowsException} if the task cannot fail.
049 * @see Promise
050 * @see Promises
051 */
052public class PromiseImpl<V, E extends Exception> implements Promise<V, E>, ResultHandler<V>,
053        ExceptionHandler<E>, RuntimeExceptionHandler {
054    // TODO: Is using monitor based sync better than AQS?
055
056    private static final Logger LOGGER = LoggerFactory.getLogger(PromiseImpl.class);
057
058    private interface StateListener<V, E extends Exception> {
059        void handleStateChange(int newState, V result, E exception, RuntimeException runtimeException);
060    }
061
062    /**
063     * State value indicating that this promise has not completed.
064     */
065    private static final int PENDING = 0;
066
067    /**
068     * State value indicating that this promise has completed successfully
069     * (result set).
070     */
071    private static final int HAS_RESULT = 1;
072
073    /**
074     * State value indicating that this promise has failed (exception set).
075     */
076    private static final int HAS_EXCEPTION = 2;
077
078    /**
079     * State value indicating that this promise has been cancelled (exception set).
080     */
081    private static final int CANCELLED = 3;
082
083    /**
084     * State value indicating that this promise has failed with a runtime exception.
085     */
086    private static final int HAS_RUNTIME_EXCEPTION = 4;
087
088    /**
089     * Creates a new pending {@link Promise} implementation.
090     *
091     * @param <V>
092     *            The type of the task's result, or {@link Void} if the task
093     *            does not return anything (i.e. it only has side-effects).
094     * @param <E>
095     *            The type of the exception thrown by the task if it fails, or
096     *            {@link NeverThrowsException} if the task cannot fail.
097     * @return A new pending {@link Promise} implementation.
098     */
099    public static <V, E extends Exception> PromiseImpl<V, E> create() {
100        return new PromiseImpl<>();
101    }
102
103    private volatile int state = PENDING;
104    private V result = null;
105    private E exception = null;
106    private RuntimeException runtimeException = null;
107
108    private final Queue<StateListener<V, E>> listeners =
109            new ConcurrentLinkedQueue<>();
110
111    /**
112     * Creates a new pending {@link Promise} implementation. This constructor is
113     * protected to allow for sub-classing.
114     */
115    protected PromiseImpl() {
116        // No implementation.
117    }
118
119    @Override
120    public final boolean cancel(final boolean mayInterruptIfRunning) {
121        if (isDone()) {
122            // Fail-fast.
123            return false;
124        }
125        final E exception = tryCancel(mayInterruptIfRunning);
126        return exception != null && setState(CANCELLED, null, exception, null);
127    }
128
129    @Override
130    public final V get() throws InterruptedException, ExecutionException {
131        await(); // Publishes.
132        return get0();
133    }
134
135    @Override
136    public final V get(final long timeout, final TimeUnit unit) throws InterruptedException,
137            ExecutionException, TimeoutException {
138        await(timeout, unit, false); // Publishes.
139        return get0();
140    }
141
142    @Override
143    public final V getOrThrow() throws InterruptedException, E {
144        await(); // Publishes.
145        return getOrThrow0();
146    }
147
148    @Override
149    public final V getOrThrow(final long timeout, final TimeUnit unit) throws InterruptedException,
150            E, TimeoutException {
151        await(timeout, unit, false); // Publishes.
152        return getOrThrow0();
153    }
154
155    @Override
156    public final V getOrThrowUninterruptibly() throws E {
157        boolean wasInterrupted = false;
158        try {
159            while (true) {
160                try {
161                    return getOrThrow();
162                } catch (final InterruptedException e) {
163                    wasInterrupted = true;
164                }
165            }
166        } finally {
167            if (wasInterrupted) {
168                Thread.currentThread().interrupt();
169            }
170        }
171    }
172
173    @Override
174    public final V getOrThrowUninterruptibly(final long timeout, final TimeUnit unit) throws E,
175            TimeoutException {
176        try {
177            await(timeout, unit, true); // Publishes.
178        } catch (InterruptedException ignored) {
179            // Will never occur since interrupts are ignored.
180        }
181        return getOrThrow0();
182    }
183
184    /**
185     * Signals that the asynchronous task represented by this promise has
186     * failed. If the task has already completed (i.e. {@code isDone() == true})
187     * then calling this method has no effect and the provided result will be
188     * discarded.
189     *
190     * @param exception
191     *            The exception indicating why the task failed.
192     * @see #tryHandleException(Exception)
193     */
194    @Override
195    public final void handleException(final E exception) {
196        tryHandleException(exception);
197    }
198
199    @Override
200    public void handleRuntimeException(RuntimeException exception) {
201        setState(HAS_RUNTIME_EXCEPTION, null, null, exception);
202    }
203
204    /**
205     * Signals that the asynchronous task represented by this promise has
206     * succeeded. If the task has already completed (i.e.
207     * {@code isDone() == true}) then calling this method has no effect and the
208     * provided result will be discarded.
209     *
210     * @param result
211     *            The result of the asynchronous task (may be {@code null}).
212     * @see #tryHandleResult(Object)
213     */
214    @Override
215    public final void handleResult(final V result) {
216        tryHandleResult(result);
217    }
218
219    /**
220     * Attempts to signal that the asynchronous task represented by this promise
221     * has failed. If the task has already completed (i.e.
222     * {@code isDone() == true}) then calling this method has no effect and
223     * {@code false} is returned.
224     * <p>
225     * This method should be used in cases where multiple threads may
226     * concurrently attempt to complete a promise and need to release resources
227     * if the completion attempt fails. For example, an asynchronous TCP connect
228     * attempt may complete after a timeout has expired. In this case the
229     * connection should be immediately closed because it is never going to be
230     * used.
231     *
232     * @param exception
233     *            The exception indicating why the task failed.
234     * @return {@code false} if this promise has already been completed, either
235     *         due to normal termination, an exception, or cancellation (i.e.
236     *         {@code isDone() == true}).
237     * @see #handleException(Exception)
238     * @see #isDone()
239     */
240    public final boolean tryHandleException(final E exception) {
241        return setState(HAS_EXCEPTION, null, exception, null);
242    }
243
244    /**
245     * Attempts to signal that the asynchronous task represented by this promise
246     * has succeeded. If the task has already completed (i.e.
247     * {@code isDone() == true}) then calling this method has no effect and
248     * {@code false} is returned.
249     * <p>
250     * This method should be used in cases where multiple threads may
251     * concurrently attempt to complete a promise and need to release resources
252     * if the completion attempt fails. For example, an asynchronous TCP connect
253     * attempt may complete after a timeout has expired. In this case the
254     * connection should be immediately closed because it is never going to be
255     * used.
256     *
257     * @param result
258     *            The result of the asynchronous task (may be {@code null}).
259     * @return {@code false} if this promise has already been completed, either
260     *         due to normal termination, an exception, or cancellation (i.e.
261     *         {@code isDone() == true}).
262     * @see #handleResult(Object)
263     * @see #isDone()
264     */
265    public final boolean tryHandleResult(final V result) {
266        return setState(HAS_RESULT, result, null, null);
267    }
268
269    @Override
270    public final boolean isCancelled() {
271        return state == CANCELLED;
272    }
273
274    @Override
275    public final boolean isDone() {
276        return state != PENDING;
277    }
278
279    @Override
280    public final Promise<V, E> thenOnException(final ExceptionHandler<? super E> onException) {
281        addOrFireListener(new StateListener<V, E>() {
282            @Override
283            public void handleStateChange(final int newState, final V result, final E exception,
284                    final RuntimeException runtimeException) {
285                if (newState == HAS_EXCEPTION || newState == CANCELLED) {
286                    try {
287                        onException.handleException(exception);
288                    } catch (RuntimeException e) {
289                        LOGGER.error("Ignored unexpected exception thrown by ExceptionHandler", e);
290                    }
291                }
292            }
293        });
294        return this;
295    }
296
297    @Override
298    public final Promise<V, E> thenOnResult(final ResultHandler<? super V> onResult) {
299        addOrFireListener(new StateListener<V, E>() {
300            @Override
301            public void handleStateChange(final int newState, final V result, final E exception,
302                    final RuntimeException runtimeException) {
303                if (newState == HAS_RESULT) {
304                    try {
305                        onResult.handleResult(result);
306                    } catch (RuntimeException e) {
307                        LOGGER.error("Ignored unexpected exception thrown by ResultHandler", e);
308                    }
309                }
310            }
311        });
312        return this;
313    }
314
315    @Override
316    public final Promise<V, E> thenOnResultOrException(final ResultHandler<? super V> onResult,
317            final ExceptionHandler<? super E> onException) {
318        addOrFireListener(new StateListener<V, E>() {
319            @Override
320            public void handleStateChange(final int newState, final V result, final E exception,
321                    final RuntimeException runtimeException) {
322                if (newState == HAS_RESULT) {
323                    try {
324                        onResult.handleResult(result);
325                    } catch (RuntimeException e) {
326                        LOGGER.error("Ignored unexpected exception thrown by ResultHandler", e);
327                    }
328                } else if (newState == HAS_EXCEPTION || newState == CANCELLED) {
329                    try {
330                        onException.handleException(exception);
331                    } catch (RuntimeException e) {
332                        LOGGER.error("Ignored unexpected exception thrown by ExceptionHandler", e);
333                    }
334                }
335            }
336        });
337        return this;
338    }
339
340    @Override
341    public final Promise<V, E> thenOnResultOrException(final Runnable onResultOrException) {
342        addOrFireListener(new StateListener<V, E>() {
343            @Override
344            public void handleStateChange(final int newState, final V result, final E exception,
345                    final RuntimeException runtimeException) {
346                if (newState != HAS_RUNTIME_EXCEPTION) {
347                    try {
348                        onResultOrException.run();
349                    } catch (RuntimeException e) {
350                        LOGGER.error("Ignored unexpected exception thrown by Runnable", e);
351                    }
352                }
353            }
354        });
355        return this;
356    }
357
358    @Override
359    public final <VOUT> Promise<VOUT, E> then(final Function<? super V, VOUT, E> onResult) {
360        return then(onResult, Promises.<VOUT, E>exceptionIdempotentFunction());
361    }
362
363    @Override
364    public <EOUT extends Exception> Promise<V, EOUT> thenCatch(final Function<? super E, V, EOUT> onException) {
365        return then(Promises.<V, EOUT>resultIdempotentFunction(), onException);
366    }
367
368    @Override
369    public Promise<V, E> thenCatchRuntimeException(
370            Function<? super RuntimeException, V, E> onRuntimeException) {
371        return then(Promises.<V, E>resultIdempotentFunction(), Promises.<V, E>exceptionIdempotentFunction(),
372                onRuntimeException);
373    }
374
375    @Override
376    public final <VOUT, EOUT extends Exception> Promise<VOUT, EOUT> then(
377            final Function<? super V, VOUT, EOUT> onResult, final Function<? super E, VOUT, EOUT> onException) {
378        return then(onResult, onException, Promises.<VOUT, EOUT>runtimeExceptionIdempotentFunction());
379    }
380
381    @Override
382    @SuppressWarnings("unchecked")
383    public final <VOUT, EOUT extends Exception> Promise<VOUT, EOUT> then(
384            final Function<? super V, VOUT, EOUT> onResult, final Function<? super E, VOUT, EOUT> onException,
385            final Function<? super RuntimeException, VOUT, EOUT> onRuntimeException) {
386        final PromiseImpl<VOUT, EOUT> chained = new PromiseImpl<>();
387        addOrFireListener(new StateListener<V, E>() {
388            @Override
389            public void handleStateChange(final int newState, final V result, final E exception,
390                                          final RuntimeException runtimeException) {
391                try {
392                    switch (newState) {
393                        case HAS_RESULT:
394                            chained.handleResult(onResult.apply(result));
395                            break;
396                        case HAS_EXCEPTION:
397                        case CANCELLED:
398                            chained.handleResult(onException.apply(exception));
399                            break;
400                        case HAS_RUNTIME_EXCEPTION:
401                            chained.handleResult(onRuntimeException.apply(runtimeException));
402                            break;
403                        default:
404                            throw new IllegalStateException("Unexpected state : " + newState);
405                    }
406                } catch (final RuntimeException e) {
407                    tryHandlingRuntimeException(e, chained);
408                } catch (final Exception e) {
409                    chained.handleException((EOUT) e);
410                }
411            }
412        });
413        return chained;
414    }
415
416    private <VOUT, EOUT extends Exception> void tryHandlingRuntimeException(final RuntimeException runtimeException,
417            final PromiseImpl<VOUT, EOUT> chained) {
418        try {
419            chained.handleRuntimeException(runtimeException);
420        } catch (Exception ignored) {
421            LOGGER.error("Runtime exception handler threw a RuntimeException which cannot be handled!", ignored);
422        }
423    }
424
425    @Override
426    public final Promise<V, E> thenAlways(final Runnable always) {
427        addOrFireListener(new StateListener<V, E>() {
428            @Override
429            public void handleStateChange(final int newState, final V result, final E exception,
430                    final RuntimeException runtimeException) {
431                try {
432                    always.run();
433                } catch (RuntimeException e) {
434                    LOGGER.error("Ignored unexpected exception thrown by Runnable", e);
435                }
436            }
437        });
438        return this;
439    }
440
441    @Override
442    public final Promise<V, E> thenFinally(final Runnable onFinally) {
443        return thenAlways(onFinally);
444    }
445
446    @Override
447    public final <VOUT> Promise<VOUT, E> thenAsync(final AsyncFunction<? super V, VOUT, E> onResult) {
448        return thenAsync(onResult, Promises.<VOUT, E>exceptionIdempotentAsyncFunction());
449    }
450
451    @Override
452    public final <EOUT extends Exception> Promise<V, EOUT> thenCatchAsync(
453            AsyncFunction<? super E, V, EOUT> onException) {
454        return thenAsync(Promises.<V, EOUT>resultIdempotentAsyncFunction(), onException);
455    }
456
457    @Override
458    public final Promise<V, E> thenCatchRuntimeExceptionAsync(
459            AsyncFunction<? super RuntimeException, V, E> onRuntimeException) {
460        return thenAsync(Promises.<V, E>resultIdempotentAsyncFunction(),
461                Promises.<V, E>exceptionIdempotentAsyncFunction(), onRuntimeException);
462    }
463
464    @Override
465    public final <VOUT, EOUT extends Exception> Promise<VOUT, EOUT> thenAsync(
466            final AsyncFunction<? super V, VOUT, EOUT> onResult,
467            final AsyncFunction<? super E, VOUT, EOUT> onException) {
468        return thenAsync(onResult, onException, Promises.<VOUT, EOUT>runtimeExceptionIdempotentAsyncFunction());
469    }
470
471    @Override
472    public final <VOUT, EOUT extends Exception> Promise<VOUT, EOUT> thenAsync(
473            final AsyncFunction<? super V, VOUT, EOUT> onResult,
474            final AsyncFunction<? super E, VOUT, EOUT> onException,
475            final AsyncFunction<? super RuntimeException, VOUT, EOUT> onRuntimeException) {
476        final PromiseImpl<VOUT, EOUT> chained = new PromiseImpl<>();
477        addOrFireListener(new StateListener<V, E>() {
478            @Override
479            @SuppressWarnings("unchecked")
480            public void handleStateChange(final int newState, final V result, final E exception,
481                    final RuntimeException runtimeException) {
482                try {
483                    switch (newState) {
484                        case HAS_RESULT:
485                            callNestedPromise(onResult.apply(result));
486                            break;
487                        case HAS_EXCEPTION:
488                        case CANCELLED:
489                            callNestedPromise(onException.apply(exception));
490                            break;
491                        case HAS_RUNTIME_EXCEPTION:
492                            callNestedPromise(onRuntimeException.apply(runtimeException));
493                            break;
494                        default:
495                            throw new IllegalStateException("Unexpected state : " + newState);
496                    }
497                } catch (final RuntimeException e) {
498                    tryHandlingRuntimeException(e, chained);
499                } catch (final Exception e) {
500                    chained.handleException((EOUT) e);
501                }
502            }
503
504            private void callNestedPromise(Promise<? extends VOUT, ? extends EOUT> nestedPromise) {
505                nestedPromise
506                        .thenOnResult(chained)
507                        .thenOnException(chained)
508                        .thenOnRuntimeException(chained);
509            }
510        });
511        return chained;
512    }
513
514    @Override
515    public final Promise<V, E> thenOnRuntimeException(final RuntimeExceptionHandler onRuntimeException) {
516        addOrFireListener(new StateListener<V, E>() {
517            @Override
518            public void handleStateChange(int newState, V result, E exception, RuntimeException runtimeException) {
519                if (newState == HAS_RUNTIME_EXCEPTION) {
520                    try {
521                        onRuntimeException.handleRuntimeException(runtimeException);
522                    } catch (RuntimeException e) {
523                        LOGGER.error("Ignored unexpected exception thrown by RuntimeExceptionHandler", e);
524                    }
525                }
526            }
527        });
528        return this;
529    }
530
531    /**
532     * Invoked when the client attempts to cancel the asynchronous task
533     * represented by this promise. Implementations which support cancellation
534     * should override this method to cancel the asynchronous task and, if
535     * successful, return an appropriate exception which can be used to signal
536     * that the task has failed.
537     * <p>
538     * By default cancellation is not supported and this method returns
539     * {@code null}.
540     *
541     * @param mayInterruptIfRunning
542     *            {@code true} if the thread executing this task should be
543     *            interrupted; otherwise, in-progress tasks are allowed to
544     *            complete.
545     * @return {@code null} if cancellation was not supported or not possible,
546     *         otherwise an appropriate exception.
547     */
548    protected E tryCancel(final boolean mayInterruptIfRunning) {
549        return null;
550    }
551
552    private void addOrFireListener(final StateListener<V, E> listener) {
553        final int stateBefore = state;
554        if (stateBefore != PENDING) {
555            handleCompletion(listener, stateBefore);
556        } else {
557            listeners.add(listener);
558            final int stateAfter = state;
559            if (stateAfter != PENDING && listeners.remove(listener)) {
560                handleCompletion(listener, stateAfter);
561            }
562        }
563    }
564
565    private void handleCompletion(final StateListener<V, E> listener, final int completedState) {
566        try {
567            listener.handleStateChange(completedState, result, exception, runtimeException);
568        } catch (RuntimeException ignored) {
569            LOGGER.error("State change listener threw a RuntimeException which cannot be handled!", ignored);
570        }
571    }
572
573    private V get0() throws ExecutionException {
574        if (runtimeException != null) {
575            throw new ExecutionException(runtimeException);
576        } else if (exception != null) {
577            throw new ExecutionException(exception);
578        } else {
579            return result;
580        }
581    }
582
583    private V getOrThrow0() throws E {
584        if (runtimeException != null) {
585            throw runtimeException;
586        } else if (exception != null) {
587            throw exception;
588        } else {
589            return result;
590        }
591    }
592
593    private boolean setState(final int newState, final V result, final E exception,
594            final RuntimeException runtimeException) {
595        synchronized (this) {
596            if (state != PENDING) {
597                // Already completed.
598                return false;
599            }
600            this.result = result;
601            this.exception = exception;
602            this.runtimeException = runtimeException;
603            state = newState; // Publishes.
604            notifyAll(); // Wake up any blocked threads.
605        }
606        StateListener<V, E> listener;
607        while ((listener = listeners.poll()) != null) {
608            handleCompletion(listener, newState);
609        }
610        return true;
611    }
612
613    private void await() throws InterruptedException {
614        // Use double-check for fast-path.
615        if (state == PENDING) {
616            synchronized (this) {
617                while (state == PENDING) {
618                    wait();
619                }
620            }
621        }
622    }
623
624    private void await(final long timeout, final TimeUnit unit, final boolean isUninterruptibly)
625            throws InterruptedException, TimeoutException {
626        // Use double-check for fast-path.
627        if (state == PENDING) {
628            final long timeoutMS = unit.toMillis(timeout);
629            final long endTimeMS = System.currentTimeMillis() + timeoutMS;
630            boolean wasInterrupted = false;
631            try {
632                synchronized (this) {
633                    while (state == PENDING) {
634                        final long remainingTimeMS = endTimeMS - System.currentTimeMillis();
635                        if (remainingTimeMS <= 0) {
636                            throw new TimeoutException();
637                        }
638                        try {
639                            wait(remainingTimeMS);
640                        } catch (final InterruptedException e) {
641                            if (isUninterruptibly) {
642                                wasInterrupted = true;
643                            } else {
644                                throw e;
645                            }
646                        }
647                    }
648                }
649            } finally {
650                if (wasInterrupted) {
651                    Thread.currentThread().interrupt();
652                }
653            }
654        }
655    }
656}