View Javadoc
1   /*
2    * The contents of this file are subject to the terms of the Common Development and
3    * Distribution License (the License). You may not use this file except in compliance with the
4    * License.
5    *
6    * You can obtain a copy of the License at legal/CDDLv1.0.txt. See the License for the
7    * specific language governing permission and limitations under the License.
8    *
9    * When distributing Covered Software, include this CDDL Header Notice in each file and include
10   * the License file at legal/CDDLv1.0.txt. If applicable, add the following below the CDDL
11   * Header, with the fields enclosed by brackets [] replaced by your own identifying
12   * information: "Portions copyright [year] [name of copyright owner]".
13   *
14   * Copyright 2015-2016 ForgeRock AS.
15   */
16  package org.forgerock.util.promise;
17  
18  import java.util.Queue;
19  import java.util.concurrent.ConcurrentLinkedQueue;
20  import java.util.concurrent.ExecutionException;
21  import java.util.concurrent.TimeUnit;
22  import java.util.concurrent.TimeoutException;
23  
24  import org.forgerock.util.AsyncFunction;
25  import org.forgerock.util.Function;
26  import org.slf4j.Logger;
27  import org.slf4j.LoggerFactory;
28  
29  /**
30   * An implementation of {@link Promise} which can be used as is, or as the basis
31   * for more complex asynchronous behavior. A {@code PromiseImpl} must be
32   * completed by invoking one of:
33   * <ul>
34   * <li>{@link #handleResult} - marks the promise as having succeeded with the
35   * provide result
36   * <li>{@link #handleException} - marks the promise as having failed with the
37   * provided exception
38   * <li>{@link #cancel} - requests cancellation of the asynchronous task
39   * represented by the promise. Cancellation is only supported if the
40   * {@link #tryCancel(boolean)} is overridden and returns an exception.
41   * </ul>
42   *
43   * @param <V>
44   *            The type of the task's result, or {@link Void} if the task does
45   *            not return anything (i.e. it only has side-effects).
46   * @param <E>
47   *            The type of the exception thrown by the task if it fails, or
48   *            {@link NeverThrowsException} if the task cannot fail.
49   * @see Promise
50   * @see Promises
51   */
52  public class PromiseImpl<V, E extends Exception> implements Promise<V, E>, ResultHandler<V>,
53          ExceptionHandler<E>, RuntimeExceptionHandler {
54      // TODO: Is using monitor based sync better than AQS?
55  
56      private static final Logger LOGGER = LoggerFactory.getLogger(PromiseImpl.class);
57  
58      private interface StateListener<V, E extends Exception> {
59          void handleStateChange(int newState, V result, E exception, RuntimeException runtimeException);
60      }
61  
62      /**
63       * State value indicating that this promise has not completed.
64       */
65      private static final int PENDING = 0;
66  
67      /**
68       * State value indicating that this promise has completed successfully
69       * (result set).
70       */
71      private static final int HAS_RESULT = 1;
72  
73      /**
74       * State value indicating that this promise has failed (exception set).
75       */
76      private static final int HAS_EXCEPTION = 2;
77  
78      /**
79       * State value indicating that this promise has been cancelled (exception set).
80       */
81      private static final int CANCELLED = 3;
82  
83      /**
84       * State value indicating that this promise has failed with a runtime exception.
85       */
86      private static final int HAS_RUNTIME_EXCEPTION = 4;
87  
88      /**
89       * Creates a new pending {@link Promise} implementation.
90       *
91       * @param <V>
92       *            The type of the task's result, or {@link Void} if the task
93       *            does not return anything (i.e. it only has side-effects).
94       * @param <E>
95       *            The type of the exception thrown by the task if it fails, or
96       *            {@link NeverThrowsException} if the task cannot fail.
97       * @return A new pending {@link Promise} implementation.
98       */
99      public static <V, E extends Exception> PromiseImpl<V, E> create() {
100         return new PromiseImpl<>();
101     }
102 
103     private volatile int state = PENDING;
104     private V result = null;
105     private E exception = null;
106     private RuntimeException runtimeException = null;
107 
108     private final Queue<StateListener<V, E>> listeners =
109             new ConcurrentLinkedQueue<>();
110 
111     /**
112      * Creates a new pending {@link Promise} implementation. This constructor is
113      * protected to allow for sub-classing.
114      */
115     protected PromiseImpl() {
116         // No implementation.
117     }
118 
119     @Override
120     public final boolean cancel(final boolean mayInterruptIfRunning) {
121         if (isDone()) {
122             // Fail-fast.
123             return false;
124         }
125         final E exception = tryCancel(mayInterruptIfRunning);
126         return exception != null && setState(CANCELLED, null, exception, null);
127     }
128 
129     @Override
130     public final V get() throws InterruptedException, ExecutionException {
131         await(); // Publishes.
132         return get0();
133     }
134 
135     @Override
136     public final V get(final long timeout, final TimeUnit unit) throws InterruptedException,
137             ExecutionException, TimeoutException {
138         await(timeout, unit, false); // Publishes.
139         return get0();
140     }
141 
142     @Override
143     public final V getOrThrow() throws InterruptedException, E {
144         await(); // Publishes.
145         return getOrThrow0();
146     }
147 
148     @Override
149     public final V getOrThrow(final long timeout, final TimeUnit unit) throws InterruptedException,
150             E, TimeoutException {
151         await(timeout, unit, false); // Publishes.
152         return getOrThrow0();
153     }
154 
155     @Override
156     public final V getOrThrowUninterruptibly() throws E {
157         boolean wasInterrupted = false;
158         try {
159             while (true) {
160                 try {
161                     return getOrThrow();
162                 } catch (final InterruptedException e) {
163                     wasInterrupted = true;
164                 }
165             }
166         } finally {
167             if (wasInterrupted) {
168                 Thread.currentThread().interrupt();
169             }
170         }
171     }
172 
173     @Override
174     public final V getOrThrowUninterruptibly(final long timeout, final TimeUnit unit) throws E,
175             TimeoutException {
176         try {
177             await(timeout, unit, true); // Publishes.
178         } catch (InterruptedException ignored) {
179             // Will never occur since interrupts are ignored.
180         }
181         return getOrThrow0();
182     }
183 
184     /**
185      * Signals that the asynchronous task represented by this promise has
186      * failed. If the task has already completed (i.e. {@code isDone() == true})
187      * then calling this method has no effect and the provided result will be
188      * discarded.
189      *
190      * @param exception
191      *            The exception indicating why the task failed.
192      * @see #tryHandleException(Exception)
193      */
194     @Override
195     public final void handleException(final E exception) {
196         tryHandleException(exception);
197     }
198 
199     @Override
200     public void handleRuntimeException(RuntimeException exception) {
201         setState(HAS_RUNTIME_EXCEPTION, null, null, exception);
202     }
203 
204     /**
205      * Signals that the asynchronous task represented by this promise has
206      * succeeded. If the task has already completed (i.e.
207      * {@code isDone() == true}) then calling this method has no effect and the
208      * provided result will be discarded.
209      *
210      * @param result
211      *            The result of the asynchronous task (may be {@code null}).
212      * @see #tryHandleResult(Object)
213      */
214     @Override
215     public final void handleResult(final V result) {
216         tryHandleResult(result);
217     }
218 
219     /**
220      * Attempts to signal that the asynchronous task represented by this promise
221      * has failed. If the task has already completed (i.e.
222      * {@code isDone() == true}) then calling this method has no effect and
223      * {@code false} is returned.
224      * <p>
225      * This method should be used in cases where multiple threads may
226      * concurrently attempt to complete a promise and need to release resources
227      * if the completion attempt fails. For example, an asynchronous TCP connect
228      * attempt may complete after a timeout has expired. In this case the
229      * connection should be immediately closed because it is never going to be
230      * used.
231      *
232      * @param exception
233      *            The exception indicating why the task failed.
234      * @return {@code false} if this promise has already been completed, either
235      *         due to normal termination, an exception, or cancellation (i.e.
236      *         {@code isDone() == true}).
237      * @see #handleException(Exception)
238      * @see #isDone()
239      */
240     public final boolean tryHandleException(final E exception) {
241         return setState(HAS_EXCEPTION, null, exception, null);
242     }
243 
244     /**
245      * Attempts to signal that the asynchronous task represented by this promise
246      * has succeeded. If the task has already completed (i.e.
247      * {@code isDone() == true}) then calling this method has no effect and
248      * {@code false} is returned.
249      * <p>
250      * This method should be used in cases where multiple threads may
251      * concurrently attempt to complete a promise and need to release resources
252      * if the completion attempt fails. For example, an asynchronous TCP connect
253      * attempt may complete after a timeout has expired. In this case the
254      * connection should be immediately closed because it is never going to be
255      * used.
256      *
257      * @param result
258      *            The result of the asynchronous task (may be {@code null}).
259      * @return {@code false} if this promise has already been completed, either
260      *         due to normal termination, an exception, or cancellation (i.e.
261      *         {@code isDone() == true}).
262      * @see #handleResult(Object)
263      * @see #isDone()
264      */
265     public final boolean tryHandleResult(final V result) {
266         return setState(HAS_RESULT, result, null, null);
267     }
268 
269     @Override
270     public final boolean isCancelled() {
271         return state == CANCELLED;
272     }
273 
274     @Override
275     public final boolean isDone() {
276         return state != PENDING;
277     }
278 
279     @Override
280     public final Promise<V, E> thenOnException(final ExceptionHandler<? super E> onException) {
281         addOrFireListener(new StateListener<V, E>() {
282             @Override
283             public void handleStateChange(final int newState, final V result, final E exception,
284                     final RuntimeException runtimeException) {
285                 if (newState == HAS_EXCEPTION || newState == CANCELLED) {
286                     try {
287                         onException.handleException(exception);
288                     } catch (RuntimeException e) {
289                         LOGGER.error("Ignored unexpected exception thrown by ExceptionHandler", e);
290                     }
291                 }
292             }
293         });
294         return this;
295     }
296 
297     @Override
298     public final Promise<V, E> thenOnResult(final ResultHandler<? super V> onResult) {
299         addOrFireListener(new StateListener<V, E>() {
300             @Override
301             public void handleStateChange(final int newState, final V result, final E exception,
302                     final RuntimeException runtimeException) {
303                 if (newState == HAS_RESULT) {
304                     try {
305                         onResult.handleResult(result);
306                     } catch (RuntimeException e) {
307                         LOGGER.error("Ignored unexpected exception thrown by ResultHandler", e);
308                     }
309                 }
310             }
311         });
312         return this;
313     }
314 
315     @Override
316     public final Promise<V, E> thenOnResultOrException(final ResultHandler<? super V> onResult,
317             final ExceptionHandler<? super E> onException) {
318         addOrFireListener(new StateListener<V, E>() {
319             @Override
320             public void handleStateChange(final int newState, final V result, final E exception,
321                     final RuntimeException runtimeException) {
322                 if (newState == HAS_RESULT) {
323                     try {
324                         onResult.handleResult(result);
325                     } catch (RuntimeException e) {
326                         LOGGER.error("Ignored unexpected exception thrown by ResultHandler", e);
327                     }
328                 } else if (newState == HAS_EXCEPTION || newState == CANCELLED) {
329                     try {
330                         onException.handleException(exception);
331                     } catch (RuntimeException e) {
332                         LOGGER.error("Ignored unexpected exception thrown by ExceptionHandler", e);
333                     }
334                 }
335             }
336         });
337         return this;
338     }
339 
340     @Override
341     public final Promise<V, E> thenOnResultOrException(final Runnable onResultOrException) {
342         addOrFireListener(new StateListener<V, E>() {
343             @Override
344             public void handleStateChange(final int newState, final V result, final E exception,
345                     final RuntimeException runtimeException) {
346                 if (newState != HAS_RUNTIME_EXCEPTION) {
347                     try {
348                         onResultOrException.run();
349                     } catch (RuntimeException e) {
350                         LOGGER.error("Ignored unexpected exception thrown by Runnable", e);
351                     }
352                 }
353             }
354         });
355         return this;
356     }
357 
358     @Override
359     public final <VOUT> Promise<VOUT, E> then(final Function<? super V, VOUT, E> onResult) {
360         return then(onResult, Promises.<VOUT, E>exceptionIdempotentFunction());
361     }
362 
363     @Override
364     public <EOUT extends Exception> Promise<V, EOUT> thenCatch(final Function<? super E, V, EOUT> onException) {
365         return then(Promises.<V, EOUT>resultIdempotentFunction(), onException);
366     }
367 
368     @Override
369     public Promise<V, E> thenCatchRuntimeException(
370             Function<? super RuntimeException, V, E> onRuntimeException) {
371         return then(Promises.<V, E>resultIdempotentFunction(), Promises.<V, E>exceptionIdempotentFunction(),
372                 onRuntimeException);
373     }
374 
375     @Override
376     public final <VOUT, EOUT extends Exception> Promise<VOUT, EOUT> then(
377             final Function<? super V, VOUT, EOUT> onResult, final Function<? super E, VOUT, EOUT> onException) {
378         return then(onResult, onException, Promises.<VOUT, EOUT>runtimeExceptionIdempotentFunction());
379     }
380 
381     @Override
382     @SuppressWarnings("unchecked")
383     public final <VOUT, EOUT extends Exception> Promise<VOUT, EOUT> then(
384             final Function<? super V, VOUT, EOUT> onResult, final Function<? super E, VOUT, EOUT> onException,
385             final Function<? super RuntimeException, VOUT, EOUT> onRuntimeException) {
386         final PromiseImpl<VOUT, EOUT> chained = new PromiseImpl<>();
387         addOrFireListener(new StateListener<V, E>() {
388             @Override
389             public void handleStateChange(final int newState, final V result, final E exception,
390                                           final RuntimeException runtimeException) {
391                 try {
392                     switch (newState) {
393                         case HAS_RESULT:
394                             chained.handleResult(onResult.apply(result));
395                             break;
396                         case HAS_EXCEPTION:
397                         case CANCELLED:
398                             chained.handleResult(onException.apply(exception));
399                             break;
400                         case HAS_RUNTIME_EXCEPTION:
401                             chained.handleResult(onRuntimeException.apply(runtimeException));
402                             break;
403                         default:
404                             throw new IllegalStateException("Unexpected state : " + newState);
405                     }
406                 } catch (final RuntimeException e) {
407                     tryHandlingRuntimeException(e, chained);
408                 } catch (final Exception e) {
409                     chained.handleException((EOUT) e);
410                 }
411             }
412         });
413         return chained;
414     }
415 
416     private <VOUT, EOUT extends Exception> void tryHandlingRuntimeException(final RuntimeException runtimeException,
417             final PromiseImpl<VOUT, EOUT> chained) {
418         try {
419             chained.handleRuntimeException(runtimeException);
420         } catch (Exception ignored) {
421             LOGGER.error("Runtime exception handler threw a RuntimeException which cannot be handled!", ignored);
422         }
423     }
424 
425     @Override
426     public final Promise<V, E> thenAlways(final Runnable always) {
427         addOrFireListener(new StateListener<V, E>() {
428             @Override
429             public void handleStateChange(final int newState, final V result, final E exception,
430                     final RuntimeException runtimeException) {
431                 try {
432                     always.run();
433                 } catch (RuntimeException e) {
434                     LOGGER.error("Ignored unexpected exception thrown by Runnable", e);
435                 }
436             }
437         });
438         return this;
439     }
440 
441     @Override
442     public final Promise<V, E> thenFinally(final Runnable onFinally) {
443         return thenAlways(onFinally);
444     }
445 
446     @Override
447     public final <VOUT> Promise<VOUT, E> thenAsync(final AsyncFunction<? super V, VOUT, E> onResult) {
448         return thenAsync(onResult, Promises.<VOUT, E>exceptionIdempotentAsyncFunction());
449     }
450 
451     @Override
452     public final <EOUT extends Exception> Promise<V, EOUT> thenCatchAsync(
453             AsyncFunction<? super E, V, EOUT> onException) {
454         return thenAsync(Promises.<V, EOUT>resultIdempotentAsyncFunction(), onException);
455     }
456 
457     @Override
458     public final Promise<V, E> thenCatchRuntimeExceptionAsync(
459             AsyncFunction<? super RuntimeException, V, E> onRuntimeException) {
460         return thenAsync(Promises.<V, E>resultIdempotentAsyncFunction(),
461                 Promises.<V, E>exceptionIdempotentAsyncFunction(), onRuntimeException);
462     }
463 
464     @Override
465     public final <VOUT, EOUT extends Exception> Promise<VOUT, EOUT> thenAsync(
466             final AsyncFunction<? super V, VOUT, EOUT> onResult,
467             final AsyncFunction<? super E, VOUT, EOUT> onException) {
468         return thenAsync(onResult, onException, Promises.<VOUT, EOUT>runtimeExceptionIdempotentAsyncFunction());
469     }
470 
471     @Override
472     public final <VOUT, EOUT extends Exception> Promise<VOUT, EOUT> thenAsync(
473             final AsyncFunction<? super V, VOUT, EOUT> onResult,
474             final AsyncFunction<? super E, VOUT, EOUT> onException,
475             final AsyncFunction<? super RuntimeException, VOUT, EOUT> onRuntimeException) {
476         final PromiseImpl<VOUT, EOUT> chained = new PromiseImpl<>();
477         addOrFireListener(new StateListener<V, E>() {
478             @Override
479             @SuppressWarnings("unchecked")
480             public void handleStateChange(final int newState, final V result, final E exception,
481                     final RuntimeException runtimeException) {
482                 try {
483                     switch (newState) {
484                         case HAS_RESULT:
485                             callNestedPromise(onResult.apply(result));
486                             break;
487                         case HAS_EXCEPTION:
488                         case CANCELLED:
489                             callNestedPromise(onException.apply(exception));
490                             break;
491                         case HAS_RUNTIME_EXCEPTION:
492                             callNestedPromise(onRuntimeException.apply(runtimeException));
493                             break;
494                         default:
495                             throw new IllegalStateException("Unexpected state : " + newState);
496                     }
497                 } catch (final RuntimeException e) {
498                     tryHandlingRuntimeException(e, chained);
499                 } catch (final Exception e) {
500                     chained.handleException((EOUT) e);
501                 }
502             }
503 
504             private void callNestedPromise(Promise<? extends VOUT, ? extends EOUT> nestedPromise) {
505                 nestedPromise
506                         .thenOnResult(chained)
507                         .thenOnException(chained)
508                         .thenOnRuntimeException(chained);
509             }
510         });
511         return chained;
512     }
513 
514     @Override
515     public final Promise<V, E> thenOnRuntimeException(final RuntimeExceptionHandler onRuntimeException) {
516         addOrFireListener(new StateListener<V, E>() {
517             @Override
518             public void handleStateChange(int newState, V result, E exception, RuntimeException runtimeException) {
519                 if (newState == HAS_RUNTIME_EXCEPTION) {
520                     try {
521                         onRuntimeException.handleRuntimeException(runtimeException);
522                     } catch (RuntimeException e) {
523                         LOGGER.error("Ignored unexpected exception thrown by RuntimeExceptionHandler", e);
524                     }
525                 }
526             }
527         });
528         return this;
529     }
530 
531     /**
532      * Invoked when the client attempts to cancel the asynchronous task
533      * represented by this promise. Implementations which support cancellation
534      * should override this method to cancel the asynchronous task and, if
535      * successful, return an appropriate exception which can be used to signal
536      * that the task has failed.
537      * <p>
538      * By default cancellation is not supported and this method returns
539      * {@code null}.
540      *
541      * @param mayInterruptIfRunning
542      *            {@code true} if the thread executing this task should be
543      *            interrupted; otherwise, in-progress tasks are allowed to
544      *            complete.
545      * @return {@code null} if cancellation was not supported or not possible,
546      *         otherwise an appropriate exception.
547      */
548     protected E tryCancel(final boolean mayInterruptIfRunning) {
549         return null;
550     }
551 
552     private void addOrFireListener(final StateListener<V, E> listener) {
553         final int stateBefore = state;
554         if (stateBefore != PENDING) {
555             handleCompletion(listener, stateBefore);
556         } else {
557             listeners.add(listener);
558             final int stateAfter = state;
559             if (stateAfter != PENDING && listeners.remove(listener)) {
560                 handleCompletion(listener, stateAfter);
561             }
562         }
563     }
564 
565     private void handleCompletion(final StateListener<V, E> listener, final int completedState) {
566         try {
567             listener.handleStateChange(completedState, result, exception, runtimeException);
568         } catch (RuntimeException ignored) {
569             LOGGER.error("State change listener threw a RuntimeException which cannot be handled!", ignored);
570         }
571     }
572 
573     private V get0() throws ExecutionException {
574         if (runtimeException != null) {
575             throw new ExecutionException(runtimeException);
576         } else if (exception != null) {
577             throw new ExecutionException(exception);
578         } else {
579             return result;
580         }
581     }
582 
583     private V getOrThrow0() throws E {
584         if (runtimeException != null) {
585             throw runtimeException;
586         } else if (exception != null) {
587             throw exception;
588         } else {
589             return result;
590         }
591     }
592 
593     private boolean setState(final int newState, final V result, final E exception,
594             final RuntimeException runtimeException) {
595         synchronized (this) {
596             if (state != PENDING) {
597                 // Already completed.
598                 return false;
599             }
600             this.result = result;
601             this.exception = exception;
602             this.runtimeException = runtimeException;
603             state = newState; // Publishes.
604             notifyAll(); // Wake up any blocked threads.
605         }
606         StateListener<V, E> listener;
607         while ((listener = listeners.poll()) != null) {
608             handleCompletion(listener, newState);
609         }
610         return true;
611     }
612 
613     private void await() throws InterruptedException {
614         // Use double-check for fast-path.
615         if (state == PENDING) {
616             synchronized (this) {
617                 while (state == PENDING) {
618                     wait();
619                 }
620             }
621         }
622     }
623 
624     private void await(final long timeout, final TimeUnit unit, final boolean isUninterruptibly)
625             throws InterruptedException, TimeoutException {
626         // Use double-check for fast-path.
627         if (state == PENDING) {
628             final long timeoutMS = unit.toMillis(timeout);
629             final long endTimeMS = System.currentTimeMillis() + timeoutMS;
630             boolean wasInterrupted = false;
631             try {
632                 synchronized (this) {
633                     while (state == PENDING) {
634                         final long remainingTimeMS = endTimeMS - System.currentTimeMillis();
635                         if (remainingTimeMS <= 0) {
636                             throw new TimeoutException();
637                         }
638                         try {
639                             wait(remainingTimeMS);
640                         } catch (final InterruptedException e) {
641                             if (isUninterruptibly) {
642                                 wasInterrupted = true;
643                             } else {
644                                 throw e;
645                             }
646                         }
647                     }
648                 }
649             } finally {
650                 if (wasInterrupted) {
651                     Thread.currentThread().interrupt();
652                 }
653             }
654         }
655     }
656 }