001/* 002 * The contents of this file are subject to the terms of the Common Development and 003 * Distribution License (the License). You may not use this file except in compliance with the 004 * License. 005 * 006 * You can obtain a copy of the License at legal/CDDLv1.0.txt. See the License for the 007 * specific language governing permission and limitations under the License. 008 * 009 * When distributing Covered Software, include this CDDL Header Notice in each file and include 010 * the License file at legal/CDDLv1.0.txt. If applicable, add the following below the CDDL 011 * Header, with the fields enclosed by brackets [] replaced by your own identifying 012 * information: "Portions copyright [year] [name of copyright owner]". 013 * 014 * Copyright 2015-2016 ForgeRock AS. 015 */ 016 017package org.forgerock.util.promise; 018 019import java.util.ArrayList; 020import java.util.Arrays; 021import java.util.List; 022import java.util.concurrent.ExecutionException; 023import java.util.concurrent.TimeUnit; 024import java.util.concurrent.atomic.AtomicInteger; 025 026import org.forgerock.util.AsyncFunction; 027import org.forgerock.util.Function; 028import org.slf4j.Logger; 029import org.slf4j.LoggerFactory; 030 031/** 032 * Utility methods for creating and composing {@link Promise}s. 033 */ 034public final class Promises { 035 // TODO: n-of, etc. 036 037 /** 038 * These completed promise implementations provide an optimization for 039 * synchronous processing, which have been found to provide small but 040 * significant benefit: 041 * 042 * <ul> 043 * <li>A small reduction in GC overhead (less frequent GCs = more 044 * deterministic response times)</li> 045 * <li>A reduction of the cost associated with calling then() on a 046 * pre-completed promise versus the implementation in 047 * org.forgerock.util.promise.PromiseImpl#then(Function, Function). 048 * Specifically, a reduction in two volatile accesses as well as memory 049 * again.</li> 050 * </ul> 051 * 052 * @param <V> {@inheritDoc} 053 * @param <E> {@inheritDoc} 054 */ 055 private static abstract class CompletedPromise<V, E extends Exception> implements Promise<V, E> { 056 057 private static final Logger LOGGER = LoggerFactory.getLogger(CompletedPromise.class); 058 059 @Override 060 public final boolean cancel(final boolean mayInterruptIfRunning) { 061 return false; 062 } 063 064 @Override 065 public final V get() throws ExecutionException { 066 if (hasResult()) { 067 return getResult(); 068 } else if (hasException()) { 069 throw new ExecutionException(getException()); 070 } else { 071 throw new ExecutionException(getRuntimeException()); 072 } 073 } 074 075 @Override 076 public final V get(final long timeout, final TimeUnit unit) throws ExecutionException { 077 return get(); 078 } 079 080 @Override 081 public final V getOrThrow() throws E { 082 if (hasResult()) { 083 return getResult(); 084 } else if (hasException()) { 085 throw getException(); 086 } else { 087 throw getRuntimeException(); 088 } 089 } 090 091 @Override 092 public final V getOrThrow(final long timeout, final TimeUnit unit) throws E { 093 return getOrThrow(); 094 } 095 096 @Override 097 public final V getOrThrowUninterruptibly() throws E { 098 return getOrThrow(); 099 } 100 101 @Override 102 public final V getOrThrowUninterruptibly(final long timeout, final TimeUnit unit) throws E { 103 return getOrThrow(); 104 } 105 106 @Override 107 public final boolean isCancelled() { 108 return false; 109 } 110 111 @Override 112 public final boolean isDone() { 113 return true; 114 } 115 116 @Override 117 public final Promise<V, E> thenOnException(final ExceptionHandler<? super E> onException) { 118 if (hasException()) { 119 try { 120 onException.handleException(getException()); 121 } catch (RuntimeException e) { 122 LOGGER.error("Ignored unexpected exception thrown by ExceptionHandler", e); 123 } 124 } 125 return this; 126 } 127 128 @Override 129 public final Promise<V, E> thenOnResult(final ResultHandler<? super V> onResult) { 130 if (hasResult()) { 131 try { 132 onResult.handleResult(getResult()); 133 } catch (RuntimeException e) { 134 LOGGER.error("Ignored unexpected exception thrown by ResultHandler", e); 135 } 136 } 137 return this; 138 } 139 140 @Override 141 public final Promise<V, E> thenOnResultOrException(final ResultHandler<? super V> onResult, 142 final ExceptionHandler<? super E> onException) { 143 return thenOnResult(onResult).thenOnException(onException); 144 } 145 146 @Override 147 public final Promise<V, E> thenOnResultOrException(final Runnable onResultOrException) { 148 if (hasResult() || hasException()) { 149 try { 150 onResultOrException.run(); 151 } catch (RuntimeException e) { 152 LOGGER.error("Ignored unexpected exception thrown by Runnable", e); 153 } 154 } 155 return this; 156 } 157 158 @Override 159 public final <VOUT> Promise<VOUT, E> then(final Function<? super V, VOUT, E> onResult) { 160 return then(onResult, Promises.<VOUT, E>exceptionIdempotentFunction()); 161 } 162 163 @Override 164 public <EOUT extends Exception> Promise<V, EOUT> thenCatch(Function<? super E, V, EOUT> onException) { 165 return then(Promises.<V, EOUT>resultIdempotentFunction(), onException); 166 } 167 168 @Override 169 public Promise<V, E> thenCatchRuntimeException( 170 Function<? super RuntimeException, V, E> onRuntimeException) { 171 return then(Promises.<V, E>resultIdempotentFunction(), Promises.<V, E>exceptionIdempotentFunction(), 172 onRuntimeException); 173 } 174 175 @Override 176 public final <VOUT, EOUT extends Exception> Promise<VOUT, EOUT> then( 177 final Function<? super V, VOUT, EOUT> onResult, 178 final Function<? super E, VOUT, EOUT> onException) { 179 return then(onResult, onException, Promises.<VOUT, EOUT>runtimeExceptionIdempotentFunction()); 180 } 181 182 @Override 183 @SuppressWarnings("unchecked") 184 public final <VOUT, EOUT extends Exception> Promise<VOUT, EOUT> then( 185 final Function<? super V, VOUT, EOUT> onResult, 186 final Function<? super E, VOUT, EOUT> onException, 187 final Function<? super RuntimeException, VOUT, EOUT> onRuntimeException) { 188 try { 189 if (hasResult()) { 190 return newResultPromise(onResult.apply(getResult())); 191 } else if (hasException()) { 192 return newResultPromise(onException.apply(getException())); 193 } else if (hasRuntimeException()) { 194 return newResultPromise(onRuntimeException.apply(getRuntimeException())); 195 } else { 196 throw new IllegalStateException("Unexpected state"); 197 } 198 } catch (final RuntimeException e) { 199 return new RuntimeExceptionPromise<>(e); 200 } catch (final Exception e) { 201 return newExceptionPromise((EOUT) e); 202 } 203 } 204 205 @Override 206 public final Promise<V, E> thenAlways(final Runnable always) { 207 try { 208 always.run(); 209 } catch (RuntimeException e) { 210 LOGGER.error("Ignored unexpected exception thrown by Runnable", e); 211 } 212 return this; 213 } 214 215 @Override 216 public Promise<V, E> thenFinally(Runnable onFinally) { 217 return thenAlways(onFinally); 218 } 219 220 @Override 221 public final <VOUT> Promise<VOUT, E> thenAsync( 222 final AsyncFunction<? super V, VOUT, E> onResult) { 223 return thenAsync(onResult, Promises.<VOUT, E>exceptionIdempotentAsyncFunction()); 224 } 225 226 @Override 227 public final <EOUT extends Exception> Promise<V, EOUT> thenCatchAsync( 228 final AsyncFunction<? super E, V, EOUT> onException) { 229 return thenAsync(Promises.<V, EOUT>resultIdempotentAsyncFunction(), onException); 230 } 231 232 @Override 233 public Promise<V, E> thenCatchRuntimeExceptionAsync( 234 AsyncFunction<? super RuntimeException, V, E> onRuntimeException) { 235 return thenAsync(Promises.<V, E>resultIdempotentAsyncFunction(), 236 Promises.<V, E>exceptionIdempotentAsyncFunction(), 237 onRuntimeException); 238 } 239 240 @Override 241 public final <VOUT, EOUT extends Exception> Promise<VOUT, EOUT> thenAsync( 242 final AsyncFunction<? super V, VOUT, EOUT> onResult, 243 final AsyncFunction<? super E, VOUT, EOUT> onException) { 244 return thenAsync(onResult, onException, Promises.<VOUT, EOUT>runtimeExceptionIdempotentAsyncFunction()); 245 } 246 247 @Override 248 @SuppressWarnings("unchecked") 249 public final <VOUT, EOUT extends Exception> Promise<VOUT, EOUT> thenAsync( 250 final AsyncFunction<? super V, VOUT, EOUT> onResult, 251 final AsyncFunction<? super E, VOUT, EOUT> onException, 252 final AsyncFunction<? super RuntimeException, VOUT, EOUT> onRuntimeException) { 253 try { 254 if (hasResult()) { 255 return (Promise<VOUT, EOUT>) onResult.apply(getResult()); 256 } else if (hasException()) { 257 return (Promise<VOUT, EOUT>) onException.apply(getException()); 258 } else if (hasRuntimeException()) { 259 return (Promise<VOUT, EOUT>) onRuntimeException.apply(getRuntimeException()); 260 } else { 261 throw new IllegalStateException("Unexpected state"); 262 } 263 } catch (final RuntimeException e) { 264 return new RuntimeExceptionPromise<>(e); 265 } catch (final Exception e) { 266 return newExceptionPromise((EOUT) e); 267 } 268 } 269 270 @Override 271 public Promise<V, E> thenOnRuntimeException(RuntimeExceptionHandler onRuntimeException) { 272 if (getRuntimeException() != null) { 273 try { 274 onRuntimeException.handleRuntimeException(getRuntimeException()); 275 } catch (RuntimeException e) { 276 LOGGER.error("Ignored unexpected exception thrown by RuntimeExceptionHandler", e); 277 } 278 } 279 return this; 280 } 281 282 abstract RuntimeException getRuntimeException(); 283 284 abstract E getException(); 285 286 abstract V getResult(); 287 288 abstract boolean hasRuntimeException(); 289 290 abstract boolean hasException(); 291 292 abstract boolean hasResult(); 293 } 294 295 private static final class RuntimeExceptionPromise<V, E extends Exception> extends CompletedPromise<V, E> { 296 private final RuntimeException runtimeException; 297 298 private RuntimeExceptionPromise(RuntimeException runtimeException) { 299 this.runtimeException = runtimeException; 300 } 301 302 @Override 303 RuntimeException getRuntimeException() { 304 return runtimeException; 305 } 306 307 @Override 308 E getException() { 309 return null; 310 } 311 312 @Override 313 V getResult() { 314 return null; 315 } 316 317 @Override 318 boolean hasRuntimeException() { 319 return true; 320 } 321 322 @Override 323 boolean hasException() { 324 return false; 325 } 326 327 @Override 328 boolean hasResult() { 329 return false; 330 } 331 } 332 333 private static final class ExceptionPromise<V, E extends Exception> extends CompletedPromise<V, E> { 334 private final E exception; 335 336 private ExceptionPromise(final E exception) { 337 this.exception = exception; 338 } 339 340 @Override 341 RuntimeException getRuntimeException() { 342 return null; 343 } 344 345 @Override 346 E getException() { 347 return exception; 348 } 349 350 @Override 351 V getResult() { 352 throw new IllegalStateException(); 353 } 354 355 @Override 356 boolean hasRuntimeException() { 357 return false; 358 } 359 360 @Override 361 boolean hasException() { 362 return true; 363 } 364 365 @Override 366 boolean hasResult() { 367 return false; 368 } 369 } 370 371 private static final class ResultPromise<V, E extends Exception> extends 372 CompletedPromise<V, E> { 373 private final V value; 374 375 private ResultPromise(final V value) { 376 this.value = value; 377 } 378 379 @Override 380 RuntimeException getRuntimeException() { 381 return null; 382 } 383 384 @Override 385 E getException() { 386 throw new IllegalStateException(); 387 } 388 389 @Override 390 V getResult() { 391 return value; 392 } 393 394 @Override 395 boolean hasRuntimeException() { 396 return false; 397 } 398 399 @Override 400 boolean hasException() { 401 return false; 402 } 403 404 @Override 405 boolean hasResult() { 406 return true; 407 } 408 } 409 410 private static final AsyncFunction<Exception, Object, Exception> EXCEPTION_IDEM_ASYNC_FUNC = 411 new AsyncFunction<Exception, Object, Exception>() { 412 @Override 413 public Promise<Object, Exception> apply(final Exception exception) throws Exception { 414 return newExceptionPromise(exception); 415 } 416 }; 417 418 private static final Function<Exception, Object, Exception> EXCEPTION_IDEM_FUNC = 419 new Function<Exception, Object, Exception>() { 420 @Override 421 public Object apply(final Exception exception) throws Exception { 422 throw exception; 423 } 424 }; 425 426 private static final AsyncFunction<RuntimeException, Object, Exception> RUNTIME_EXCEPTION_IDEM_ASYNC_FUNC = 427 new AsyncFunction<RuntimeException, Object, Exception>() { 428 @Override 429 public Promise<Object, Exception> apply(final RuntimeException runtimeException) throws Exception { 430 return newRuntimeExceptionPromise(runtimeException); 431 } 432 }; 433 434 private static final Function<RuntimeException, Object, Exception> RUNTIME_EXCEPTION_IDEM_FUNC = 435 new Function<RuntimeException, Object, Exception>() { 436 @Override 437 public Object apply(final RuntimeException runtimeException) { 438 throw runtimeException; 439 } 440 }; 441 442 private static final AsyncFunction<Object, Object, Exception> RESULT_IDEM_ASYNC_FUNC = 443 new AsyncFunction<Object, Object, Exception>() { 444 @Override 445 public Promise<Object, Exception> apply(final Object object) throws Exception { 446 return newResultPromise(object); 447 } 448 }; 449 450 private static final Function<Object, Object, Exception> RESULT_IDEM_FUNC = 451 new Function<Object, Object, Exception>() { 452 @Override 453 public Object apply(final Object value) throws Exception { 454 return value; 455 } 456 }; 457 458 /** 459 * Returns a {@link Promise} representing an asynchronous task which has 460 * already failed with the provided runtime exception. Attempts to get the result will 461 * immediately fail, and any listeners registered against the returned 462 * promise will be immediately invoked in the same thread as the caller. 463 * 464 * @param <V> 465 * The type of the task's result, or {@link Void} if the task 466 * does not return anything (i.e. it only has side-effects). 467 * @param <E> 468 * The type of the exception thrown by the task if it fails, or 469 * {@link NeverThrowsException} if the task cannot fail. 470 * @param exception 471 * The exception indicating why the asynchronous task has failed. 472 * @return A {@link Promise} representing an asynchronous task which has 473 * already failed with the provided exception. 474 */ 475 public static <V, E extends Exception> Promise<V, E> newRuntimeExceptionPromise(final RuntimeException exception) { 476 return new RuntimeExceptionPromise<>(exception); 477 } 478 479 /** 480 * Returns a {@link Promise} representing an asynchronous task which has 481 * already failed with the provided exception. Attempts to get the result will 482 * immediately fail, and any listeners registered against the returned 483 * promise will be immediately invoked in the same thread as the caller. 484 * 485 * @param <V> 486 * The type of the task's result, or {@link Void} if the task 487 * does not return anything (i.e. it only has side-effects). 488 * @param <E> 489 * The type of the exception thrown by the task if it fails, or 490 * {@link NeverThrowsException} if the task cannot fail. 491 * @param exception 492 * The exception indicating why the asynchronous task has failed. 493 * @return A {@link Promise} representing an asynchronous task which has 494 * already failed with the provided exception. 495 */ 496 public static <V, E extends Exception> Promise<V, E> newExceptionPromise(final E exception) { 497 return new ExceptionPromise<>(exception); 498 } 499 500 /** 501 * Returns a {@link Promise} representing an asynchronous task which has 502 * already succeeded with the provided result. Attempts to get the result 503 * will immediately return the result, and any listeners registered against 504 * the returned promise will be immediately invoked in the same thread as 505 * the caller. 506 * 507 * @param <V> 508 * The type of the task's result, or {@link Void} if the task 509 * does not return anything (i.e. it only has side-effects). 510 * @param <E> 511 * The type of the exception thrown by the task if it fails, or 512 * {@link NeverThrowsException} if the task cannot fail. 513 * @param result 514 * The result of the asynchronous task. 515 * @return A {@link Promise} representing an asynchronous task which has 516 * already succeeded with the provided result. 517 */ 518 public static <V, E extends Exception> Promise<V, E> newResultPromise(final V result) { 519 return new ResultPromise<>(result); 520 } 521 522 /** 523 * Returns a {@link Promise} which will be completed once all of the 524 * provided promises have succeeded, or as soon as one of them fails. 525 * 526 * @param <V> 527 * The type of the tasks' result, or {@link Void} if the tasks do 528 * not return anything (i.e. they only has side-effects). 529 * @param <E> 530 * The type of the exception thrown by the tasks if they fail, or 531 * {@link NeverThrowsException} if the tasks cannot fail. 532 * @param promises 533 * The list of tasks to be combined. 534 * @return A {@link Promise} which will be completed once all of the 535 * provided promises have succeeded, or as soon as one of them 536 * fails. 537 */ 538 public static <V, E extends Exception> Promise<List<V>, E> when( 539 final List<Promise<V, E>> promises) { 540 final int size = promises.size(); 541 final AtomicInteger remaining = new AtomicInteger(size); 542 final List<V> results = new ArrayList<>(size); 543 final PromiseImpl<List<V>, E> composite = PromiseImpl.create(); 544 for (final Promise<V, E> promise : promises) { 545 promise.thenOnResult(new ResultHandler<V>() { 546 @Override 547 public void handleResult(final V value) { 548 synchronized (results) { 549 results.add(value); 550 } 551 if (remaining.decrementAndGet() == 0) { 552 composite.handleResult(results); 553 } 554 } 555 }) .thenOnException(new ExceptionHandler<E>() { 556 @Override 557 public void handleException(final E exception) { 558 composite.handleException(exception); 559 } 560 }) .thenOnRuntimeException(new RuntimeExceptionHandler() { 561 @Override 562 public void handleRuntimeException(RuntimeException exception) { 563 composite.handleRuntimeException(exception); 564 } 565 }); 566 } 567 if (promises.isEmpty()) { 568 composite.handleResult(results); 569 } 570 return composite; 571 } 572 573 /** 574 * Returns a {@link Promise} which will be completed once all of the 575 * provided promises have succeeded, or as soon as one of them fails. 576 * 577 * @param <V> 578 * The type of the tasks' result, or {@link Void} if the tasks do 579 * not return anything (i.e. they only has side-effects). 580 * @param <E> 581 * The type of the exception thrown by the tasks if they fail, or 582 * {@link NeverThrowsException} if the tasks cannot fail. 583 * @param promises 584 * The list of tasks to be combined. 585 * @return A {@link Promise} which will be completed once all of the 586 * provided promises have succeeded, or as soon as one of them 587 * has thrown an exception. 588 */ 589 @SafeVarargs 590 public static <V, E extends Exception> Promise<List<V>, E> when(final Promise<V, E>... promises) { 591 return when(Arrays.asList(promises)); 592 } 593 594 @SuppressWarnings("unchecked") 595 static <VOUT, E extends Exception> AsyncFunction<E, VOUT, E> exceptionIdempotentAsyncFunction() { 596 return (AsyncFunction<E, VOUT, E>) EXCEPTION_IDEM_ASYNC_FUNC; 597 } 598 599 @SuppressWarnings("unchecked") 600 static <VOUT, E extends Exception> Function<E, VOUT, E> exceptionIdempotentFunction() { 601 return (Function<E, VOUT, E>) EXCEPTION_IDEM_FUNC; 602 } 603 604 @SuppressWarnings("unchecked") 605 static <VOUT, E extends Exception> AsyncFunction<RuntimeException, VOUT, E> 606 runtimeExceptionIdempotentAsyncFunction() { 607 return (AsyncFunction<RuntimeException, VOUT, E>) RUNTIME_EXCEPTION_IDEM_ASYNC_FUNC; 608 } 609 610 @SuppressWarnings("unchecked") 611 static <VOUT, E extends Exception> Function<RuntimeException, VOUT, E> runtimeExceptionIdempotentFunction() { 612 return (Function<RuntimeException, VOUT, E>) RUNTIME_EXCEPTION_IDEM_FUNC; 613 } 614 615 @SuppressWarnings("unchecked") 616 static <V, E extends Exception> AsyncFunction<V, V, E> resultIdempotentAsyncFunction() { 617 return (AsyncFunction<V, V, E>) RESULT_IDEM_ASYNC_FUNC; 618 } 619 620 @SuppressWarnings("unchecked") 621 static <V, E extends Exception> Function<V, V, E> resultIdempotentFunction() { 622 return (Function<V, V, E>) RESULT_IDEM_FUNC; 623 } 624 625 private Promises() { 626 // Prevent instantiation. 627 } 628}