View Javadoc
1   /*
2    * The contents of this file are subject to the terms of the Common Development and
3    * Distribution License (the License). You may not use this file except in compliance with the
4    * License.
5    *
6    * You can obtain a copy of the License at legal/CDDLv1.0.txt. See the License for the
7    * specific language governing permission and limitations under the License.
8    *
9    * When distributing Covered Software, include this CDDL Header Notice in each file and include
10   * the License file at legal/CDDLv1.0.txt. If applicable, add the following below the CDDL
11   * Header, with the fields enclosed by brackets [] replaced by your own identifying
12   * information: "Portions copyright [year] [name of copyright owner]".
13   *
14   * Copyright 2015-2016 ForgeRock AS.
15   */
16  
17  package org.forgerock.http.apache.async;
18  
19  import static java.nio.channels.Channels.newChannel;
20  import static org.forgerock.http.apache.async.CloseableBufferFactory.closeableByteBufferFactory;
21  import static org.forgerock.util.Utils.closeSilently;
22  
23  import java.io.IOException;
24  import java.nio.ByteBuffer;
25  import java.nio.channels.WritableByteChannel;
26  import java.util.Map;
27  
28  import org.apache.http.HttpEntity;
29  import org.apache.http.HttpException;
30  import org.apache.http.HttpResponse;
31  import org.apache.http.client.methods.HttpUriRequest;
32  import org.apache.http.impl.nio.client.CloseableHttpAsyncClient;
33  import org.apache.http.nio.ContentDecoder;
34  import org.apache.http.nio.IOControl;
35  import org.apache.http.nio.client.methods.HttpAsyncMethods;
36  import org.apache.http.nio.protocol.HttpAsyncResponseConsumer;
37  import org.apache.http.protocol.HttpContext;
38  import org.forgerock.http.apache.AbstractHttpClient;
39  import org.forgerock.http.io.Buffer;
40  import org.forgerock.http.io.PipeBufferedStream;
41  import org.forgerock.http.protocol.Request;
42  import org.forgerock.http.protocol.Response;
43  import org.forgerock.http.protocol.Status;
44  import org.forgerock.util.Factory;
45  import org.forgerock.util.promise.NeverThrowsException;
46  import org.forgerock.util.promise.Promise;
47  import org.forgerock.util.promise.PromiseImpl;
48  import org.slf4j.Logger;
49  import org.slf4j.LoggerFactory;
50  import org.slf4j.MDC;
51  
52  /**
53   * Apache HTTP Async Client based implementation.
54   */
55  public class AsyncHttpClient extends AbstractHttpClient {
56  
57      private static final Logger logger = LoggerFactory.getLogger(AsyncHttpClient.class);
58  
59      private final CloseableHttpAsyncClient client;
60      private final Factory<Buffer> storage;
61      private final CloseableBufferFactory<ByteBuffer> bufferFactory;
62  
63      AsyncHttpClient(final CloseableHttpAsyncClient client, final Factory<Buffer> storage, final int threadCount) {
64          // Client should already be started
65          this.client = client;
66          this.storage = storage;
67          this.bufferFactory = closeableByteBufferFactory(threadCount, 8 * 1_024);
68      }
69  
70      @Override
71      public Promise<Response, NeverThrowsException> sendAsync(final Request request) {
72  
73          HttpUriRequest clientRequest = createHttpUriRequest(request);
74  
75          // Send request and return the configured Promise
76          final PromiseImpl<Response, NeverThrowsException> promise = PromiseImpl.create();
77  
78          HttpAsyncResponseConsumer<HttpResponse> httpAsyncResponseConsumer =
79                  new PromiseHttpAsyncResponseConsumer(promise, request.getUri().asURI().toASCIIString(), storage,
80                          bufferFactory);
81  
82          // Copy the MDC before submitting the job
83          Map<String, String> mdc = MDC.getCopyOfContextMap();
84          if (mdc != null) {
85              httpAsyncResponseConsumer = new MdcAwareHttpAsyncResponseConsumer(httpAsyncResponseConsumer, mdc);
86          }
87  
88          // Execute
89          client.execute(HttpAsyncMethods.create(clientRequest), httpAsyncResponseConsumer, null);
90  
91          return promise;
92      }
93  
94      @Override
95      public void close() throws IOException {
96          client.close();
97      }
98  
99      static final class PromiseHttpAsyncResponseConsumer implements HttpAsyncResponseConsumer<HttpResponse> {
100 
101         private final PromiseImpl<Response, NeverThrowsException> promise;
102 
103         private final Factory<Buffer> storage;
104         private final String uri;
105         private final CloseableBufferFactory<ByteBuffer> bufferFactory;
106 
107         private Response response;
108         private WritableByteChannel channel;
109         private HttpResponse result;
110         private Exception exception;
111 
112         PromiseHttpAsyncResponseConsumer(PromiseImpl<Response, NeverThrowsException> promise, String uri,
113                 Factory<Buffer> storage, CloseableBufferFactory<ByteBuffer> bufferFactory) {
114             this.promise = promise;
115             this.storage = storage;
116             this.uri = uri;
117             this.bufferFactory = bufferFactory;
118         }
119 
120         @Override
121         public void responseReceived(HttpResponse httpResponse) throws IOException, HttpException {
122             result = httpResponse;
123             response = createResponseWithoutEntity(httpResponse);
124 
125             HttpEntity entity = httpResponse.getEntity();
126             if (entity != null) {
127                 PipeBufferedStream pipe = new PipeBufferedStream(storage);
128                 channel = newChannel(pipe.getIn());
129                 response.getEntity().setRawContentInputStream(pipe.getOut());
130             }
131         }
132 
133         @Override
134         public void consumeContent(ContentDecoder contentDecoder, IOControl ioControl) throws IOException {
135             try (CloseableBufferFactory<ByteBuffer>.CloseableBuffer buffer = bufferFactory.newInstance()) {
136                 ByteBuffer byteBuffer = buffer.getBuffer();
137                 while (contentDecoder.read(byteBuffer) > 0) {
138                     byteBuffer.flip();
139                     channel.write(byteBuffer);
140                     byteBuffer.clear();
141                 }
142             }
143 
144             if (contentDecoder.isCompleted()) {
145                 channel.close();
146             }
147         }
148 
149         @Override
150         public void responseCompleted(HttpContext httpContext) {
151             promise.handleResult(response);
152         }
153 
154         @Override
155         public void failed(Exception e) {
156             closeSilently(response, channel);
157             exception = e;
158             logger.trace("Failed to obtain response for {}", uri, e);
159             promise.handleResult(new Response(Status.BAD_GATEWAY).setCause(e));
160         }
161 
162         @Override
163         public Exception getException() {
164             return exception;
165         }
166 
167         @Override
168         public HttpResponse getResult() {
169             return result;
170         }
171 
172         @Override
173         public boolean isDone() {
174             return promise.isDone();
175         }
176 
177         @Override
178         public void close() throws IOException {
179         }
180 
181         @Override
182         public boolean cancel() {
183             return false;
184         }
185 
186     }
187 
188     /**
189      * This HttpAsyncResponseConsumer setup the MDC when the async HTTP client hand-off response processing back to the
190      * caller. In other words, all log statements appearing before this consumer is invoked will not have updated
191      * contextual information.
192      */
193     private static final class MdcAwareHttpAsyncResponseConsumer implements HttpAsyncResponseConsumer<HttpResponse> {
194 
195         private final HttpAsyncResponseConsumer<HttpResponse> delegate;
196 
197         private final Map<String, String> mdc;
198         MdcAwareHttpAsyncResponseConsumer(HttpAsyncResponseConsumer<HttpResponse> delegate, Map<String, String> mdc) {
199             this.delegate = delegate;
200             this.mdc = mdc;
201         }
202 
203         @Override
204         public void responseReceived(HttpResponse response) throws IOException, HttpException {
205             Map<String, String> previous = MDC.getCopyOfContextMap();
206             try {
207                 MDC.setContextMap(mdc);
208                 delegate.responseReceived(response);
209             } finally {
210                 restoreMdc(previous);
211             }
212         }
213 
214         @Override
215         public void consumeContent(ContentDecoder decoder, IOControl ioctrl) throws IOException {
216             Map<String, String> previous = MDC.getCopyOfContextMap();
217             try {
218                 MDC.setContextMap(mdc);
219                 delegate.consumeContent(decoder, ioctrl);
220             } finally {
221                 restoreMdc(previous);
222             }
223         }
224 
225         @Override
226         public void responseCompleted(HttpContext context) {
227             Map<String, String> previous = MDC.getCopyOfContextMap();
228             try {
229                 MDC.setContextMap(mdc);
230                 delegate.responseCompleted(context);
231             } finally {
232                 restoreMdc(previous);
233             }
234         }
235 
236         @Override
237         public void failed(Exception ex) {
238             Map<String, String> previous = MDC.getCopyOfContextMap();
239             try {
240                 MDC.setContextMap(mdc);
241                 delegate.failed(ex);
242             } finally {
243                 restoreMdc(previous);
244             }
245         }
246 
247         @Override
248         public Exception getException() {
249             return delegate.getException();
250         }
251 
252         @Override
253         public HttpResponse getResult() {
254             return delegate.getResult();
255         }
256 
257         @Override
258         public boolean isDone() {
259             return delegate.isDone();
260         }
261 
262         @Override
263         public void close() throws IOException {
264             Map<String, String> previous = MDC.getCopyOfContextMap();
265             try {
266                 MDC.setContextMap(mdc);
267                 delegate.close();
268             } finally {
269                 restoreMdc(previous);
270             }
271         }
272 
273         @Override
274         public boolean cancel() {
275             Map<String, String> previous = MDC.getCopyOfContextMap();
276             try {
277                 MDC.setContextMap(mdc);
278                 return delegate.cancel();
279             } finally {
280                 restoreMdc(previous);
281             }
282         }
283 
284         private void restoreMdc(Map<String, String> previous) {
285             if (previous != null) {
286                 MDC.setContextMap(previous);
287             } else {
288                 MDC.clear();
289             }
290         }
291     }
292 }