1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 package org.forgerock.http.apache.async;
18
19 import static java.nio.channels.Channels.newChannel;
20 import static org.forgerock.http.apache.async.CloseableBufferFactory.closeableByteBufferFactory;
21 import static org.forgerock.util.Utils.closeSilently;
22
23 import java.io.IOException;
24 import java.nio.ByteBuffer;
25 import java.nio.channels.WritableByteChannel;
26 import java.util.Map;
27
28 import org.apache.http.HttpEntity;
29 import org.apache.http.HttpException;
30 import org.apache.http.HttpResponse;
31 import org.apache.http.client.methods.HttpUriRequest;
32 import org.apache.http.impl.nio.client.CloseableHttpAsyncClient;
33 import org.apache.http.nio.ContentDecoder;
34 import org.apache.http.nio.IOControl;
35 import org.apache.http.nio.client.methods.HttpAsyncMethods;
36 import org.apache.http.nio.protocol.HttpAsyncResponseConsumer;
37 import org.apache.http.protocol.HttpContext;
38 import org.forgerock.http.apache.AbstractHttpClient;
39 import org.forgerock.http.io.Buffer;
40 import org.forgerock.http.io.PipeBufferedStream;
41 import org.forgerock.http.protocol.Request;
42 import org.forgerock.http.protocol.Response;
43 import org.forgerock.http.protocol.Status;
44 import org.forgerock.util.Factory;
45 import org.forgerock.util.promise.NeverThrowsException;
46 import org.forgerock.util.promise.Promise;
47 import org.forgerock.util.promise.PromiseImpl;
48 import org.slf4j.Logger;
49 import org.slf4j.LoggerFactory;
50 import org.slf4j.MDC;
51
52
53
54
55 public class AsyncHttpClient extends AbstractHttpClient {
56
57 private static final Logger logger = LoggerFactory.getLogger(AsyncHttpClient.class);
58
59 private final CloseableHttpAsyncClient client;
60 private final Factory<Buffer> storage;
61 private final CloseableBufferFactory<ByteBuffer> bufferFactory;
62
63 AsyncHttpClient(final CloseableHttpAsyncClient client, final Factory<Buffer> storage, final int threadCount) {
64
65 this.client = client;
66 this.storage = storage;
67 this.bufferFactory = closeableByteBufferFactory(threadCount, 8 * 1_024);
68 }
69
70 @Override
71 public Promise<Response, NeverThrowsException> sendAsync(final Request request) {
72
73 HttpUriRequest clientRequest = createHttpUriRequest(request);
74
75
76 final PromiseImpl<Response, NeverThrowsException> promise = PromiseImpl.create();
77
78 HttpAsyncResponseConsumer<HttpResponse> httpAsyncResponseConsumer =
79 new PromiseHttpAsyncResponseConsumer(promise, request.getUri().asURI().toASCIIString(), storage,
80 bufferFactory);
81
82
83 Map<String, String> mdc = MDC.getCopyOfContextMap();
84 if (mdc != null) {
85 httpAsyncResponseConsumer = new MdcAwareHttpAsyncResponseConsumer(httpAsyncResponseConsumer, mdc);
86 }
87
88
89 client.execute(HttpAsyncMethods.create(clientRequest), httpAsyncResponseConsumer, null);
90
91 return promise;
92 }
93
94 @Override
95 public void close() throws IOException {
96 client.close();
97 }
98
99 static final class PromiseHttpAsyncResponseConsumer implements HttpAsyncResponseConsumer<HttpResponse> {
100
101 private final PromiseImpl<Response, NeverThrowsException> promise;
102
103 private final Factory<Buffer> storage;
104 private final String uri;
105 private final CloseableBufferFactory<ByteBuffer> bufferFactory;
106
107 private Response response;
108 private WritableByteChannel channel;
109 private HttpResponse result;
110 private Exception exception;
111
112 PromiseHttpAsyncResponseConsumer(PromiseImpl<Response, NeverThrowsException> promise, String uri,
113 Factory<Buffer> storage, CloseableBufferFactory<ByteBuffer> bufferFactory) {
114 this.promise = promise;
115 this.storage = storage;
116 this.uri = uri;
117 this.bufferFactory = bufferFactory;
118 }
119
120 @Override
121 public void responseReceived(HttpResponse httpResponse) throws IOException, HttpException {
122 result = httpResponse;
123 response = createResponseWithoutEntity(httpResponse);
124
125 HttpEntity entity = httpResponse.getEntity();
126 if (entity != null) {
127 PipeBufferedStream pipe = new PipeBufferedStream(storage);
128 channel = newChannel(pipe.getIn());
129 response.getEntity().setRawContentInputStream(pipe.getOut());
130 }
131 }
132
133 @Override
134 public void consumeContent(ContentDecoder contentDecoder, IOControl ioControl) throws IOException {
135 try (CloseableBufferFactory<ByteBuffer>.CloseableBuffer buffer = bufferFactory.newInstance()) {
136 ByteBuffer byteBuffer = buffer.getBuffer();
137 while (contentDecoder.read(byteBuffer) > 0) {
138 byteBuffer.flip();
139 channel.write(byteBuffer);
140 byteBuffer.clear();
141 }
142 }
143
144 if (contentDecoder.isCompleted()) {
145 channel.close();
146 }
147 }
148
149 @Override
150 public void responseCompleted(HttpContext httpContext) {
151 promise.handleResult(response);
152 }
153
154 @Override
155 public void failed(Exception e) {
156 closeSilently(response, channel);
157 exception = e;
158 logger.trace("Failed to obtain response for {}", uri, e);
159 promise.handleResult(new Response(Status.BAD_GATEWAY).setCause(e));
160 }
161
162 @Override
163 public Exception getException() {
164 return exception;
165 }
166
167 @Override
168 public HttpResponse getResult() {
169 return result;
170 }
171
172 @Override
173 public boolean isDone() {
174 return promise.isDone();
175 }
176
177 @Override
178 public void close() throws IOException {
179 }
180
181 @Override
182 public boolean cancel() {
183 return false;
184 }
185
186 }
187
188
189
190
191
192
193 private static final class MdcAwareHttpAsyncResponseConsumer implements HttpAsyncResponseConsumer<HttpResponse> {
194
195 private final HttpAsyncResponseConsumer<HttpResponse> delegate;
196
197 private final Map<String, String> mdc;
198 MdcAwareHttpAsyncResponseConsumer(HttpAsyncResponseConsumer<HttpResponse> delegate, Map<String, String> mdc) {
199 this.delegate = delegate;
200 this.mdc = mdc;
201 }
202
203 @Override
204 public void responseReceived(HttpResponse response) throws IOException, HttpException {
205 Map<String, String> previous = MDC.getCopyOfContextMap();
206 try {
207 MDC.setContextMap(mdc);
208 delegate.responseReceived(response);
209 } finally {
210 restoreMdc(previous);
211 }
212 }
213
214 @Override
215 public void consumeContent(ContentDecoder decoder, IOControl ioctrl) throws IOException {
216 Map<String, String> previous = MDC.getCopyOfContextMap();
217 try {
218 MDC.setContextMap(mdc);
219 delegate.consumeContent(decoder, ioctrl);
220 } finally {
221 restoreMdc(previous);
222 }
223 }
224
225 @Override
226 public void responseCompleted(HttpContext context) {
227 Map<String, String> previous = MDC.getCopyOfContextMap();
228 try {
229 MDC.setContextMap(mdc);
230 delegate.responseCompleted(context);
231 } finally {
232 restoreMdc(previous);
233 }
234 }
235
236 @Override
237 public void failed(Exception ex) {
238 Map<String, String> previous = MDC.getCopyOfContextMap();
239 try {
240 MDC.setContextMap(mdc);
241 delegate.failed(ex);
242 } finally {
243 restoreMdc(previous);
244 }
245 }
246
247 @Override
248 public Exception getException() {
249 return delegate.getException();
250 }
251
252 @Override
253 public HttpResponse getResult() {
254 return delegate.getResult();
255 }
256
257 @Override
258 public boolean isDone() {
259 return delegate.isDone();
260 }
261
262 @Override
263 public void close() throws IOException {
264 Map<String, String> previous = MDC.getCopyOfContextMap();
265 try {
266 MDC.setContextMap(mdc);
267 delegate.close();
268 } finally {
269 restoreMdc(previous);
270 }
271 }
272
273 @Override
274 public boolean cancel() {
275 Map<String, String> previous = MDC.getCopyOfContextMap();
276 try {
277 MDC.setContextMap(mdc);
278 return delegate.cancel();
279 } finally {
280 restoreMdc(previous);
281 }
282 }
283
284 private void restoreMdc(Map<String, String> previous) {
285 if (previous != null) {
286 MDC.setContextMap(previous);
287 } else {
288 MDC.clear();
289 }
290 }
291 }
292 }