1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package org.forgerock.util.promise;
17
18 import java.util.Queue;
19 import java.util.concurrent.ConcurrentLinkedQueue;
20 import java.util.concurrent.ExecutionException;
21 import java.util.concurrent.TimeUnit;
22 import java.util.concurrent.TimeoutException;
23
24 import org.forgerock.util.AsyncFunction;
25 import org.forgerock.util.Function;
26 import org.slf4j.Logger;
27 import org.slf4j.LoggerFactory;
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52 public class PromiseImpl<V, E extends Exception> implements Promise<V, E>, ResultHandler<V>,
53 ExceptionHandler<E>, RuntimeExceptionHandler {
54
55
56 private static final Logger LOGGER = LoggerFactory.getLogger(PromiseImpl.class);
57
58 private interface StateListener<V, E extends Exception> {
59 void handleStateChange(int newState, V result, E exception, RuntimeException runtimeException);
60 }
61
62
63
64
65 private static final int PENDING = 0;
66
67
68
69
70
71 private static final int HAS_RESULT = 1;
72
73
74
75
76 private static final int HAS_EXCEPTION = 2;
77
78
79
80
81 private static final int CANCELLED = 3;
82
83
84
85
86 private static final int HAS_RUNTIME_EXCEPTION = 4;
87
88
89
90
91
92
93
94
95
96
97
98
99 public static <V, E extends Exception> PromiseImpl<V, E> create() {
100 return new PromiseImpl<>();
101 }
102
103 private volatile int state = PENDING;
104 private V result = null;
105 private E exception = null;
106 private RuntimeException runtimeException = null;
107
108 private final Queue<StateListener<V, E>> listeners =
109 new ConcurrentLinkedQueue<>();
110
111
112
113
114
115 protected PromiseImpl() {
116
117 }
118
119 @Override
120 public final boolean cancel(final boolean mayInterruptIfRunning) {
121 if (isDone()) {
122
123 return false;
124 }
125 final E exception = tryCancel(mayInterruptIfRunning);
126 return exception != null && setState(CANCELLED, null, exception, null);
127 }
128
129 @Override
130 public final V get() throws InterruptedException, ExecutionException {
131 await();
132 return get0();
133 }
134
135 @Override
136 public final V get(final long timeout, final TimeUnit unit) throws InterruptedException,
137 ExecutionException, TimeoutException {
138 await(timeout, unit, false);
139 return get0();
140 }
141
142 @Override
143 public final V getOrThrow() throws InterruptedException, E {
144 await();
145 return getOrThrow0();
146 }
147
148 @Override
149 public final V getOrThrow(final long timeout, final TimeUnit unit) throws InterruptedException,
150 E, TimeoutException {
151 await(timeout, unit, false);
152 return getOrThrow0();
153 }
154
155 @Override
156 public final V getOrThrowUninterruptibly() throws E {
157 boolean wasInterrupted = false;
158 try {
159 while (true) {
160 try {
161 return getOrThrow();
162 } catch (final InterruptedException e) {
163 wasInterrupted = true;
164 }
165 }
166 } finally {
167 if (wasInterrupted) {
168 Thread.currentThread().interrupt();
169 }
170 }
171 }
172
173 @Override
174 public final V getOrThrowUninterruptibly(final long timeout, final TimeUnit unit) throws E,
175 TimeoutException {
176 try {
177 await(timeout, unit, true);
178 } catch (InterruptedException ignored) {
179
180 }
181 return getOrThrow0();
182 }
183
184
185
186
187
188
189
190
191
192
193
194 @Override
195 public final void handleException(final E exception) {
196 tryHandleException(exception);
197 }
198
199 @Override
200 public void handleRuntimeException(RuntimeException exception) {
201 setState(HAS_RUNTIME_EXCEPTION, null, null, exception);
202 }
203
204
205
206
207
208
209
210
211
212
213
214 @Override
215 public final void handleResult(final V result) {
216 tryHandleResult(result);
217 }
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240 public final boolean tryHandleException(final E exception) {
241 return setState(HAS_EXCEPTION, null, exception, null);
242 }
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265 public final boolean tryHandleResult(final V result) {
266 return setState(HAS_RESULT, result, null, null);
267 }
268
269 @Override
270 public final boolean isCancelled() {
271 return state == CANCELLED;
272 }
273
274 @Override
275 public final boolean isDone() {
276 return state != PENDING;
277 }
278
279 @Override
280 public final Promise<V, E> thenOnException(final ExceptionHandler<? super E> onException) {
281 addOrFireListener(new StateListener<V, E>() {
282 @Override
283 public void handleStateChange(final int newState, final V result, final E exception,
284 final RuntimeException runtimeException) {
285 if (newState == HAS_EXCEPTION || newState == CANCELLED) {
286 try {
287 onException.handleException(exception);
288 } catch (RuntimeException e) {
289 LOGGER.error("Ignored unexpected exception thrown by ExceptionHandler", e);
290 }
291 }
292 }
293 });
294 return this;
295 }
296
297 @Override
298 public final Promise<V, E> thenOnResult(final ResultHandler<? super V> onResult) {
299 addOrFireListener(new StateListener<V, E>() {
300 @Override
301 public void handleStateChange(final int newState, final V result, final E exception,
302 final RuntimeException runtimeException) {
303 if (newState == HAS_RESULT) {
304 try {
305 onResult.handleResult(result);
306 } catch (RuntimeException e) {
307 LOGGER.error("Ignored unexpected exception thrown by ResultHandler", e);
308 }
309 }
310 }
311 });
312 return this;
313 }
314
315 @Override
316 public final Promise<V, E> thenOnResultOrException(final ResultHandler<? super V> onResult,
317 final ExceptionHandler<? super E> onException) {
318 addOrFireListener(new StateListener<V, E>() {
319 @Override
320 public void handleStateChange(final int newState, final V result, final E exception,
321 final RuntimeException runtimeException) {
322 if (newState == HAS_RESULT) {
323 try {
324 onResult.handleResult(result);
325 } catch (RuntimeException e) {
326 LOGGER.error("Ignored unexpected exception thrown by ResultHandler", e);
327 }
328 } else if (newState == HAS_EXCEPTION || newState == CANCELLED) {
329 try {
330 onException.handleException(exception);
331 } catch (RuntimeException e) {
332 LOGGER.error("Ignored unexpected exception thrown by ExceptionHandler", e);
333 }
334 }
335 }
336 });
337 return this;
338 }
339
340 @Override
341 public final Promise<V, E> thenOnResultOrException(final Runnable onResultOrException) {
342 addOrFireListener(new StateListener<V, E>() {
343 @Override
344 public void handleStateChange(final int newState, final V result, final E exception,
345 final RuntimeException runtimeException) {
346 if (newState != HAS_RUNTIME_EXCEPTION) {
347 try {
348 onResultOrException.run();
349 } catch (RuntimeException e) {
350 LOGGER.error("Ignored unexpected exception thrown by Runnable", e);
351 }
352 }
353 }
354 });
355 return this;
356 }
357
358 @Override
359 public final <VOUT> Promise<VOUT, E> then(final Function<? super V, VOUT, E> onResult) {
360 return then(onResult, Promises.<VOUT, E>exceptionIdempotentFunction());
361 }
362
363 @Override
364 public <EOUT extends Exception> Promise<V, EOUT> thenCatch(final Function<? super E, V, EOUT> onException) {
365 return then(Promises.<V, EOUT>resultIdempotentFunction(), onException);
366 }
367
368 @Override
369 public Promise<V, E> thenCatchRuntimeException(
370 Function<? super RuntimeException, V, E> onRuntimeException) {
371 return then(Promises.<V, E>resultIdempotentFunction(), Promises.<V, E>exceptionIdempotentFunction(),
372 onRuntimeException);
373 }
374
375 @Override
376 public final <VOUT, EOUT extends Exception> Promise<VOUT, EOUT> then(
377 final Function<? super V, VOUT, EOUT> onResult, final Function<? super E, VOUT, EOUT> onException) {
378 return then(onResult, onException, Promises.<VOUT, EOUT>runtimeExceptionIdempotentFunction());
379 }
380
381 @Override
382 @SuppressWarnings("unchecked")
383 public final <VOUT, EOUT extends Exception> Promise<VOUT, EOUT> then(
384 final Function<? super V, VOUT, EOUT> onResult, final Function<? super E, VOUT, EOUT> onException,
385 final Function<? super RuntimeException, VOUT, EOUT> onRuntimeException) {
386 final PromiseImpl<VOUT, EOUT> chained = new PromiseImpl<>();
387 addOrFireListener(new StateListener<V, E>() {
388 @Override
389 public void handleStateChange(final int newState, final V result, final E exception,
390 final RuntimeException runtimeException) {
391 try {
392 switch (newState) {
393 case HAS_RESULT:
394 chained.handleResult(onResult.apply(result));
395 break;
396 case HAS_EXCEPTION:
397 case CANCELLED:
398 chained.handleResult(onException.apply(exception));
399 break;
400 case HAS_RUNTIME_EXCEPTION:
401 chained.handleResult(onRuntimeException.apply(runtimeException));
402 break;
403 default:
404 throw new IllegalStateException("Unexpected state : " + newState);
405 }
406 } catch (final RuntimeException e) {
407 tryHandlingRuntimeException(e, chained);
408 } catch (final Exception e) {
409 chained.handleException((EOUT) e);
410 }
411 }
412 });
413 return chained;
414 }
415
416 private <VOUT, EOUT extends Exception> void tryHandlingRuntimeException(final RuntimeException runtimeException,
417 final PromiseImpl<VOUT, EOUT> chained) {
418 try {
419 chained.handleRuntimeException(runtimeException);
420 } catch (Exception ignored) {
421 LOGGER.error("Runtime exception handler threw a RuntimeException which cannot be handled!", ignored);
422 }
423 }
424
425 @Override
426 public final Promise<V, E> thenAlways(final Runnable always) {
427 addOrFireListener(new StateListener<V, E>() {
428 @Override
429 public void handleStateChange(final int newState, final V result, final E exception,
430 final RuntimeException runtimeException) {
431 try {
432 always.run();
433 } catch (RuntimeException e) {
434 LOGGER.error("Ignored unexpected exception thrown by Runnable", e);
435 }
436 }
437 });
438 return this;
439 }
440
441 @Override
442 public final Promise<V, E> thenFinally(final Runnable onFinally) {
443 return thenAlways(onFinally);
444 }
445
446 @Override
447 public final <VOUT> Promise<VOUT, E> thenAsync(final AsyncFunction<? super V, VOUT, E> onResult) {
448 return thenAsync(onResult, Promises.<VOUT, E>exceptionIdempotentAsyncFunction());
449 }
450
451 @Override
452 public final <EOUT extends Exception> Promise<V, EOUT> thenCatchAsync(
453 AsyncFunction<? super E, V, EOUT> onException) {
454 return thenAsync(Promises.<V, EOUT>resultIdempotentAsyncFunction(), onException);
455 }
456
457 @Override
458 public final Promise<V, E> thenCatchRuntimeExceptionAsync(
459 AsyncFunction<? super RuntimeException, V, E> onRuntimeException) {
460 return thenAsync(Promises.<V, E>resultIdempotentAsyncFunction(),
461 Promises.<V, E>exceptionIdempotentAsyncFunction(), onRuntimeException);
462 }
463
464 @Override
465 public final <VOUT, EOUT extends Exception> Promise<VOUT, EOUT> thenAsync(
466 final AsyncFunction<? super V, VOUT, EOUT> onResult,
467 final AsyncFunction<? super E, VOUT, EOUT> onException) {
468 return thenAsync(onResult, onException, Promises.<VOUT, EOUT>runtimeExceptionIdempotentAsyncFunction());
469 }
470
471 @Override
472 public final <VOUT, EOUT extends Exception> Promise<VOUT, EOUT> thenAsync(
473 final AsyncFunction<? super V, VOUT, EOUT> onResult,
474 final AsyncFunction<? super E, VOUT, EOUT> onException,
475 final AsyncFunction<? super RuntimeException, VOUT, EOUT> onRuntimeException) {
476 final PromiseImpl<VOUT, EOUT> chained = new PromiseImpl<>();
477 addOrFireListener(new StateListener<V, E>() {
478 @Override
479 @SuppressWarnings("unchecked")
480 public void handleStateChange(final int newState, final V result, final E exception,
481 final RuntimeException runtimeException) {
482 try {
483 switch (newState) {
484 case HAS_RESULT:
485 callNestedPromise(onResult.apply(result));
486 break;
487 case HAS_EXCEPTION:
488 case CANCELLED:
489 callNestedPromise(onException.apply(exception));
490 break;
491 case HAS_RUNTIME_EXCEPTION:
492 callNestedPromise(onRuntimeException.apply(runtimeException));
493 break;
494 default:
495 throw new IllegalStateException("Unexpected state : " + newState);
496 }
497 } catch (final RuntimeException e) {
498 tryHandlingRuntimeException(e, chained);
499 } catch (final Exception e) {
500 chained.handleException((EOUT) e);
501 }
502 }
503
504 private void callNestedPromise(Promise<? extends VOUT, ? extends EOUT> nestedPromise) {
505 nestedPromise
506 .thenOnResult(chained)
507 .thenOnException(chained)
508 .thenOnRuntimeException(chained);
509 }
510 });
511 return chained;
512 }
513
514 @Override
515 public final Promise<V, E> thenOnRuntimeException(final RuntimeExceptionHandler onRuntimeException) {
516 addOrFireListener(new StateListener<V, E>() {
517 @Override
518 public void handleStateChange(int newState, V result, E exception, RuntimeException runtimeException) {
519 if (newState == HAS_RUNTIME_EXCEPTION) {
520 try {
521 onRuntimeException.handleRuntimeException(runtimeException);
522 } catch (RuntimeException e) {
523 LOGGER.error("Ignored unexpected exception thrown by RuntimeExceptionHandler", e);
524 }
525 }
526 }
527 });
528 return this;
529 }
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548 protected E tryCancel(final boolean mayInterruptIfRunning) {
549 return null;
550 }
551
552 private void addOrFireListener(final StateListener<V, E> listener) {
553 final int stateBefore = state;
554 if (stateBefore != PENDING) {
555 handleCompletion(listener, stateBefore);
556 } else {
557 listeners.add(listener);
558 final int stateAfter = state;
559 if (stateAfter != PENDING && listeners.remove(listener)) {
560 handleCompletion(listener, stateAfter);
561 }
562 }
563 }
564
565 private void handleCompletion(final StateListener<V, E> listener, final int completedState) {
566 try {
567 listener.handleStateChange(completedState, result, exception, runtimeException);
568 } catch (RuntimeException ignored) {
569 LOGGER.error("State change listener threw a RuntimeException which cannot be handled!", ignored);
570 }
571 }
572
573 private V get0() throws ExecutionException {
574 if (runtimeException != null) {
575 throw new ExecutionException(runtimeException);
576 } else if (exception != null) {
577 throw new ExecutionException(exception);
578 } else {
579 return result;
580 }
581 }
582
583 private V getOrThrow0() throws E {
584 if (runtimeException != null) {
585 throw runtimeException;
586 } else if (exception != null) {
587 throw exception;
588 } else {
589 return result;
590 }
591 }
592
593 private boolean setState(final int newState, final V result, final E exception,
594 final RuntimeException runtimeException) {
595 synchronized (this) {
596 if (state != PENDING) {
597
598 return false;
599 }
600 this.result = result;
601 this.exception = exception;
602 this.runtimeException = runtimeException;
603 state = newState;
604 notifyAll();
605 }
606 StateListener<V, E> listener;
607 while ((listener = listeners.poll()) != null) {
608 handleCompletion(listener, newState);
609 }
610 return true;
611 }
612
613 private void await() throws InterruptedException {
614
615 if (state == PENDING) {
616 synchronized (this) {
617 while (state == PENDING) {
618 wait();
619 }
620 }
621 }
622 }
623
624 private void await(final long timeout, final TimeUnit unit, final boolean isUninterruptibly)
625 throws InterruptedException, TimeoutException {
626
627 if (state == PENDING) {
628 final long timeoutMS = unit.toMillis(timeout);
629 final long endTimeMS = System.currentTimeMillis() + timeoutMS;
630 boolean wasInterrupted = false;
631 try {
632 synchronized (this) {
633 while (state == PENDING) {
634 final long remainingTimeMS = endTimeMS - System.currentTimeMillis();
635 if (remainingTimeMS <= 0) {
636 throw new TimeoutException();
637 }
638 try {
639 wait(remainingTimeMS);
640 } catch (final InterruptedException e) {
641 if (isUninterruptibly) {
642 wasInterrupted = true;
643 } else {
644 throw e;
645 }
646 }
647 }
648 }
649 } finally {
650 if (wasInterrupted) {
651 Thread.currentThread().interrupt();
652 }
653 }
654 }
655 }
656 }