View Javadoc
1   /*
2    * The contents of this file are subject to the terms of the Common Development and
3    * Distribution License (the License). You may not use this file except in compliance with the
4    * License.
5    *
6    * You can obtain a copy of the License at legal/CDDLv1.0.txt. See the License for the
7    * specific language governing permission and limitations under the License.
8    *
9    * When distributing Covered Software, include this CDDL Header Notice in each file and include
10   * the License file at legal/CDDLv1.0.txt. If applicable, add the following below the CDDL
11   * Header, with the fields enclosed by brackets [] replaced by your own identifying
12   * information: "Portions copyright [year] [name of copyright owner]".
13   *
14   * Copyright 2015-2016 ForgeRock AS.
15   */
16  
17  package org.forgerock.util.promise;
18  
19  import java.util.ArrayList;
20  import java.util.Arrays;
21  import java.util.List;
22  import java.util.concurrent.ExecutionException;
23  import java.util.concurrent.TimeUnit;
24  import java.util.concurrent.atomic.AtomicInteger;
25  
26  import org.forgerock.util.AsyncFunction;
27  import org.forgerock.util.Function;
28  import org.slf4j.Logger;
29  import org.slf4j.LoggerFactory;
30  
31  /**
32   * Utility methods for creating and composing {@link Promise}s.
33   */
34  public final class Promises {
35      // TODO: n-of, etc.
36  
37      /**
38       * These completed promise implementations provide an optimization for
39       * synchronous processing, which have been found to provide small but
40       * significant benefit:
41       *
42       * <ul>
43       *     <li>A small reduction in GC overhead (less frequent GCs = more
44       *     deterministic response times)</li>
45       *     <li>A reduction of the cost associated with calling then() on a
46       *     pre-completed promise versus the implementation in
47       *     org.forgerock.util.promise.PromiseImpl#then(Function, Function).
48       *     Specifically, a reduction in two volatile accesses as well as memory
49       *     again.</li>
50       * </ul>
51       *
52       * @param <V> {@inheritDoc}
53       * @param <E> {@inheritDoc}
54       */
55      private static abstract class CompletedPromise<V, E extends Exception> implements Promise<V, E> {
56  
57          private static final Logger LOGGER = LoggerFactory.getLogger(CompletedPromise.class);
58  
59          @Override
60          public final boolean cancel(final boolean mayInterruptIfRunning) {
61              return false;
62          }
63  
64          @Override
65          public final V get() throws ExecutionException {
66              if (hasResult()) {
67                  return getResult();
68              } else if (hasException()) {
69                  throw new ExecutionException(getException());
70              } else {
71                  throw new ExecutionException(getRuntimeException());
72              }
73          }
74  
75          @Override
76          public final V get(final long timeout, final TimeUnit unit) throws ExecutionException {
77              return get();
78          }
79  
80          @Override
81          public final V getOrThrow() throws E {
82              if (hasResult()) {
83                  return getResult();
84              } else if (hasException()) {
85                  throw getException();
86              } else {
87                  throw getRuntimeException();
88              }
89          }
90  
91          @Override
92          public final V getOrThrow(final long timeout, final TimeUnit unit) throws E {
93              return getOrThrow();
94          }
95  
96          @Override
97          public final V getOrThrowUninterruptibly() throws E {
98              return getOrThrow();
99          }
100 
101         @Override
102         public final V getOrThrowUninterruptibly(final long timeout, final TimeUnit unit) throws E {
103             return getOrThrow();
104         }
105 
106         @Override
107         public final boolean isCancelled() {
108             return false;
109         }
110 
111         @Override
112         public final boolean isDone() {
113             return true;
114         }
115 
116         @Override
117         public final Promise<V, E> thenOnException(final ExceptionHandler<? super E> onException) {
118             if (hasException()) {
119                 try {
120                     onException.handleException(getException());
121                 } catch (RuntimeException e) {
122                     LOGGER.error("Ignored unexpected exception thrown by ExceptionHandler", e);
123                 }
124             }
125             return this;
126         }
127 
128         @Override
129         public final Promise<V, E> thenOnResult(final ResultHandler<? super V> onResult) {
130             if (hasResult()) {
131                 try {
132                     onResult.handleResult(getResult());
133                 } catch (RuntimeException e) {
134                     LOGGER.error("Ignored unexpected exception thrown by ResultHandler", e);
135                 }
136             }
137             return this;
138         }
139 
140         @Override
141         public final Promise<V, E> thenOnResultOrException(final ResultHandler<? super V> onResult,
142                 final ExceptionHandler<? super E> onException) {
143             return thenOnResult(onResult).thenOnException(onException);
144         }
145 
146         @Override
147         public final Promise<V, E> thenOnResultOrException(final Runnable onResultOrException) {
148             if (hasResult() || hasException()) {
149                 try {
150                     onResultOrException.run();
151                 } catch (RuntimeException e) {
152                     LOGGER.error("Ignored unexpected exception thrown by Runnable", e);
153                 }
154             }
155             return this;
156         }
157 
158         @Override
159         public final <VOUT> Promise<VOUT, E> then(final Function<? super V, VOUT, E> onResult) {
160             return then(onResult, Promises.<VOUT, E>exceptionIdempotentFunction());
161         }
162 
163         @Override
164         public <EOUT extends Exception> Promise<V, EOUT> thenCatch(Function<? super E, V, EOUT> onException) {
165             return then(Promises.<V, EOUT>resultIdempotentFunction(), onException);
166         }
167 
168         @Override
169         public Promise<V, E> thenCatchRuntimeException(
170                 Function<? super RuntimeException, V, E> onRuntimeException) {
171             return then(Promises.<V, E>resultIdempotentFunction(), Promises.<V, E>exceptionIdempotentFunction(),
172                     onRuntimeException);
173         }
174 
175         @Override
176         public final <VOUT, EOUT extends Exception> Promise<VOUT, EOUT> then(
177                 final Function<? super V, VOUT, EOUT> onResult,
178                 final Function<? super E, VOUT, EOUT> onException) {
179             return then(onResult, onException, Promises.<VOUT, EOUT>runtimeExceptionIdempotentFunction());
180         }
181 
182         @Override
183         @SuppressWarnings("unchecked")
184         public final <VOUT, EOUT extends Exception> Promise<VOUT, EOUT> then(
185                 final Function<? super V, VOUT, EOUT> onResult,
186                 final Function<? super E, VOUT, EOUT> onException,
187                 final Function<? super RuntimeException, VOUT, EOUT> onRuntimeException) {
188             try {
189                 if (hasResult()) {
190                     return newResultPromise(onResult.apply(getResult()));
191                 } else if (hasException()) {
192                     return newResultPromise(onException.apply(getException()));
193                 } else if (hasRuntimeException()) {
194                     return newResultPromise(onRuntimeException.apply(getRuntimeException()));
195                 } else {
196                     throw new IllegalStateException("Unexpected state");
197                 }
198             } catch (final RuntimeException e) {
199                 return new RuntimeExceptionPromise<>(e);
200             } catch (final Exception e) {
201                 return newExceptionPromise((EOUT) e);
202             }
203         }
204 
205         @Override
206         public final Promise<V, E> thenAlways(final Runnable always) {
207             try {
208                 always.run();
209             } catch (RuntimeException e) {
210                 LOGGER.error("Ignored unexpected exception thrown by Runnable", e);
211             }
212             return this;
213         }
214 
215         @Override
216         public Promise<V, E> thenFinally(Runnable onFinally) {
217             return thenAlways(onFinally);
218         }
219 
220         @Override
221         public final <VOUT> Promise<VOUT, E> thenAsync(
222                 final AsyncFunction<? super V, VOUT, E> onResult) {
223             return thenAsync(onResult, Promises.<VOUT, E>exceptionIdempotentAsyncFunction());
224         }
225 
226         @Override
227         public final <EOUT extends Exception> Promise<V, EOUT> thenCatchAsync(
228                 final AsyncFunction<? super E, V, EOUT> onException) {
229             return thenAsync(Promises.<V, EOUT>resultIdempotentAsyncFunction(), onException);
230         }
231 
232         @Override
233         public Promise<V, E> thenCatchRuntimeExceptionAsync(
234                 AsyncFunction<? super RuntimeException, V, E> onRuntimeException) {
235             return thenAsync(Promises.<V, E>resultIdempotentAsyncFunction(),
236                     Promises.<V, E>exceptionIdempotentAsyncFunction(),
237                     onRuntimeException);
238         }
239 
240         @Override
241         public final <VOUT, EOUT extends Exception> Promise<VOUT, EOUT> thenAsync(
242                 final AsyncFunction<? super V, VOUT, EOUT> onResult,
243                 final AsyncFunction<? super E, VOUT, EOUT> onException) {
244             return thenAsync(onResult, onException, Promises.<VOUT, EOUT>runtimeExceptionIdempotentAsyncFunction());
245         }
246 
247         @Override
248         @SuppressWarnings("unchecked")
249         public final <VOUT, EOUT extends Exception> Promise<VOUT, EOUT> thenAsync(
250                 final AsyncFunction<? super V, VOUT, EOUT> onResult,
251                 final AsyncFunction<? super E, VOUT, EOUT> onException,
252                 final AsyncFunction<? super RuntimeException, VOUT, EOUT> onRuntimeException) {
253             try {
254                 if (hasResult()) {
255                     return (Promise<VOUT, EOUT>) onResult.apply(getResult());
256                 } else if (hasException()) {
257                     return (Promise<VOUT, EOUT>) onException.apply(getException());
258                 } else if (hasRuntimeException()) {
259                     return (Promise<VOUT, EOUT>) onRuntimeException.apply(getRuntimeException());
260                 } else {
261                     throw new IllegalStateException("Unexpected state");
262                 }
263             } catch (final RuntimeException e) {
264                 return new RuntimeExceptionPromise<>(e);
265             } catch (final Exception e) {
266                 return newExceptionPromise((EOUT) e);
267             }
268         }
269 
270         @Override
271         public Promise<V, E> thenOnRuntimeException(RuntimeExceptionHandler onRuntimeException) {
272             if (getRuntimeException() != null) {
273                 try {
274                     onRuntimeException.handleRuntimeException(getRuntimeException());
275                 } catch (RuntimeException e) {
276                     LOGGER.error("Ignored unexpected exception thrown by RuntimeExceptionHandler", e);
277                 }
278             }
279             return this;
280         }
281 
282         abstract RuntimeException getRuntimeException();
283 
284         abstract E getException();
285 
286         abstract V getResult();
287 
288         abstract boolean hasRuntimeException();
289 
290         abstract boolean hasException();
291 
292         abstract boolean hasResult();
293     }
294 
295     private static final class RuntimeExceptionPromise<V, E extends Exception> extends CompletedPromise<V, E> {
296         private final RuntimeException runtimeException;
297 
298         private RuntimeExceptionPromise(RuntimeException runtimeException) {
299             this.runtimeException = runtimeException;
300         }
301 
302         @Override
303         RuntimeException getRuntimeException() {
304             return runtimeException;
305         }
306 
307         @Override
308         E getException() {
309             return null;
310         }
311 
312         @Override
313         V getResult() {
314             return null;
315         }
316 
317         @Override
318         boolean hasRuntimeException() {
319             return true;
320         }
321 
322         @Override
323         boolean hasException() {
324             return false;
325         }
326 
327         @Override
328         boolean hasResult() {
329             return false;
330         }
331     }
332 
333     private static final class ExceptionPromise<V, E extends Exception> extends CompletedPromise<V, E> {
334         private final E exception;
335 
336         private ExceptionPromise(final E exception) {
337             this.exception = exception;
338         }
339 
340         @Override
341         RuntimeException getRuntimeException() {
342             return null;
343         }
344 
345         @Override
346         E getException() {
347             return exception;
348         }
349 
350         @Override
351         V getResult() {
352             throw new IllegalStateException();
353         }
354 
355         @Override
356         boolean hasRuntimeException() {
357             return false;
358         }
359 
360         @Override
361         boolean hasException() {
362             return true;
363         }
364 
365         @Override
366         boolean hasResult() {
367             return false;
368         }
369     }
370 
371     private static final class ResultPromise<V, E extends Exception> extends
372             CompletedPromise<V, E> {
373         private final V value;
374 
375         private ResultPromise(final V value) {
376             this.value = value;
377         }
378 
379         @Override
380         RuntimeException getRuntimeException() {
381             return null;
382         }
383 
384         @Override
385         E getException() {
386             throw new IllegalStateException();
387         }
388 
389         @Override
390         V getResult() {
391             return value;
392         }
393 
394         @Override
395         boolean hasRuntimeException() {
396             return false;
397         }
398 
399         @Override
400         boolean hasException() {
401             return false;
402         }
403 
404         @Override
405         boolean hasResult() {
406             return true;
407         }
408     }
409 
410     private static final AsyncFunction<Exception, Object, Exception> EXCEPTION_IDEM_ASYNC_FUNC =
411         new AsyncFunction<Exception, Object, Exception>() {
412             @Override
413             public Promise<Object, Exception> apply(final Exception exception) throws Exception {
414                 return newExceptionPromise(exception);
415             }
416         };
417 
418     private static final Function<Exception, Object, Exception> EXCEPTION_IDEM_FUNC =
419         new Function<Exception, Object, Exception>() {
420             @Override
421             public Object apply(final Exception exception) throws Exception {
422                 throw exception;
423             }
424         };
425 
426     private static final AsyncFunction<RuntimeException, Object, Exception> RUNTIME_EXCEPTION_IDEM_ASYNC_FUNC =
427         new AsyncFunction<RuntimeException, Object, Exception>() {
428             @Override
429             public Promise<Object, Exception> apply(final RuntimeException runtimeException) throws Exception {
430                 return newRuntimeExceptionPromise(runtimeException);
431             }
432         };
433 
434     private static final Function<RuntimeException, Object, Exception> RUNTIME_EXCEPTION_IDEM_FUNC =
435         new Function<RuntimeException, Object, Exception>() {
436             @Override
437             public Object apply(final RuntimeException runtimeException) {
438                 throw runtimeException;
439             }
440         };
441 
442     private static final AsyncFunction<Object, Object, Exception> RESULT_IDEM_ASYNC_FUNC =
443         new AsyncFunction<Object, Object, Exception>() {
444             @Override
445             public Promise<Object, Exception> apply(final Object object) throws Exception {
446                 return newResultPromise(object);
447             }
448         };
449 
450     private static final Function<Object, Object, Exception> RESULT_IDEM_FUNC =
451         new Function<Object, Object, Exception>() {
452             @Override
453             public Object apply(final Object value) throws Exception {
454                 return value;
455             }
456         };
457 
458     /**
459      * Returns a {@link Promise} representing an asynchronous task which has
460      * already failed with the provided runtime exception. Attempts to get the result will
461      * immediately fail, and any listeners registered against the returned
462      * promise will be immediately invoked in the same thread as the caller.
463      *
464      * @param <V>
465      *            The type of the task's result, or {@link Void} if the task
466      *            does not return anything (i.e. it only has side-effects).
467      * @param <E>
468      *            The type of the exception thrown by the task if it fails, or
469      *            {@link NeverThrowsException} if the task cannot fail.
470      * @param exception
471      *            The exception indicating why the asynchronous task has failed.
472      * @return A {@link Promise} representing an asynchronous task which has
473      *         already failed with the provided exception.
474      */
475     public static <V, E extends Exception> Promise<V, E> newRuntimeExceptionPromise(final RuntimeException exception) {
476         return new RuntimeExceptionPromise<>(exception);
477     }
478 
479     /**
480      * Returns a {@link Promise} representing an asynchronous task which has
481      * already failed with the provided exception. Attempts to get the result will
482      * immediately fail, and any listeners registered against the returned
483      * promise will be immediately invoked in the same thread as the caller.
484      *
485      * @param <V>
486      *            The type of the task's result, or {@link Void} if the task
487      *            does not return anything (i.e. it only has side-effects).
488      * @param <E>
489      *            The type of the exception thrown by the task if it fails, or
490      *            {@link NeverThrowsException} if the task cannot fail.
491      * @param exception
492      *            The exception indicating why the asynchronous task has failed.
493      * @return A {@link Promise} representing an asynchronous task which has
494      *         already failed with the provided exception.
495      */
496     public static <V, E extends Exception> Promise<V, E> newExceptionPromise(final E exception) {
497         return new ExceptionPromise<>(exception);
498     }
499 
500     /**
501      * Returns a {@link Promise} representing an asynchronous task which has
502      * already succeeded with the provided result. Attempts to get the result
503      * will immediately return the result, and any listeners registered against
504      * the returned promise will be immediately invoked in the same thread as
505      * the caller.
506      *
507      * @param <V>
508      *            The type of the task's result, or {@link Void} if the task
509      *            does not return anything (i.e. it only has side-effects).
510      * @param <E>
511      *            The type of the exception thrown by the task if it fails, or
512      *            {@link NeverThrowsException} if the task cannot fail.
513      * @param result
514      *            The result of the asynchronous task.
515      * @return A {@link Promise} representing an asynchronous task which has
516      *         already succeeded with the provided result.
517      */
518     public static <V, E extends Exception> Promise<V, E> newResultPromise(final V result) {
519         return new ResultPromise<>(result);
520     }
521 
522     /**
523      * Returns a {@link Promise} which will be completed once all of the
524      * provided promises have succeeded, or as soon as one of them fails.
525      *
526      * @param <V>
527      *            The type of the tasks' result, or {@link Void} if the tasks do
528      *            not return anything (i.e. they only has side-effects).
529      * @param <E>
530      *            The type of the exception thrown by the tasks if they fail, or
531      *            {@link NeverThrowsException} if the tasks cannot fail.
532      * @param promises
533      *            The list of tasks to be combined.
534      * @return A {@link Promise} which will be completed once all of the
535      *         provided promises have succeeded, or as soon as one of them
536      *         fails.
537      */
538     public static <V, E extends Exception> Promise<List<V>, E> when(
539             final List<Promise<V, E>> promises) {
540         final int size = promises.size();
541         final AtomicInteger remaining = new AtomicInteger(size);
542         final List<V> results = new ArrayList<>(size);
543         final PromiseImpl<List<V>, E> composite = PromiseImpl.create();
544         for (final Promise<V, E> promise : promises) {
545             promise.thenOnResult(new ResultHandler<V>() {
546                 @Override
547                 public void handleResult(final V value) {
548                     synchronized (results) {
549                         results.add(value);
550                     }
551                     if (remaining.decrementAndGet() == 0) {
552                         composite.handleResult(results);
553                     }
554                 }
555             })  .thenOnException(new ExceptionHandler<E>() {
556                 @Override
557                 public void handleException(final E exception) {
558                     composite.handleException(exception);
559                 }
560             })  .thenOnRuntimeException(new RuntimeExceptionHandler() {
561                 @Override
562                 public void handleRuntimeException(RuntimeException exception) {
563                     composite.handleRuntimeException(exception);
564                 }
565             });
566         }
567         if (promises.isEmpty()) {
568             composite.handleResult(results);
569         }
570         return composite;
571     }
572 
573     /**
574      * Returns a {@link Promise} which will be completed once all of the
575      * provided promises have succeeded, or as soon as one of them fails.
576      *
577      * @param <V>
578      *            The type of the tasks' result, or {@link Void} if the tasks do
579      *            not return anything (i.e. they only has side-effects).
580      * @param <E>
581      *            The type of the exception thrown by the tasks if they fail, or
582      *            {@link NeverThrowsException} if the tasks cannot fail.
583      * @param promises
584      *            The list of tasks to be combined.
585      * @return A {@link Promise} which will be completed once all of the
586      *         provided promises have succeeded, or as soon as one of them
587      *         has thrown an exception.
588      */
589     @SafeVarargs
590     public static <V, E extends Exception> Promise<List<V>, E> when(final Promise<V, E>... promises) {
591         return when(Arrays.asList(promises));
592     }
593 
594     @SuppressWarnings("unchecked")
595     static <VOUT, E extends Exception> AsyncFunction<E, VOUT, E> exceptionIdempotentAsyncFunction() {
596         return (AsyncFunction<E, VOUT, E>) EXCEPTION_IDEM_ASYNC_FUNC;
597     }
598 
599     @SuppressWarnings("unchecked")
600     static <VOUT, E extends Exception> Function<E, VOUT, E> exceptionIdempotentFunction() {
601         return (Function<E, VOUT, E>) EXCEPTION_IDEM_FUNC;
602     }
603 
604     @SuppressWarnings("unchecked")
605     static <VOUT, E extends Exception> AsyncFunction<RuntimeException, VOUT, E>
606         runtimeExceptionIdempotentAsyncFunction() {
607         return (AsyncFunction<RuntimeException, VOUT, E>) RUNTIME_EXCEPTION_IDEM_ASYNC_FUNC;
608     }
609 
610     @SuppressWarnings("unchecked")
611     static <VOUT, E extends Exception> Function<RuntimeException, VOUT, E> runtimeExceptionIdempotentFunction() {
612         return (Function<RuntimeException, VOUT, E>) RUNTIME_EXCEPTION_IDEM_FUNC;
613     }
614 
615     @SuppressWarnings("unchecked")
616     static <V, E extends Exception> AsyncFunction<V, V, E> resultIdempotentAsyncFunction() {
617         return (AsyncFunction<V, V, E>) RESULT_IDEM_ASYNC_FUNC;
618     }
619 
620     @SuppressWarnings("unchecked")
621     static <V, E extends Exception> Function<V, V, E> resultIdempotentFunction() {
622         return (Function<V, V, E>) RESULT_IDEM_FUNC;
623     }
624 
    /** Not instantiable: this class only exposes static members. */
    private Promises() {
        // Prevent instantiation.
    }
628 }