1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package org.forgerock.audit.handlers.elasticsearch;
17
18 import static org.forgerock.audit.util.ElasticsearchUtil.OBJECT_MAPPER;
19 import static org.forgerock.http.handler.HttpClientHandler.OPTION_LOADER;
20 import static org.forgerock.json.JsonValue.field;
21 import static org.forgerock.json.JsonValue.json;
22 import static org.forgerock.json.JsonValue.object;
23 import static org.forgerock.json.resource.ResourceException.newResourceException;
24 import static org.forgerock.json.resource.ResourceResponse.FIELD_CONTENT_ID;
25 import static org.forgerock.json.resource.Responses.newQueryResponse;
26 import static org.forgerock.json.resource.Responses.newResourceResponse;
27 import static org.forgerock.util.CloseSilentlyFunction.closeSilently;
28 import static org.forgerock.util.promise.Promises.newExceptionPromise;
29
30 import java.io.IOException;
31 import java.net.URISyntaxException;
32 import java.util.ArrayList;
33 import java.util.List;
34
35 import org.forgerock.audit.Audit;
36 import org.forgerock.audit.events.EventTopicsMetaData;
37 import org.forgerock.audit.events.handlers.AuditEventHandler;
38 import org.forgerock.audit.events.handlers.AuditEventHandlerBase;
39 import org.forgerock.audit.events.handlers.buffering.BufferedBatchPublisher;
40 import org.forgerock.audit.handlers.elasticsearch.ElasticsearchAuditEventHandlerConfiguration.ConnectionConfiguration;
41 import org.forgerock.audit.handlers.elasticsearch.ElasticsearchAuditEventHandlerConfiguration.EventBufferingConfiguration;
42 import org.forgerock.audit.events.handlers.buffering.BatchConsumer;
43 import org.forgerock.audit.events.handlers.buffering.BatchPublisher;
44 import org.forgerock.audit.events.handlers.buffering.BatchException;
45 import org.forgerock.audit.util.ElasticsearchUtil;
46 import org.forgerock.http.Client;
47 import org.forgerock.http.HttpApplicationException;
48 import org.forgerock.http.apache.async.AsyncHttpClientProvider;
49 import org.forgerock.http.handler.HttpClientHandler;
50 import org.forgerock.http.header.ContentTypeHeader;
51 import org.forgerock.http.protocol.Request;
52 import org.forgerock.http.protocol.Response;
53 import org.forgerock.http.protocol.Responses;
54 import org.forgerock.http.spi.Loader;
55 import org.forgerock.json.JsonValue;
56 import org.forgerock.json.resource.CountPolicy;
57 import org.forgerock.json.resource.InternalServerErrorException;
58 import org.forgerock.json.resource.NotFoundException;
59 import org.forgerock.json.resource.QueryRequest;
60 import org.forgerock.json.resource.QueryResourceHandler;
61 import org.forgerock.json.resource.QueryResponse;
62 import org.forgerock.json.resource.ResourceException;
63 import org.forgerock.json.resource.ResourceResponse;
64 import org.forgerock.json.resource.ServiceUnavailableException;
65 import org.forgerock.services.context.Context;
66 import org.forgerock.util.Function;
67 import org.forgerock.util.Options;
68 import org.forgerock.util.Reject;
69 import org.forgerock.util.encode.Base64;
70 import org.forgerock.util.promise.Promise;
71 import org.forgerock.util.time.Duration;
72 import org.slf4j.Logger;
73 import org.slf4j.LoggerFactory;
74
75
76
77
78 public class ElasticsearchAuditEventHandler extends AuditEventHandlerBase implements
79 BatchConsumer {
80
81 private static final Logger LOGGER = LoggerFactory.getLogger(ElasticsearchAuditEventHandler.class);
82 private static final ElasticsearchQueryFilterVisitor ELASTICSEARCH_QUERY_FILTER_VISITOR =
83 new ElasticsearchQueryFilterVisitor();
84
85 private static final String QUERY = "query";
86 private static final String GET = "GET";
87 private static final String SEARCH = "/_search";
88 private static final String BULK = "/_bulk";
89 private static final String HITS = "hits";
90 private static final String SOURCE = "_source";
91 private static final int DEFAULT_PAGE_SIZE = 10;
92 private static final String TOTAL = "total";
93 private static final String PUT = "PUT";
94 private static final String POST = "POST";
95
96
97
98
99
100
101 private static final int BATCH_INDEX_AVERAGE_PER_EVENT_PAYLOAD_SIZE = 1280;
102
103
104
105
106
107 private static final boolean ALWAYS_FLUSH_BATCH_QUEUE = true;
108 private static final int DEFAULT_OFFSET = 0;
109
110 private final String indexName;
111 private final String basicAuthHeaderValue;
112 private final String baseUri;
113 private final String bulkUri;
114 private final ElasticsearchAuditEventHandlerConfiguration configuration;
115 private final Client client;
116 private final BatchPublisher batchIndexer;
117 private final HttpClientHandler defaultHttpClientHandler;
118
119
120
121
122
123
124
125
126 public ElasticsearchAuditEventHandler(
127 final ElasticsearchAuditEventHandlerConfiguration configuration,
128 final EventTopicsMetaData eventTopicsMetaData,
129 @Audit final Client client) {
130 super(configuration.getName(), eventTopicsMetaData, configuration.getTopics(), configuration.isEnabled());
131 this.configuration = Reject.checkNotNull(configuration);
132 if (client == null) {
133 this.defaultHttpClientHandler = defaultHttpClientHandler();
134 this.client = new Client(defaultHttpClientHandler);
135 } else {
136 this.defaultHttpClientHandler = null;
137 this.client = client;
138 }
139 indexName = configuration.getIndexMapping().getIndexName();
140 basicAuthHeaderValue = buildBasicAuthHeaderValue();
141 baseUri = buildBaseUri();
142 bulkUri = buildBulkUri();
143
144 final EventBufferingConfiguration bufferConfig = configuration.getBuffering();
145 if (bufferConfig.isEnabled()) {
146 final Duration writeInterval =
147 bufferConfig.getWriteInterval() == null || bufferConfig.getWriteInterval().isEmpty()
148 ? null
149 : Duration.duration(bufferConfig.getWriteInterval());
150 batchIndexer = BufferedBatchPublisher.newBuilder(this)
151 .capacity(bufferConfig.getMaxSize())
152 .writeInterval(writeInterval)
153 .maxBatchEvents(bufferConfig.getMaxBatchedEvents())
154 .averagePerEventPayloadSize(BATCH_INDEX_AVERAGE_PER_EVENT_PAYLOAD_SIZE)
155 .autoFlush(ALWAYS_FLUSH_BATCH_QUEUE)
156 .build();
157 } else {
158 batchIndexer = null;
159 }
160 }
161
162 @Override
163 public void startup() throws ResourceException {
164 if (batchIndexer != null) {
165 batchIndexer.startup();
166 }
167 }
168
169 @Override
170 public void shutdown() throws ResourceException {
171 if (batchIndexer != null) {
172 batchIndexer.shutdown();
173 }
174 if (defaultHttpClientHandler != null) {
175 try {
176 defaultHttpClientHandler.close();
177 } catch (IOException e) {
178 throw ResourceException.newResourceException(ResourceException.INTERNAL_ERROR,
179 "An error occurred while closing the default HTTP client handler", e);
180 }
181 }
182 }
183
184
185
186
187
188
189
190
191 @Override
192 public Promise<QueryResponse, ResourceException> queryEvents(final Context context, final String topic,
193 final QueryRequest query, final QueryResourceHandler handler) {
194 final int pageSize = query.getPageSize() <= 0 ? DEFAULT_PAGE_SIZE : query.getPageSize();
195
196 final int offset;
197 if (query.getPagedResultsOffset() != 0) {
198 offset = query.getPagedResultsOffset();
199 } else if (query.getPagedResultsCookie() != null) {
200 offset = Integer.valueOf(query.getPagedResultsCookie());
201 } else {
202 offset = DEFAULT_OFFSET;
203 }
204
205 final JsonValue payload =
206 json(object(field(
207 QUERY, query.getQueryFilter().accept(ELASTICSEARCH_QUERY_FILTER_VISITOR, null).getObject())));
208 try {
209 final Request request = createRequest(GET, buildSearchUri(topic, pageSize, offset), payload.getObject());
210 return client.send(request).then(closeSilently(new Function<Response, QueryResponse, ResourceException>() {
211 @Override
212 public QueryResponse apply(Response response) throws ResourceException {
213 if (!response.getStatus().isSuccessful()) {
214 final String message =
215 "Elasticsearch response (" + indexName + "/" + topic + SEARCH + "): "
216 + response.getEntity();
217 throw newResourceException(response.getStatus().getCode(), message);
218 }
219 try {
220 JsonValue events = json(response.getEntity().getJson());
221 for (JsonValue event : events.get(HITS).get(HITS)) {
222 handler.handleResource(
223 newResourceResponse(event.get(FIELD_CONTENT_ID).asString(), null,
224 ElasticsearchUtil.denormalizeJson(event.get(SOURCE))));
225 }
226 final int totalResults = events.get(HITS).get(TOTAL).asInteger();
227 final String pagedResultsCookie = (pageSize + offset) >= totalResults
228 ? null
229 : Integer.toString(pageSize + offset);
230 return newQueryResponse(pagedResultsCookie,
231 CountPolicy.EXACT,
232 totalResults);
233 } catch (IOException e) {
234 throw new InternalServerErrorException(e.getMessage(), e);
235 }
236 }
237 }), Responses.<QueryResponse, ResourceException>noopExceptionFunction());
238 } catch (URISyntaxException e) {
239 return new InternalServerErrorException(e.getMessage(), e).asPromise();
240 }
241 }
242
243 @Override
244 public Promise<ResourceResponse, ResourceException> readEvent(final Context context, final String topic,
245 final String resourceId) {
246 final Request request;
247 try {
248 request = createRequest(GET, buildEventUri(topic, resourceId), null);
249 } catch (Exception e) {
250 final String error = String.format("Unable to read audit entry for topic=%s, _id=%s", topic, resourceId);
251 LOGGER.error(error, e);
252 return new InternalServerErrorException(error, e).asPromise();
253 }
254
255 return client.send(request).then(closeSilently(new Function<Response, ResourceResponse, ResourceException>() {
256 @Override
257 public ResourceResponse apply(Response response) throws ResourceException {
258 if (!response.getStatus().isSuccessful()) {
259 throw resourceException(indexName, topic, resourceId, response);
260 }
261
262 try {
263
264 JsonValue jsonValue = json(response.getEntity().getJson());
265 jsonValue = ElasticsearchUtil.denormalizeJson(jsonValue.get(SOURCE));
266 jsonValue.put(FIELD_CONTENT_ID, resourceId);
267 return newResourceResponse(resourceId, null, jsonValue);
268 } catch (IOException e) {
269 throw new InternalServerErrorException(e.getMessage(), e);
270 }
271 }
272 }), Responses.<ResourceResponse, ResourceException>noopExceptionFunction());
273 }
274
275 @Override
276 public Promise<ResourceResponse, ResourceException> publishEvent(final Context context, final String topic,
277 final JsonValue event) {
278 if (batchIndexer == null) {
279 return publishSingleEvent(topic, event);
280 } else {
281 if (!batchIndexer.offer(topic, event)) {
282 return new ServiceUnavailableException("Elasticsearch batch indexer full, so dropping audit event "
283 + indexName + "/" + topic + "/" + event.get("_id").asString()).asPromise();
284 }
285 return newResourceResponse(event.get(ResourceResponse.FIELD_CONTENT_ID).asString(), null,
286 event).asPromise();
287 }
288 }
289
290
291
292
293
294
295
296
297 protected Promise<ResourceResponse, ResourceException> publishSingleEvent(final String topic,
298 final JsonValue event) {
299
300 final String resourceId = event.get(FIELD_CONTENT_ID).asString();
301 event.remove(FIELD_CONTENT_ID);
302
303 try {
304 final String jsonPayload = ElasticsearchUtil.normalizeJson(event);
305 event.put(FIELD_CONTENT_ID, resourceId);
306
307 final Request request = createRequest(PUT, buildEventUri(topic, resourceId), jsonPayload);
308
309 return client.send(request).then(
310 closeSilently(new Function<Response, ResourceResponse, ResourceException>() {
311 @Override
312 public ResourceResponse apply(Response response) throws ResourceException {
313 if (!response.getStatus().isSuccessful()) {
314 throw resourceException(indexName, topic, resourceId, response);
315 }
316 return newResourceResponse(event.get(ResourceResponse.FIELD_CONTENT_ID).asString(), null,
317 event);
318 }
319 }), Responses.<ResourceResponse, ResourceException>noopExceptionFunction());
320 } catch (Exception e) {
321 final String error = String.format("Unable to create audit entry for topic=%s, _id=%s", topic, resourceId);
322 LOGGER.error(error, e);
323 return new InternalServerErrorException(error, e).asPromise();
324 }
325 }
326
327
328
329
330
331
332
333
334
335 @Override
336 public void addToBatch(final String topic, final JsonValue event, final StringBuilder payload)
337 throws BatchException {
338 try {
339
340 final String resourceId = event.get(FIELD_CONTENT_ID).asString();
341 event.remove(FIELD_CONTENT_ID);
342 final String jsonPayload = ElasticsearchUtil.normalizeJson(event);
343 event.put(FIELD_CONTENT_ID, resourceId);
344
345
346
347 payload.append("{ \"index\" : { \"_type\" : ")
348 .append(OBJECT_MAPPER.writeValueAsString(topic))
349 .append(", \"_id\" : ")
350 .append(OBJECT_MAPPER.writeValueAsString(resourceId))
351 .append(" } }\n")
352 .append(jsonPayload)
353 .append('\n');
354 } catch (IOException e) {
355 throw new BatchException("Unexpected error while adding to batch", e);
356 }
357 }
358
359
360
361
362
363
364
365 @Override
366 public Promise<Void, BatchException> publishBatch(final String payload) {
367 final Request request;
368 try {
369 request = createRequest(POST, buildBulkUri(), payload);
370 } catch (URISyntaxException e) {
371 return newExceptionPromise(new BatchException("Incorrect URI", e));
372 }
373
374 return client.send(request)
375 .then(closeSilently(processBatchResponse()), Responses.<Void, BatchException>noopExceptionFunction());
376 }
377
378 private Function<Response, Void, BatchException> processBatchResponse() {
379 return new Function<Response, Void, BatchException>() {
380 @Override
381 public Void apply(Response response) throws BatchException {
382 try {
383 if (!response.getStatus().isSuccessful()) {
384 throw new BatchException("Elasticsearch batch index failed: " + response.getEntity());
385 } else {
386 final JsonValue responseJson = json(response.getEntity().getJson());
387 if (responseJson.get("errors").asBoolean()) {
388
389 final JsonValue items = responseJson.get("items");
390 final int n = items.size();
391 final List<Object> failureItems = new ArrayList<>(n);
392 for (int i = 0; i < n; ++i) {
393 final JsonValue item = items.get(i).get("index");
394 final Integer status = item.get("status").asInteger();
395 if (status >= 400) {
396 failureItems.add(item);
397 }
398 }
399 final String message = "One or more Elasticsearch batch index entries failed: "
400 + OBJECT_MAPPER.writeValueAsString(failureItems);
401 throw new BatchException(message);
402 }
403 }
404 } catch (IOException e) {
405 throw new BatchException("Unexpected error while publishing batch", e);
406 }
407 return null;
408 }
409 };
410 }
411
412
413
414
415
416
417 protected String buildBasicAuthHeaderValue() {
418 if (basicAuthHeaderValue != null) {
419 return basicAuthHeaderValue;
420 }
421 final ConnectionConfiguration connection = configuration.getConnection();
422 if (connection.getUsername() == null || connection.getUsername().isEmpty()
423 || connection.getPassword() == null || connection.getPassword().isEmpty()) {
424 return null;
425 }
426 final String credentials = connection.getUsername() + ":" + connection.getPassword();
427 return "Basic " + Base64.encode(credentials.getBytes());
428 }
429
430
431
432
433
434
435
436
437 protected String buildEventUri(final String topic, final String eventId) {
438 return buildBaseUri() + "/" + topic + "/" + eventId;
439 }
440
441
442
443
444
445
446 protected String buildBulkUri() {
447 if (bulkUri != null) {
448 return bulkUri;
449 }
450 return buildBaseUri() + BULK;
451 }
452
453
454
455
456
457
458
459
460
461 protected String buildSearchUri(final String topic, final int pageSize, final int offset) {
462 return buildBaseUri() + "/" + topic + SEARCH + "?size=" + pageSize + "&from=" + offset;
463 }
464
465
466
467
468
469
470
471 protected String buildBaseUri() {
472 if (baseUri != null) {
473 return baseUri;
474 }
475 final ConnectionConfiguration connection = configuration.getConnection();
476 return (connection.isUseSSL() ? "https" : "http") + "://" + connection.getHost() + ":" + connection.getPort()
477 + "/" + indexName;
478 }
479
480
481
482
483
484
485
486
487
488
489 protected static ResourceException resourceException(
490 final String indexName, final String topic, final String resourceId, final Response response) {
491 if (response.getStatus().getCode() == ResourceException.NOT_FOUND) {
492 return new NotFoundException("Object " + resourceId + " not found in " + indexName + "/" + topic);
493 }
494 final String message = "Elasticsearch response (" + indexName + "/" + topic + "/" + resourceId + "): "
495 + response.getEntity();
496 return newResourceException(response.getStatus().getCode(), message);
497 }
498
499 private Request createRequest(final String method, final String uri, final Object payload)
500 throws URISyntaxException {
501 final Request request = new Request();
502 request.setMethod(method);
503 request.setUri(uri);
504 if (payload != null) {
505 request.getHeaders().put(ContentTypeHeader.NAME, "application/json; charset=UTF-8");
506 request.setEntity(payload);
507 }
508 if (basicAuthHeaderValue != null) {
509 request.getHeaders().put("Authorization", basicAuthHeaderValue);
510 }
511 return request;
512 }
513
514 private HttpClientHandler defaultHttpClientHandler() {
515 try {
516 return new HttpClientHandler(
517 Options.defaultOptions()
518 .set(OPTION_LOADER, new Loader() {
519 @Override
520 public <S> S load(Class<S> service, Options options) {
521 return service.cast(new AsyncHttpClientProvider());
522 }
523 }));
524 } catch (HttpApplicationException e) {
525 throw new RuntimeException("Error while building default HTTP Client", e);
526 }
527 }
528 }