001/* 002 * The contents of this file are subject to the terms of the Common Development and 003 * Distribution License (the License). You may not use this file except in compliance with the 004 * License. 005 * 006 * You can obtain a copy of the License at legal/CDDLv1.0.txt. See the License for the 007 * specific language governing permission and limitations under the License. 008 * 009 * When distributing Covered Software, include this CDDL Header Notice in each file and include 010 * the License file at legal/CDDLv1.0.txt. If applicable, add the following below the CDDL 011 * Header, with the fields enclosed by brackets [] replaced by your own identifying 012 * information: "Portions copyright [year] [name of copyright owner]". 013 * 014 * Copyright 2015-2016 ForgeRock AS. 015 */ 016package org.forgerock.util.promise; 017 018import java.util.Queue; 019import java.util.concurrent.ConcurrentLinkedQueue; 020import java.util.concurrent.ExecutionException; 021import java.util.concurrent.TimeUnit; 022import java.util.concurrent.TimeoutException; 023 024import org.forgerock.util.AsyncFunction; 025import org.forgerock.util.Function; 026import org.slf4j.Logger; 027import org.slf4j.LoggerFactory; 028 029/** 030 * An implementation of {@link Promise} which can be used as is, or as the basis 031 * for more complex asynchronous behavior. A {@code PromiseImpl} must be 032 * completed by invoking one of: 033 * <ul> 034 * <li>{@link #handleResult} - marks the promise as having succeeded with the 035 * provide result 036 * <li>{@link #handleException} - marks the promise as having failed with the 037 * provided exception 038 * <li>{@link #cancel} - requests cancellation of the asynchronous task 039 * represented by the promise. Cancellation is only supported if the 040 * {@link #tryCancel(boolean)} is overridden and returns an exception. 041 * </ul> 042 * 043 * @param <V> 044 * The type of the task's result, or {@link Void} if the task does 045 * not return anything (i.e. it only has side-effects). 046 * @param <E> 047 * The type of the exception thrown by the task if it fails, or 048 * {@link NeverThrowsException} if the task cannot fail. 049 * @see Promise 050 * @see Promises 051 */ 052public class PromiseImpl<V, E extends Exception> implements Promise<V, E>, ResultHandler<V>, 053 ExceptionHandler<E>, RuntimeExceptionHandler { 054 // TODO: Is using monitor based sync better than AQS? 055 056 private static final Logger LOGGER = LoggerFactory.getLogger(PromiseImpl.class); 057 058 private interface StateListener<V, E extends Exception> { 059 void handleStateChange(int newState, V result, E exception, RuntimeException runtimeException); 060 } 061 062 /** 063 * State value indicating that this promise has not completed. 064 */ 065 private static final int PENDING = 0; 066 067 /** 068 * State value indicating that this promise has completed successfully 069 * (result set). 070 */ 071 private static final int HAS_RESULT = 1; 072 073 /** 074 * State value indicating that this promise has failed (exception set). 075 */ 076 private static final int HAS_EXCEPTION = 2; 077 078 /** 079 * State value indicating that this promise has been cancelled (exception set). 080 */ 081 private static final int CANCELLED = 3; 082 083 /** 084 * State value indicating that this promise has failed with a runtime exception. 085 */ 086 private static final int HAS_RUNTIME_EXCEPTION = 4; 087 088 /** 089 * Creates a new pending {@link Promise} implementation. 090 * 091 * @param <V> 092 * The type of the task's result, or {@link Void} if the task 093 * does not return anything (i.e. it only has side-effects). 094 * @param <E> 095 * The type of the exception thrown by the task if it fails, or 096 * {@link NeverThrowsException} if the task cannot fail. 097 * @return A new pending {@link Promise} implementation. 098 */ 099 public static <V, E extends Exception> PromiseImpl<V, E> create() { 100 return new PromiseImpl<>(); 101 } 102 103 private volatile int state = PENDING; 104 private V result = null; 105 private E exception = null; 106 private RuntimeException runtimeException = null; 107 108 private final Queue<StateListener<V, E>> listeners = 109 new ConcurrentLinkedQueue<>(); 110 111 /** 112 * Creates a new pending {@link Promise} implementation. This constructor is 113 * protected to allow for sub-classing. 114 */ 115 protected PromiseImpl() { 116 // No implementation. 117 } 118 119 @Override 120 public final boolean cancel(final boolean mayInterruptIfRunning) { 121 if (isDone()) { 122 // Fail-fast. 123 return false; 124 } 125 final E exception = tryCancel(mayInterruptIfRunning); 126 return exception != null && setState(CANCELLED, null, exception, null); 127 } 128 129 @Override 130 public final V get() throws InterruptedException, ExecutionException { 131 await(); // Publishes. 132 return get0(); 133 } 134 135 @Override 136 public final V get(final long timeout, final TimeUnit unit) throws InterruptedException, 137 ExecutionException, TimeoutException { 138 await(timeout, unit, false); // Publishes. 139 return get0(); 140 } 141 142 @Override 143 public final V getOrThrow() throws InterruptedException, E { 144 await(); // Publishes. 145 return getOrThrow0(); 146 } 147 148 @Override 149 public final V getOrThrow(final long timeout, final TimeUnit unit) throws InterruptedException, 150 E, TimeoutException { 151 await(timeout, unit, false); // Publishes. 152 return getOrThrow0(); 153 } 154 155 @Override 156 public final V getOrThrowUninterruptibly() throws E { 157 boolean wasInterrupted = false; 158 try { 159 while (true) { 160 try { 161 return getOrThrow(); 162 } catch (final InterruptedException e) { 163 wasInterrupted = true; 164 } 165 } 166 } finally { 167 if (wasInterrupted) { 168 Thread.currentThread().interrupt(); 169 } 170 } 171 } 172 173 @Override 174 public final V getOrThrowUninterruptibly(final long timeout, final TimeUnit unit) throws E, 175 TimeoutException { 176 try { 177 await(timeout, unit, true); // Publishes. 178 } catch (InterruptedException ignored) { 179 // Will never occur since interrupts are ignored. 180 } 181 return getOrThrow0(); 182 } 183 184 /** 185 * Signals that the asynchronous task represented by this promise has 186 * failed. If the task has already completed (i.e. {@code isDone() == true}) 187 * then calling this method has no effect and the provided result will be 188 * discarded. 189 * 190 * @param exception 191 * The exception indicating why the task failed. 192 * @see #tryHandleException(Exception) 193 */ 194 @Override 195 public final void handleException(final E exception) { 196 tryHandleException(exception); 197 } 198 199 @Override 200 public void handleRuntimeException(RuntimeException exception) { 201 setState(HAS_RUNTIME_EXCEPTION, null, null, exception); 202 } 203 204 /** 205 * Signals that the asynchronous task represented by this promise has 206 * succeeded. If the task has already completed (i.e. 207 * {@code isDone() == true}) then calling this method has no effect and the 208 * provided result will be discarded. 209 * 210 * @param result 211 * The result of the asynchronous task (may be {@code null}). 212 * @see #tryHandleResult(Object) 213 */ 214 @Override 215 public final void handleResult(final V result) { 216 tryHandleResult(result); 217 } 218 219 /** 220 * Attempts to signal that the asynchronous task represented by this promise 221 * has failed. If the task has already completed (i.e. 222 * {@code isDone() == true}) then calling this method has no effect and 223 * {@code false} is returned. 224 * <p> 225 * This method should be used in cases where multiple threads may 226 * concurrently attempt to complete a promise and need to release resources 227 * if the completion attempt fails. For example, an asynchronous TCP connect 228 * attempt may complete after a timeout has expired. In this case the 229 * connection should be immediately closed because it is never going to be 230 * used. 231 * 232 * @param exception 233 * The exception indicating why the task failed. 234 * @return {@code false} if this promise has already been completed, either 235 * due to normal termination, an exception, or cancellation (i.e. 236 * {@code isDone() == true}). 237 * @see #handleException(Exception) 238 * @see #isDone() 239 */ 240 public final boolean tryHandleException(final E exception) { 241 return setState(HAS_EXCEPTION, null, exception, null); 242 } 243 244 /** 245 * Attempts to signal that the asynchronous task represented by this promise 246 * has succeeded. If the task has already completed (i.e. 247 * {@code isDone() == true}) then calling this method has no effect and 248 * {@code false} is returned. 249 * <p> 250 * This method should be used in cases where multiple threads may 251 * concurrently attempt to complete a promise and need to release resources 252 * if the completion attempt fails. For example, an asynchronous TCP connect 253 * attempt may complete after a timeout has expired. In this case the 254 * connection should be immediately closed because it is never going to be 255 * used. 256 * 257 * @param result 258 * The result of the asynchronous task (may be {@code null}). 259 * @return {@code false} if this promise has already been completed, either 260 * due to normal termination, an exception, or cancellation (i.e. 261 * {@code isDone() == true}). 262 * @see #handleResult(Object) 263 * @see #isDone() 264 */ 265 public final boolean tryHandleResult(final V result) { 266 return setState(HAS_RESULT, result, null, null); 267 } 268 269 @Override 270 public final boolean isCancelled() { 271 return state == CANCELLED; 272 } 273 274 @Override 275 public final boolean isDone() { 276 return state != PENDING; 277 } 278 279 @Override 280 public final Promise<V, E> thenOnException(final ExceptionHandler<? super E> onException) { 281 addOrFireListener(new StateListener<V, E>() { 282 @Override 283 public void handleStateChange(final int newState, final V result, final E exception, 284 final RuntimeException runtimeException) { 285 if (newState == HAS_EXCEPTION || newState == CANCELLED) { 286 try { 287 onException.handleException(exception); 288 } catch (RuntimeException e) { 289 LOGGER.error("Ignored unexpected exception thrown by ExceptionHandler", e); 290 } 291 } 292 } 293 }); 294 return this; 295 } 296 297 @Override 298 public final Promise<V, E> thenOnResult(final ResultHandler<? super V> onResult) { 299 addOrFireListener(new StateListener<V, E>() { 300 @Override 301 public void handleStateChange(final int newState, final V result, final E exception, 302 final RuntimeException runtimeException) { 303 if (newState == HAS_RESULT) { 304 try { 305 onResult.handleResult(result); 306 } catch (RuntimeException e) { 307 LOGGER.error("Ignored unexpected exception thrown by ResultHandler", e); 308 } 309 } 310 } 311 }); 312 return this; 313 } 314 315 @Override 316 public final Promise<V, E> thenOnResultOrException(final ResultHandler<? super V> onResult, 317 final ExceptionHandler<? super E> onException) { 318 addOrFireListener(new StateListener<V, E>() { 319 @Override 320 public void handleStateChange(final int newState, final V result, final E exception, 321 final RuntimeException runtimeException) { 322 if (newState == HAS_RESULT) { 323 try { 324 onResult.handleResult(result); 325 } catch (RuntimeException e) { 326 LOGGER.error("Ignored unexpected exception thrown by ResultHandler", e); 327 } 328 } else if (newState == HAS_EXCEPTION || newState == CANCELLED) { 329 try { 330 onException.handleException(exception); 331 } catch (RuntimeException e) { 332 LOGGER.error("Ignored unexpected exception thrown by ExceptionHandler", e); 333 } 334 } 335 } 336 }); 337 return this; 338 } 339 340 @Override 341 public final Promise<V, E> thenOnResultOrException(final Runnable onResultOrException) { 342 addOrFireListener(new StateListener<V, E>() { 343 @Override 344 public void handleStateChange(final int newState, final V result, final E exception, 345 final RuntimeException runtimeException) { 346 if (newState != HAS_RUNTIME_EXCEPTION) { 347 try { 348 onResultOrException.run(); 349 } catch (RuntimeException e) { 350 LOGGER.error("Ignored unexpected exception thrown by Runnable", e); 351 } 352 } 353 } 354 }); 355 return this; 356 } 357 358 @Override 359 public final <VOUT> Promise<VOUT, E> then(final Function<? super V, VOUT, E> onResult) { 360 return then(onResult, Promises.<VOUT, E>exceptionIdempotentFunction()); 361 } 362 363 @Override 364 public <EOUT extends Exception> Promise<V, EOUT> thenCatch(final Function<? super E, V, EOUT> onException) { 365 return then(Promises.<V, EOUT>resultIdempotentFunction(), onException); 366 } 367 368 @Override 369 public Promise<V, E> thenCatchRuntimeException( 370 Function<? super RuntimeException, V, E> onRuntimeException) { 371 return then(Promises.<V, E>resultIdempotentFunction(), Promises.<V, E>exceptionIdempotentFunction(), 372 onRuntimeException); 373 } 374 375 @Override 376 public final <VOUT, EOUT extends Exception> Promise<VOUT, EOUT> then( 377 final Function<? super V, VOUT, EOUT> onResult, final Function<? super E, VOUT, EOUT> onException) { 378 return then(onResult, onException, Promises.<VOUT, EOUT>runtimeExceptionIdempotentFunction()); 379 } 380 381 @Override 382 @SuppressWarnings("unchecked") 383 public final <VOUT, EOUT extends Exception> Promise<VOUT, EOUT> then( 384 final Function<? super V, VOUT, EOUT> onResult, final Function<? super E, VOUT, EOUT> onException, 385 final Function<? super RuntimeException, VOUT, EOUT> onRuntimeException) { 386 final PromiseImpl<VOUT, EOUT> chained = new PromiseImpl<>(); 387 addOrFireListener(new StateListener<V, E>() { 388 @Override 389 public void handleStateChange(final int newState, final V result, final E exception, 390 final RuntimeException runtimeException) { 391 try { 392 switch (newState) { 393 case HAS_RESULT: 394 chained.handleResult(onResult.apply(result)); 395 break; 396 case HAS_EXCEPTION: 397 case CANCELLED: 398 chained.handleResult(onException.apply(exception)); 399 break; 400 case HAS_RUNTIME_EXCEPTION: 401 chained.handleResult(onRuntimeException.apply(runtimeException)); 402 break; 403 default: 404 throw new IllegalStateException("Unexpected state : " + newState); 405 } 406 } catch (final RuntimeException e) { 407 tryHandlingRuntimeException(e, chained); 408 } catch (final Exception e) { 409 chained.handleException((EOUT) e); 410 } 411 } 412 }); 413 return chained; 414 } 415 416 private <VOUT, EOUT extends Exception> void tryHandlingRuntimeException(final RuntimeException runtimeException, 417 final PromiseImpl<VOUT, EOUT> chained) { 418 try { 419 chained.handleRuntimeException(runtimeException); 420 } catch (Exception ignored) { 421 LOGGER.error("Runtime exception handler threw a RuntimeException which cannot be handled!", ignored); 422 } 423 } 424 425 @Override 426 public final Promise<V, E> thenAlways(final Runnable always) { 427 addOrFireListener(new StateListener<V, E>() { 428 @Override 429 public void handleStateChange(final int newState, final V result, final E exception, 430 final RuntimeException runtimeException) { 431 try { 432 always.run(); 433 } catch (RuntimeException e) { 434 LOGGER.error("Ignored unexpected exception thrown by Runnable", e); 435 } 436 } 437 }); 438 return this; 439 } 440 441 @Override 442 public final Promise<V, E> thenFinally(final Runnable onFinally) { 443 return thenAlways(onFinally); 444 } 445 446 @Override 447 public final <VOUT> Promise<VOUT, E> thenAsync(final AsyncFunction<? super V, VOUT, E> onResult) { 448 return thenAsync(onResult, Promises.<VOUT, E>exceptionIdempotentAsyncFunction()); 449 } 450 451 @Override 452 public final <EOUT extends Exception> Promise<V, EOUT> thenCatchAsync( 453 AsyncFunction<? super E, V, EOUT> onException) { 454 return thenAsync(Promises.<V, EOUT>resultIdempotentAsyncFunction(), onException); 455 } 456 457 @Override 458 public final Promise<V, E> thenCatchRuntimeExceptionAsync( 459 AsyncFunction<? super RuntimeException, V, E> onRuntimeException) { 460 return thenAsync(Promises.<V, E>resultIdempotentAsyncFunction(), 461 Promises.<V, E>exceptionIdempotentAsyncFunction(), onRuntimeException); 462 } 463 464 @Override 465 public final <VOUT, EOUT extends Exception> Promise<VOUT, EOUT> thenAsync( 466 final AsyncFunction<? super V, VOUT, EOUT> onResult, 467 final AsyncFunction<? super E, VOUT, EOUT> onException) { 468 return thenAsync(onResult, onException, Promises.<VOUT, EOUT>runtimeExceptionIdempotentAsyncFunction()); 469 } 470 471 @Override 472 public final <VOUT, EOUT extends Exception> Promise<VOUT, EOUT> thenAsync( 473 final AsyncFunction<? super V, VOUT, EOUT> onResult, 474 final AsyncFunction<? super E, VOUT, EOUT> onException, 475 final AsyncFunction<? super RuntimeException, VOUT, EOUT> onRuntimeException) { 476 final PromiseImpl<VOUT, EOUT> chained = new PromiseImpl<>(); 477 addOrFireListener(new StateListener<V, E>() { 478 @Override 479 @SuppressWarnings("unchecked") 480 public void handleStateChange(final int newState, final V result, final E exception, 481 final RuntimeException runtimeException) { 482 try { 483 switch (newState) { 484 case HAS_RESULT: 485 callNestedPromise(onResult.apply(result)); 486 break; 487 case HAS_EXCEPTION: 488 case CANCELLED: 489 callNestedPromise(onException.apply(exception)); 490 break; 491 case HAS_RUNTIME_EXCEPTION: 492 callNestedPromise(onRuntimeException.apply(runtimeException)); 493 break; 494 default: 495 throw new IllegalStateException("Unexpected state : " + newState); 496 } 497 } catch (final RuntimeException e) { 498 tryHandlingRuntimeException(e, chained); 499 } catch (final Exception e) { 500 chained.handleException((EOUT) e); 501 } 502 } 503 504 private void callNestedPromise(Promise<? extends VOUT, ? extends EOUT> nestedPromise) { 505 nestedPromise 506 .thenOnResult(chained) 507 .thenOnException(chained) 508 .thenOnRuntimeException(chained); 509 } 510 }); 511 return chained; 512 } 513 514 @Override 515 public final Promise<V, E> thenOnRuntimeException(final RuntimeExceptionHandler onRuntimeException) { 516 addOrFireListener(new StateListener<V, E>() { 517 @Override 518 public void handleStateChange(int newState, V result, E exception, RuntimeException runtimeException) { 519 if (newState == HAS_RUNTIME_EXCEPTION) { 520 try { 521 onRuntimeException.handleRuntimeException(runtimeException); 522 } catch (RuntimeException e) { 523 LOGGER.error("Ignored unexpected exception thrown by RuntimeExceptionHandler", e); 524 } 525 } 526 } 527 }); 528 return this; 529 } 530 531 /** 532 * Invoked when the client attempts to cancel the asynchronous task 533 * represented by this promise. Implementations which support cancellation 534 * should override this method to cancel the asynchronous task and, if 535 * successful, return an appropriate exception which can be used to signal 536 * that the task has failed. 537 * <p> 538 * By default cancellation is not supported and this method returns 539 * {@code null}. 540 * 541 * @param mayInterruptIfRunning 542 * {@code true} if the thread executing this task should be 543 * interrupted; otherwise, in-progress tasks are allowed to 544 * complete. 545 * @return {@code null} if cancellation was not supported or not possible, 546 * otherwise an appropriate exception. 547 */ 548 protected E tryCancel(final boolean mayInterruptIfRunning) { 549 return null; 550 } 551 552 private void addOrFireListener(final StateListener<V, E> listener) { 553 final int stateBefore = state; 554 if (stateBefore != PENDING) { 555 handleCompletion(listener, stateBefore); 556 } else { 557 listeners.add(listener); 558 final int stateAfter = state; 559 if (stateAfter != PENDING && listeners.remove(listener)) { 560 handleCompletion(listener, stateAfter); 561 } 562 } 563 } 564 565 private void handleCompletion(final StateListener<V, E> listener, final int completedState) { 566 try { 567 listener.handleStateChange(completedState, result, exception, runtimeException); 568 } catch (RuntimeException ignored) { 569 LOGGER.error("State change listener threw a RuntimeException which cannot be handled!", ignored); 570 } 571 } 572 573 private V get0() throws ExecutionException { 574 if (runtimeException != null) { 575 throw new ExecutionException(runtimeException); 576 } else if (exception != null) { 577 throw new ExecutionException(exception); 578 } else { 579 return result; 580 } 581 } 582 583 private V getOrThrow0() throws E { 584 if (runtimeException != null) { 585 throw runtimeException; 586 } else if (exception != null) { 587 throw exception; 588 } else { 589 return result; 590 } 591 } 592 593 private boolean setState(final int newState, final V result, final E exception, 594 final RuntimeException runtimeException) { 595 synchronized (this) { 596 if (state != PENDING) { 597 // Already completed. 598 return false; 599 } 600 this.result = result; 601 this.exception = exception; 602 this.runtimeException = runtimeException; 603 state = newState; // Publishes. 604 notifyAll(); // Wake up any blocked threads. 605 } 606 StateListener<V, E> listener; 607 while ((listener = listeners.poll()) != null) { 608 handleCompletion(listener, newState); 609 } 610 return true; 611 } 612 613 private void await() throws InterruptedException { 614 // Use double-check for fast-path. 615 if (state == PENDING) { 616 synchronized (this) { 617 while (state == PENDING) { 618 wait(); 619 } 620 } 621 } 622 } 623 624 private void await(final long timeout, final TimeUnit unit, final boolean isUninterruptibly) 625 throws InterruptedException, TimeoutException { 626 // Use double-check for fast-path. 627 if (state == PENDING) { 628 final long timeoutMS = unit.toMillis(timeout); 629 final long endTimeMS = System.currentTimeMillis() + timeoutMS; 630 boolean wasInterrupted = false; 631 try { 632 synchronized (this) { 633 while (state == PENDING) { 634 final long remainingTimeMS = endTimeMS - System.currentTimeMillis(); 635 if (remainingTimeMS <= 0) { 636 throw new TimeoutException(); 637 } 638 try { 639 wait(remainingTimeMS); 640 } catch (final InterruptedException e) { 641 if (isUninterruptibly) { 642 wasInterrupted = true; 643 } else { 644 throw e; 645 } 646 } 647 } 648 } 649 } finally { 650 if (wasInterrupted) { 651 Thread.currentThread().interrupt(); 652 } 653 } 654 } 655 } 656}