View Javadoc
1   /*
2    * The contents of this file are subject to the terms of the Common Development and
3    * Distribution License (the License). You may not use this file except in compliance with the
4    * License.
5    *
6    * You can obtain a copy of the License at legal/CDDLv1.0.txt. See the License for the
7    * specific language governing permission and limitations under the License.
8    *
9    * When distributing Covered Software, include this CDDL Header Notice in each file and include
10   * the License file at legal/CDDLv1.0.txt. If applicable, add the following below the CDDL
11   * Header, with the fields enclosed by brackets [] replaced by your own identifying
12   * information: "Portions copyright [year] [name of copyright owner]".
13   *
14   * Copyright 2016 ForgeRock AS.
15   */
16  package org.forgerock.audit.handlers.elasticsearch;
17  
18  import static org.forgerock.audit.util.ElasticsearchUtil.OBJECT_MAPPER;
19  import static org.forgerock.http.handler.HttpClientHandler.OPTION_LOADER;
20  import static org.forgerock.json.JsonValue.field;
21  import static org.forgerock.json.JsonValue.json;
22  import static org.forgerock.json.JsonValue.object;
23  import static org.forgerock.json.resource.ResourceException.newResourceException;
24  import static org.forgerock.json.resource.ResourceResponse.FIELD_CONTENT_ID;
25  import static org.forgerock.json.resource.Responses.newQueryResponse;
26  import static org.forgerock.json.resource.Responses.newResourceResponse;
27  import static org.forgerock.util.CloseSilentlyFunction.closeSilently;
28  import static org.forgerock.util.promise.Promises.newExceptionPromise;
29  
30  import java.io.IOException;
31  import java.net.URISyntaxException;
32  import java.util.ArrayList;
33  import java.util.List;
34  
35  import org.forgerock.audit.Audit;
36  import org.forgerock.audit.events.EventTopicsMetaData;
37  import org.forgerock.audit.events.handlers.AuditEventHandler;
38  import org.forgerock.audit.events.handlers.AuditEventHandlerBase;
39  import org.forgerock.audit.events.handlers.buffering.BufferedBatchPublisher;
40  import org.forgerock.audit.handlers.elasticsearch.ElasticsearchAuditEventHandlerConfiguration.ConnectionConfiguration;
41  import org.forgerock.audit.handlers.elasticsearch.ElasticsearchAuditEventHandlerConfiguration.EventBufferingConfiguration;
42  import org.forgerock.audit.events.handlers.buffering.BatchConsumer;
43  import org.forgerock.audit.events.handlers.buffering.BatchPublisher;
44  import org.forgerock.audit.events.handlers.buffering.BatchException;
45  import org.forgerock.audit.util.ElasticsearchUtil;
46  import org.forgerock.http.Client;
47  import org.forgerock.http.HttpApplicationException;
48  import org.forgerock.http.apache.async.AsyncHttpClientProvider;
49  import org.forgerock.http.handler.HttpClientHandler;
50  import org.forgerock.http.header.ContentTypeHeader;
51  import org.forgerock.http.protocol.Request;
52  import org.forgerock.http.protocol.Response;
53  import org.forgerock.http.protocol.Responses;
54  import org.forgerock.http.spi.Loader;
55  import org.forgerock.json.JsonValue;
56  import org.forgerock.json.resource.CountPolicy;
57  import org.forgerock.json.resource.InternalServerErrorException;
58  import org.forgerock.json.resource.NotFoundException;
59  import org.forgerock.json.resource.QueryRequest;
60  import org.forgerock.json.resource.QueryResourceHandler;
61  import org.forgerock.json.resource.QueryResponse;
62  import org.forgerock.json.resource.ResourceException;
63  import org.forgerock.json.resource.ResourceResponse;
64  import org.forgerock.json.resource.ServiceUnavailableException;
65  import org.forgerock.services.context.Context;
66  import org.forgerock.util.Function;
67  import org.forgerock.util.Options;
68  import org.forgerock.util.Reject;
69  import org.forgerock.util.encode.Base64;
70  import org.forgerock.util.promise.Promise;
71  import org.forgerock.util.time.Duration;
72  import org.slf4j.Logger;
73  import org.slf4j.LoggerFactory;
74  
75  /**
76   * {@link AuditEventHandler} for Elasticsearch.
77   */
78  public class ElasticsearchAuditEventHandler extends AuditEventHandlerBase implements
79          BatchConsumer {
80  
81      private static final Logger LOGGER = LoggerFactory.getLogger(ElasticsearchAuditEventHandler.class);
82      private static final ElasticsearchQueryFilterVisitor ELASTICSEARCH_QUERY_FILTER_VISITOR =
83              new ElasticsearchQueryFilterVisitor();
84  
85      private static final String QUERY = "query";
86      private static final String GET = "GET";
87      private static final String SEARCH = "/_search";
88      private static final String BULK = "/_bulk";
89      private static final String HITS = "hits";
90      private static final String SOURCE = "_source";
91      private static final int DEFAULT_PAGE_SIZE = 10;
92      private static final String TOTAL = "total";
93      private static final String PUT = "PUT";
94      private static final String POST = "POST";
95  
96      /**
97       * Average number of characters, per event, for batch indexing via Elasticsearch Bulk API. This value
98       * is used to initialize the size of buffers, but if the value is too low, the buffers will automatically resize
99       * as needed.
100      */
101     private static final int BATCH_INDEX_AVERAGE_PER_EVENT_PAYLOAD_SIZE = 1280;
102 
103     /**
104      * The Elasticsearch {@link AuditEventHandler} <b>always</b> flushes events in the batch queue on shutdown or
105      * configuration change.
106      */
107     private static final boolean ALWAYS_FLUSH_BATCH_QUEUE = true;
108     private static final int DEFAULT_OFFSET = 0;
109 
110     private final String indexName;
111     private final String basicAuthHeaderValue;
112     private final String baseUri;
113     private final String bulkUri;
114     private final ElasticsearchAuditEventHandlerConfiguration configuration;
115     private final Client client;
116     private final BatchPublisher batchIndexer;
117     private final HttpClientHandler defaultHttpClientHandler;
118 
119     /**
120      * Create a new {@code ElasticsearchAuditEventHandler} instance.
121      *
122      * @param configuration Configuration parameters that can be adjusted by system administrators.
123      * @param eventTopicsMetaData Meta-data for all audit event topics.
124      * @param client HTTP client or {@code null} to use default client.
125      */
126     public ElasticsearchAuditEventHandler(
127             final ElasticsearchAuditEventHandlerConfiguration configuration,
128             final EventTopicsMetaData eventTopicsMetaData,
129             @Audit final Client client) {
130         super(configuration.getName(), eventTopicsMetaData, configuration.getTopics(), configuration.isEnabled());
131         this.configuration = Reject.checkNotNull(configuration);
132         if (client == null) {
133             this.defaultHttpClientHandler = defaultHttpClientHandler();
134             this.client = new Client(defaultHttpClientHandler);
135         } else {
136             this.defaultHttpClientHandler = null;
137             this.client = client;
138         }
139         indexName = configuration.getIndexMapping().getIndexName();
140         basicAuthHeaderValue = buildBasicAuthHeaderValue();
141         baseUri = buildBaseUri();
142         bulkUri = buildBulkUri();
143 
144         final EventBufferingConfiguration bufferConfig = configuration.getBuffering();
145         if (bufferConfig.isEnabled()) {
146             final Duration writeInterval =
147                     bufferConfig.getWriteInterval() == null || bufferConfig.getWriteInterval().isEmpty()
148                             ? null
149                             : Duration.duration(bufferConfig.getWriteInterval());
150             batchIndexer = BufferedBatchPublisher.newBuilder(this)
151                     .capacity(bufferConfig.getMaxSize())
152                     .writeInterval(writeInterval)
153                     .maxBatchEvents(bufferConfig.getMaxBatchedEvents())
154                     .averagePerEventPayloadSize(BATCH_INDEX_AVERAGE_PER_EVENT_PAYLOAD_SIZE)
155                     .autoFlush(ALWAYS_FLUSH_BATCH_QUEUE)
156                     .build();
157         } else {
158             batchIndexer = null;
159         }
160     }
161 
162     @Override
163     public void startup() throws ResourceException {
164         if (batchIndexer != null) {
165             batchIndexer.startup();
166         }
167     }
168 
169     @Override
170     public void shutdown() throws ResourceException {
171         if (batchIndexer != null) {
172             batchIndexer.shutdown();
173         }
174         if (defaultHttpClientHandler != null) {
175             try {
176                 defaultHttpClientHandler.close();
177             } catch (IOException e) {
178                 throw ResourceException.newResourceException(ResourceException.INTERNAL_ERROR,
179                         "An error occurred while closing the default HTTP client handler", e);
180             }
181         }
182     }
183 
184     /**
185      * Queries the Elasticsearch
186      * <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/search.html">Search API</a> for
187      * audit events.
188      *
189      * {@inheritDoc}
190      */
191     @Override
192     public Promise<QueryResponse, ResourceException> queryEvents(final Context context, final String topic,
193            final QueryRequest query, final QueryResourceHandler handler) {
194         final int pageSize = query.getPageSize() <= 0 ? DEFAULT_PAGE_SIZE : query.getPageSize();
195         // set the offset to either first the offset provided, or second the paged result cookie value, or finally 0
196         final int offset;
197         if (query.getPagedResultsOffset() != 0) {
198             offset = query.getPagedResultsOffset();
199         } else if (query.getPagedResultsCookie() != null) {
200             offset = Integer.valueOf(query.getPagedResultsCookie());
201         } else {
202             offset = DEFAULT_OFFSET;
203         }
204 
205         final JsonValue payload =
206                 json(object(field(
207                         QUERY, query.getQueryFilter().accept(ELASTICSEARCH_QUERY_FILTER_VISITOR, null).getObject())));
208         try {
209             final Request request = createRequest(GET, buildSearchUri(topic, pageSize, offset), payload.getObject());
210             return client.send(request).then(closeSilently(new Function<Response, QueryResponse, ResourceException>() {
211                     @Override
212                     public QueryResponse apply(Response response) throws ResourceException {
213                         if (!response.getStatus().isSuccessful()) {
214                             final String message =
215                                     "Elasticsearch response (" + indexName + "/" + topic + SEARCH + "): "
216                                     + response.getEntity();
217                             throw newResourceException(response.getStatus().getCode(), message);
218                         }
219                         try {
220                             JsonValue events = json(response.getEntity().getJson());
221                             for (JsonValue event : events.get(HITS).get(HITS)) {
222                                 handler.handleResource(
223                                         newResourceResponse(event.get(FIELD_CONTENT_ID).asString(), null,
224                                                 ElasticsearchUtil.denormalizeJson(event.get(SOURCE))));
225                             }
226                             final int totalResults = events.get(HITS).get(TOTAL).asInteger();
227                             final String pagedResultsCookie = (pageSize + offset) >= totalResults
228                                     ? null
229                                     : Integer.toString(pageSize + offset);
230                             return newQueryResponse(pagedResultsCookie,
231                                     CountPolicy.EXACT,
232                                     totalResults);
233                         } catch (IOException e) {
234                             throw new InternalServerErrorException(e.getMessage(), e);
235                         }
236                     }
237             }), Responses.<QueryResponse, ResourceException>noopExceptionFunction());
238         } catch (URISyntaxException e) {
239             return new InternalServerErrorException(e.getMessage(), e).asPromise();
240         }
241     }
242 
243     @Override
244     public Promise<ResourceResponse, ResourceException> readEvent(final Context context, final String topic,
245             final String resourceId) {
246         final Request request;
247         try {
248             request = createRequest(GET, buildEventUri(topic, resourceId), null);
249         } catch (Exception e) {
250             final String error = String.format("Unable to read audit entry for topic=%s, _id=%s", topic, resourceId);
251             LOGGER.error(error, e);
252             return new InternalServerErrorException(error, e).asPromise();
253         }
254 
255         return client.send(request).then(closeSilently(new Function<Response, ResourceResponse, ResourceException>() {
256                 @Override
257                 public ResourceResponse apply(Response response) throws ResourceException {
258                     if (!response.getStatus().isSuccessful()) {
259                         throw resourceException(indexName, topic, resourceId, response);
260                     }
261 
262                     try {
263                         // the original audit JSON is under _source, and we also add back the _id
264                         JsonValue jsonValue = json(response.getEntity().getJson());
265                         jsonValue = ElasticsearchUtil.denormalizeJson(jsonValue.get(SOURCE));
266                         jsonValue.put(FIELD_CONTENT_ID, resourceId);
267                         return newResourceResponse(resourceId, null, jsonValue);
268                     } catch (IOException e) {
269                         throw new InternalServerErrorException(e.getMessage(), e);
270                     }
271                 }
272         }), Responses.<ResourceResponse, ResourceException>noopExceptionFunction());
273     }
274 
275     @Override
276     public Promise<ResourceResponse, ResourceException> publishEvent(final Context context, final String topic,
277             final JsonValue event) {
278         if (batchIndexer == null) {
279             return publishSingleEvent(topic, event);
280         } else {
281             if (!batchIndexer.offer(topic, event)) {
282                 return new ServiceUnavailableException("Elasticsearch batch indexer full, so dropping audit event "
283                         + indexName + "/" + topic + "/" + event.get("_id").asString()).asPromise();
284             }
285             return newResourceResponse(event.get(ResourceResponse.FIELD_CONTENT_ID).asString(), null,
286                     event).asPromise();
287         }
288     }
289 
290     /**
291      * Publishes a single event to the provided topic.
292      *
293      * @param topic The topic where to publish the event.
294      * @param event The event to publish.
295      * @return a promise with either a response or an exception
296      */
297     protected Promise<ResourceResponse, ResourceException> publishSingleEvent(final String topic,
298             final JsonValue event) {
299         // _id is a protected Elasticsearch field, so read it and remove it
300         final String resourceId = event.get(FIELD_CONTENT_ID).asString();
301         event.remove(FIELD_CONTENT_ID);
302 
303         try {
304             final String jsonPayload = ElasticsearchUtil.normalizeJson(event);
305             event.put(FIELD_CONTENT_ID, resourceId);
306 
307             final Request request = createRequest(PUT, buildEventUri(topic, resourceId), jsonPayload);
308 
309             return client.send(request).then(
310                     closeSilently(new Function<Response, ResourceResponse, ResourceException>() {
311                         @Override
312                         public ResourceResponse apply(Response response) throws ResourceException {
313                             if (!response.getStatus().isSuccessful()) {
314                                 throw resourceException(indexName, topic, resourceId, response);
315                             }
316                             return newResourceResponse(event.get(ResourceResponse.FIELD_CONTENT_ID).asString(), null,
317                                     event);
318                         }
319                     }), Responses.<ResourceResponse, ResourceException>noopExceptionFunction());
320         } catch (Exception e) {
321             final String error = String.format("Unable to create audit entry for topic=%s, _id=%s", topic, resourceId);
322             LOGGER.error(error, e);
323             return new InternalServerErrorException(error, e).asPromise();
324         }
325     }
326 
327     /**
328      * Adds an audit event to an Elasticsearch Bulk API payload.
329      *
330      * @param topic Event topic
331      * @param event Event JSON payload
332      * @param payload Elasticsearch Bulk API payload
333      * @throws BatchException indicates failure to add-to-batch
334      */
335     @Override
336     public void addToBatch(final String topic, final JsonValue event, final StringBuilder payload)
337             throws BatchException {
338         try {
339             // _id is a protected Elasticsearch field
340             final String resourceId = event.get(FIELD_CONTENT_ID).asString();
341             event.remove(FIELD_CONTENT_ID);
342             final String jsonPayload = ElasticsearchUtil.normalizeJson(event);
343             event.put(FIELD_CONTENT_ID, resourceId);
344 
345             // newlines have special significance in the Bulk API
346             // https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-bulk.html
347             payload.append("{ \"index\" : { \"_type\" : ")
348                     .append(OBJECT_MAPPER.writeValueAsString(topic))
349                     .append(", \"_id\" : ")
350                     .append(OBJECT_MAPPER.writeValueAsString(resourceId))
351                     .append(" } }\n")
352                     .append(jsonPayload)
353                     .append('\n');
354         } catch (IOException e) {
355             throw new BatchException("Unexpected error while adding to batch", e);
356         }
357     }
358 
359     /**
360      * Publishes a <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-bulk.html">Bulk API</a>
361      * payload to Elasticsearch.
362      *
363      * @param payload Elasticsearch Bulk API payload
364      */
365     @Override
366     public Promise<Void, BatchException> publishBatch(final String payload) {
367         final Request request;
368         try {
369             request = createRequest(POST, buildBulkUri(), payload);
370         } catch (URISyntaxException e) {
371             return newExceptionPromise(new BatchException("Incorrect URI", e));
372         }
373 
374         return client.send(request)
375                 .then(closeSilently(processBatchResponse()), Responses.<Void, BatchException>noopExceptionFunction());
376     }
377 
378     private Function<Response, Void, BatchException> processBatchResponse() {
379         return new Function<Response, Void, BatchException>() {
380             @Override
381             public Void apply(Response response) throws BatchException {
382                 try {
383                     if (!response.getStatus().isSuccessful()) {
384                         throw new BatchException("Elasticsearch batch index failed: " + response.getEntity());
385                     } else {
386                         final JsonValue responseJson = json(response.getEntity().getJson());
387                         if (responseJson.get("errors").asBoolean()) {
388                             // one or more batch index operations failed, so log failures
389                             final JsonValue items = responseJson.get("items");
390                             final int n = items.size();
391                             final List<Object> failureItems = new ArrayList<>(n);
392                             for (int i = 0; i < n; ++i) {
393                                 final JsonValue item = items.get(i).get("index");
394                                 final Integer status = item.get("status").asInteger();
395                                 if (status >= 400) {
396                                     failureItems.add(item);
397                                 }
398                             }
399                             final String message = "One or more Elasticsearch batch index entries failed: "
400                                     + OBJECT_MAPPER.writeValueAsString(failureItems);
401                             throw new BatchException(message);
402                         }
403                     }
404                 } catch (IOException e) {
405                     throw new BatchException("Unexpected error while publishing batch", e);
406                 }
407                 return null;
408             }
409         };
410     }
411 
412     /**
413      * Builds a basic authentication header-value, if username and password are provided in configuration.
414      *
415      * @return Basic authentication header-value or {@code null} if not configured
416      */
417     protected String buildBasicAuthHeaderValue() {
418         if (basicAuthHeaderValue != null) {
419             return basicAuthHeaderValue;
420         }
421         final ConnectionConfiguration connection = configuration.getConnection();
422         if (connection.getUsername() == null || connection.getUsername().isEmpty()
423                 || connection.getPassword() == null || connection.getPassword().isEmpty()) {
424             return null;
425         }
426         final String credentials = connection.getUsername() + ":" + connection.getPassword();
427         return "Basic " + Base64.encode(credentials.getBytes());
428     }
429 
430     /**
431      * Builds an Elasticsearch API URI for operating on a single event (e.g., index, get, etc.).
432      *
433      * @param topic Audit topic
434      * @param eventId Event ID
435      * @return URI
436      */
437     protected String buildEventUri(final String topic, final String eventId) {
438         return buildBaseUri() + "/" + topic + "/" + eventId;
439     }
440 
441     /**
442      * Builds an Elasticsearch API URI for Bulk API.
443      *
444      * @return URI
445      */
446     protected String buildBulkUri() {
447         if (bulkUri != null) {
448             return bulkUri;
449         }
450         return buildBaseUri() + BULK;
451     }
452 
453     /**
454      * Builds an Elasticsearch API URI for Search API.
455      *
456      * @param topic The audit topic to search.
457      * @param pageSize The number of results to return.
458      * @param offset The number of results to skip.
459      * @return The search uri.
460      */
461     protected String buildSearchUri(final String topic, final int pageSize, final int offset) {
462         return buildBaseUri() + "/" + topic + SEARCH + "?size=" + pageSize + "&from=" + offset;
463     }
464 
465     /**
466      * Builds an Elasticsearch API base URI. The format is,
467      * <pre>http[s]://host:port/indexName</pre>
468      *
469      * @return Base URI
470      */
471     protected String buildBaseUri() {
472         if (baseUri != null) {
473             return baseUri;
474         }
475         final ConnectionConfiguration connection = configuration.getConnection();
476         return (connection.isUseSSL() ? "https" : "http") + "://" + connection.getHost() + ":" + connection.getPort()
477                 + "/" + indexName;
478     }
479 
480     /**
481      * Gets an {@code Exception} {@link Promise} containing an Elasticsearch HTTP response status and payload.
482      *
483      * @param indexName Index name
484      * @param topic Event topic
485      * @param resourceId Event ID
486      * @param response HTTP response
487      * @return {@code Exception} {@link Promise}
488      */
489     protected static ResourceException resourceException(
490             final String indexName, final String topic, final String resourceId, final Response response) {
491         if (response.getStatus().getCode() == ResourceException.NOT_FOUND) {
492             return new NotFoundException("Object " + resourceId + " not found in " + indexName + "/" + topic);
493         }
494         final String message = "Elasticsearch response (" + indexName + "/" + topic + "/" + resourceId + "): "
495                 + response.getEntity();
496         return newResourceException(response.getStatus().getCode(), message);
497     }
498 
499     private Request createRequest(final String method, final String uri, final Object payload)
500             throws URISyntaxException {
501         final Request request = new Request();
502         request.setMethod(method);
503         request.setUri(uri);
504         if (payload != null) {
505             request.getHeaders().put(ContentTypeHeader.NAME, "application/json; charset=UTF-8");
506             request.setEntity(payload);
507         }
508         if (basicAuthHeaderValue != null) {
509             request.getHeaders().put("Authorization", basicAuthHeaderValue);
510         }
511         return request;
512     }
513 
514     private HttpClientHandler defaultHttpClientHandler() {
515         try {
516             return new HttpClientHandler(
517                     Options.defaultOptions()
518                             .set(OPTION_LOADER, new Loader() {
519                                 @Override
520                                 public <S> S load(Class<S> service, Options options) {
521                                     return service.cast(new AsyncHttpClientProvider());
522                                 }
523                             }));
524         } catch (HttpApplicationException e) {
525             throw new RuntimeException("Error while building default HTTP Client", e);
526         }
527     }
528 }