ElasticsearchAuditEventHandler.java

/*
 * The contents of this file are subject to the terms of the Common Development and
 * Distribution License (the License). You may not use this file except in compliance with the
 * License.
 *
 * You can obtain a copy of the License at legal/CDDLv1.0.txt. See the License for the
 * specific language governing permission and limitations under the License.
 *
 * When distributing Covered Software, include this CDDL Header Notice in each file and include
 * the License file at legal/CDDLv1.0.txt. If applicable, add the following below the CDDL
 * Header, with the fields enclosed by brackets [] replaced by your own identifying
 * information: "Portions copyright [year] [name of copyright owner]".
 *
 * Copyright 2016 ForgeRock AS.
 */
package org.forgerock.audit.handlers.elasticsearch;

import static org.forgerock.audit.util.ElasticsearchUtil.OBJECT_MAPPER;
import static org.forgerock.http.handler.HttpClientHandler.OPTION_LOADER;
import static org.forgerock.json.JsonValue.field;
import static org.forgerock.json.JsonValue.json;
import static org.forgerock.json.JsonValue.object;
import static org.forgerock.json.resource.ResourceException.newResourceException;
import static org.forgerock.json.resource.ResourceResponse.FIELD_CONTENT_ID;
import static org.forgerock.json.resource.Responses.newQueryResponse;
import static org.forgerock.json.resource.Responses.newResourceResponse;
import static org.forgerock.util.CloseSilentlyFunction.closeSilently;
import static org.forgerock.util.promise.Promises.newExceptionPromise;

import java.io.IOException;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.List;

import org.forgerock.audit.Audit;
import org.forgerock.audit.events.EventTopicsMetaData;
import org.forgerock.audit.events.handlers.AuditEventHandler;
import org.forgerock.audit.events.handlers.AuditEventHandlerBase;
import org.forgerock.audit.events.handlers.buffering.BufferedBatchPublisher;
import org.forgerock.audit.handlers.elasticsearch.ElasticsearchAuditEventHandlerConfiguration.ConnectionConfiguration;
import org.forgerock.audit.handlers.elasticsearch.ElasticsearchAuditEventHandlerConfiguration.EventBufferingConfiguration;
import org.forgerock.audit.events.handlers.buffering.BatchConsumer;
import org.forgerock.audit.events.handlers.buffering.BatchPublisher;
import org.forgerock.audit.events.handlers.buffering.BatchException;
import org.forgerock.audit.util.ElasticsearchUtil;
import org.forgerock.http.Client;
import org.forgerock.http.HttpApplicationException;
import org.forgerock.http.apache.async.AsyncHttpClientProvider;
import org.forgerock.http.handler.HttpClientHandler;
import org.forgerock.http.header.ContentTypeHeader;
import org.forgerock.http.protocol.Request;
import org.forgerock.http.protocol.Response;
import org.forgerock.http.protocol.Responses;
import org.forgerock.http.spi.Loader;
import org.forgerock.json.JsonValue;
import org.forgerock.json.resource.CountPolicy;
import org.forgerock.json.resource.InternalServerErrorException;
import org.forgerock.json.resource.NotFoundException;
import org.forgerock.json.resource.QueryRequest;
import org.forgerock.json.resource.QueryResourceHandler;
import org.forgerock.json.resource.QueryResponse;
import org.forgerock.json.resource.ResourceException;
import org.forgerock.json.resource.ResourceResponse;
import org.forgerock.json.resource.ServiceUnavailableException;
import org.forgerock.services.context.Context;
import org.forgerock.util.Function;
import org.forgerock.util.Options;
import org.forgerock.util.Reject;
import org.forgerock.util.encode.Base64;
import org.forgerock.util.promise.Promise;
import org.forgerock.util.time.Duration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link AuditEventHandler} for Elasticsearch.
 */
public class ElasticsearchAuditEventHandler extends AuditEventHandlerBase implements
        BatchConsumer {

    private static final Logger LOGGER = LoggerFactory.getLogger(ElasticsearchAuditEventHandler.class);
    private static final ElasticsearchQueryFilterVisitor ELASTICSEARCH_QUERY_FILTER_VISITOR =
            new ElasticsearchQueryFilterVisitor();

    private static final String QUERY = "query";
    private static final String GET = "GET";
    private static final String SEARCH = "/_search";
    private static final String BULK = "/_bulk";
    private static final String HITS = "hits";
    private static final String SOURCE = "_source";
    private static final int DEFAULT_PAGE_SIZE = 10;
    private static final String TOTAL = "total";
    private static final String PUT = "PUT";
    private static final String POST = "POST";

    /**
     * Average number of characters, per event, for batch indexing via Elasticsearch Bulk API. This value
     * is used to initialize the size of buffers, but if the value is too low, the buffers will automatically resize
     * as needed.
     */
    private static final int BATCH_INDEX_AVERAGE_PER_EVENT_PAYLOAD_SIZE = 1280;

    /**
     * The Elasticsearch {@link AuditEventHandler} <b>always</b> flushes events in the batch queue on shutdown or
     * configuration change.
     */
    private static final boolean ALWAYS_FLUSH_BATCH_QUEUE = true;
    private static final int DEFAULT_OFFSET = 0;

    private final String indexName;
    private final String basicAuthHeaderValue;
    private final String baseUri;
    private final String bulkUri;
    private final ElasticsearchAuditEventHandlerConfiguration configuration;
    private final Client client;
    private final BatchPublisher batchIndexer;
    private final HttpClientHandler defaultHttpClientHandler;

    /**
     * Create a new {@code ElasticsearchAuditEventHandler} instance.
     *
     * @param configuration Configuration parameters that can be adjusted by system administrators.
     * @param eventTopicsMetaData Meta-data for all audit event topics.
     * @param client HTTP client or {@code null} to use default client.
     */
    public ElasticsearchAuditEventHandler(
            final ElasticsearchAuditEventHandlerConfiguration configuration,
            final EventTopicsMetaData eventTopicsMetaData,
            @Audit final Client client) {
        super(configuration.getName(), eventTopicsMetaData, configuration.getTopics(), configuration.isEnabled());
        this.configuration = Reject.checkNotNull(configuration);
        if (client == null) {
            this.defaultHttpClientHandler = defaultHttpClientHandler();
            this.client = new Client(defaultHttpClientHandler);
        } else {
            this.defaultHttpClientHandler = null;
            this.client = client;
        }
        indexName = configuration.getIndexMapping().getIndexName();
        basicAuthHeaderValue = buildBasicAuthHeaderValue();
        baseUri = buildBaseUri();
        bulkUri = buildBulkUri();

        final EventBufferingConfiguration bufferConfig = configuration.getBuffering();
        if (bufferConfig.isEnabled()) {
            final Duration writeInterval =
                    bufferConfig.getWriteInterval() == null || bufferConfig.getWriteInterval().isEmpty()
                            ? null
                            : Duration.duration(bufferConfig.getWriteInterval());
            batchIndexer = BufferedBatchPublisher.newBuilder(this)
                    .capacity(bufferConfig.getMaxSize())
                    .writeInterval(writeInterval)
                    .maxBatchEvents(bufferConfig.getMaxBatchedEvents())
                    .averagePerEventPayloadSize(BATCH_INDEX_AVERAGE_PER_EVENT_PAYLOAD_SIZE)
                    .autoFlush(ALWAYS_FLUSH_BATCH_QUEUE)
                    .build();
        } else {
            batchIndexer = null;
        }
    }

    @Override
    public void startup() throws ResourceException {
        if (batchIndexer != null) {
            batchIndexer.startup();
        }
    }

    @Override
    public void shutdown() throws ResourceException {
        if (batchIndexer != null) {
            batchIndexer.shutdown();
        }
        if (defaultHttpClientHandler != null) {
            try {
                defaultHttpClientHandler.close();
            } catch (IOException e) {
                throw ResourceException.newResourceException(ResourceException.INTERNAL_ERROR,
                        "An error occurred while closing the default HTTP client handler", e);
            }
        }
    }

    /**
     * Queries the Elasticsearch
     * <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/search.html">Search API</a> for
     * audit events.
     *
     * {@inheritDoc}
     */
    @Override
    public Promise<QueryResponse, ResourceException> queryEvents(final Context context, final String topic,
           final QueryRequest query, final QueryResourceHandler handler) {
        final int pageSize = query.getPageSize() <= 0 ? DEFAULT_PAGE_SIZE : query.getPageSize();
        // set the offset to either first the offset provided, or second the paged result cookie value, or finally 0
        final int offset;
        if (query.getPagedResultsOffset() != 0) {
            offset = query.getPagedResultsOffset();
        } else if (query.getPagedResultsCookie() != null) {
            offset = Integer.valueOf(query.getPagedResultsCookie());
        } else {
            offset = DEFAULT_OFFSET;
        }

        final JsonValue payload =
                json(object(field(
                        QUERY, query.getQueryFilter().accept(ELASTICSEARCH_QUERY_FILTER_VISITOR, null).getObject())));
        try {
            final Request request = createRequest(GET, buildSearchUri(topic, pageSize, offset), payload.getObject());
            return client.send(request).then(closeSilently(new Function<Response, QueryResponse, ResourceException>() {
                    @Override
                    public QueryResponse apply(Response response) throws ResourceException {
                        if (!response.getStatus().isSuccessful()) {
                            final String message =
                                    "Elasticsearch response (" + indexName + "/" + topic + SEARCH + "): "
                                    + response.getEntity();
                            throw newResourceException(response.getStatus().getCode(), message);
                        }
                        try {
                            JsonValue events = json(response.getEntity().getJson());
                            for (JsonValue event : events.get(HITS).get(HITS)) {
                                handler.handleResource(
                                        newResourceResponse(event.get(FIELD_CONTENT_ID).asString(), null,
                                                ElasticsearchUtil.denormalizeJson(event.get(SOURCE))));
                            }
                            final int totalResults = events.get(HITS).get(TOTAL).asInteger();
                            final String pagedResultsCookie = (pageSize + offset) >= totalResults
                                    ? null
                                    : Integer.toString(pageSize + offset);
                            return newQueryResponse(pagedResultsCookie,
                                    CountPolicy.EXACT,
                                    totalResults);
                        } catch (IOException e) {
                            throw new InternalServerErrorException(e.getMessage(), e);
                        }
                    }
            }), Responses.<QueryResponse, ResourceException>noopExceptionFunction());
        } catch (URISyntaxException e) {
            return new InternalServerErrorException(e.getMessage(), e).asPromise();
        }
    }

    @Override
    public Promise<ResourceResponse, ResourceException> readEvent(final Context context, final String topic,
            final String resourceId) {
        final Request request;
        try {
            request = createRequest(GET, buildEventUri(topic, resourceId), null);
        } catch (Exception e) {
            final String error = String.format("Unable to read audit entry for topic=%s, _id=%s", topic, resourceId);
            LOGGER.error(error, e);
            return new InternalServerErrorException(error, e).asPromise();
        }

        return client.send(request).then(closeSilently(new Function<Response, ResourceResponse, ResourceException>() {
                @Override
                public ResourceResponse apply(Response response) throws ResourceException {
                    if (!response.getStatus().isSuccessful()) {
                        throw resourceException(indexName, topic, resourceId, response);
                    }

                    try {
                        // the original audit JSON is under _source, and we also add back the _id
                        JsonValue jsonValue = json(response.getEntity().getJson());
                        jsonValue = ElasticsearchUtil.denormalizeJson(jsonValue.get(SOURCE));
                        jsonValue.put(FIELD_CONTENT_ID, resourceId);
                        return newResourceResponse(resourceId, null, jsonValue);
                    } catch (IOException e) {
                        throw new InternalServerErrorException(e.getMessage(), e);
                    }
                }
        }), Responses.<ResourceResponse, ResourceException>noopExceptionFunction());
    }

    @Override
    public Promise<ResourceResponse, ResourceException> publishEvent(final Context context, final String topic,
            final JsonValue event) {
        if (batchIndexer == null) {
            return publishSingleEvent(topic, event);
        } else {
            if (!batchIndexer.offer(topic, event)) {
                return new ServiceUnavailableException("Elasticsearch batch indexer full, so dropping audit event "
                        + indexName + "/" + topic + "/" + event.get("_id").asString()).asPromise();
            }
            return newResourceResponse(event.get(ResourceResponse.FIELD_CONTENT_ID).asString(), null,
                    event).asPromise();
        }
    }

    /**
     * Publishes a single event to the provided topic.
     *
     * @param topic The topic where to publish the event.
     * @param event The event to publish.
     * @return a promise with either a response or an exception
     */
    protected Promise<ResourceResponse, ResourceException> publishSingleEvent(final String topic,
            final JsonValue event) {
        // _id is a protected Elasticsearch field, so read it and remove it
        final String resourceId = event.get(FIELD_CONTENT_ID).asString();
        event.remove(FIELD_CONTENT_ID);

        try {
            final String jsonPayload = ElasticsearchUtil.normalizeJson(event);
            event.put(FIELD_CONTENT_ID, resourceId);

            final Request request = createRequest(PUT, buildEventUri(topic, resourceId), jsonPayload);

            return client.send(request).then(
                    closeSilently(new Function<Response, ResourceResponse, ResourceException>() {
                        @Override
                        public ResourceResponse apply(Response response) throws ResourceException {
                            if (!response.getStatus().isSuccessful()) {
                                throw resourceException(indexName, topic, resourceId, response);
                            }
                            return newResourceResponse(event.get(ResourceResponse.FIELD_CONTENT_ID).asString(), null,
                                    event);
                        }
                    }), Responses.<ResourceResponse, ResourceException>noopExceptionFunction());
        } catch (Exception e) {
            final String error = String.format("Unable to create audit entry for topic=%s, _id=%s", topic, resourceId);
            LOGGER.error(error, e);
            return new InternalServerErrorException(error, e).asPromise();
        }
    }

    /**
     * Adds an audit event to an Elasticsearch Bulk API payload.
     *
     * @param topic Event topic
     * @param event Event JSON payload
     * @param payload Elasticsearch Bulk API payload
     * @throws BatchException indicates failure to add-to-batch
     */
    @Override
    public void addToBatch(final String topic, final JsonValue event, final StringBuilder payload)
            throws BatchException {
        try {
            // _id is a protected Elasticsearch field
            final String resourceId = event.get(FIELD_CONTENT_ID).asString();
            event.remove(FIELD_CONTENT_ID);
            final String jsonPayload = ElasticsearchUtil.normalizeJson(event);
            event.put(FIELD_CONTENT_ID, resourceId);

            // newlines have special significance in the Bulk API
            // https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-bulk.html
            payload.append("{ \"index\" : { \"_type\" : ")
                    .append(OBJECT_MAPPER.writeValueAsString(topic))
                    .append(", \"_id\" : ")
                    .append(OBJECT_MAPPER.writeValueAsString(resourceId))
                    .append(" } }\n")
                    .append(jsonPayload)
                    .append('\n');
        } catch (IOException e) {
            throw new BatchException("Unexpected error while adding to batch", e);
        }
    }

    /**
     * Publishes a <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-bulk.html">Bulk API</a>
     * payload to Elasticsearch.
     *
     * @param payload Elasticsearch Bulk API payload
     */
    @Override
    public Promise<Void, BatchException> publishBatch(final String payload) {
        final Request request;
        try {
            request = createRequest(POST, buildBulkUri(), payload);
        } catch (URISyntaxException e) {
            return newExceptionPromise(new BatchException("Incorrect URI", e));
        }

        return client.send(request)
                .then(closeSilently(processBatchResponse()), Responses.<Void, BatchException>noopExceptionFunction());
    }

    private Function<Response, Void, BatchException> processBatchResponse() {
        return new Function<Response, Void, BatchException>() {
            @Override
            public Void apply(Response response) throws BatchException {
                try {
                    if (!response.getStatus().isSuccessful()) {
                        throw new BatchException("Elasticsearch batch index failed: " + response.getEntity());
                    } else {
                        final JsonValue responseJson = json(response.getEntity().getJson());
                        if (responseJson.get("errors").asBoolean()) {
                            // one or more batch index operations failed, so log failures
                            final JsonValue items = responseJson.get("items");
                            final int n = items.size();
                            final List<Object> failureItems = new ArrayList<>(n);
                            for (int i = 0; i < n; ++i) {
                                final JsonValue item = items.get(i).get("index");
                                final Integer status = item.get("status").asInteger();
                                if (status >= 400) {
                                    failureItems.add(item);
                                }
                            }
                            final String message = "One or more Elasticsearch batch index entries failed: "
                                    + OBJECT_MAPPER.writeValueAsString(failureItems);
                            throw new BatchException(message);
                        }
                    }
                } catch (IOException e) {
                    throw new BatchException("Unexpected error while publishing batch", e);
                }
                return null;
            }
        };
    }

    /**
     * Builds a basic authentication header-value, if username and password are provided in configuration.
     *
     * @return Basic authentication header-value or {@code null} if not configured
     */
    protected String buildBasicAuthHeaderValue() {
        if (basicAuthHeaderValue != null) {
            return basicAuthHeaderValue;
        }
        final ConnectionConfiguration connection = configuration.getConnection();
        if (connection.getUsername() == null || connection.getUsername().isEmpty()
                || connection.getPassword() == null || connection.getPassword().isEmpty()) {
            return null;
        }
        final String credentials = connection.getUsername() + ":" + connection.getPassword();
        return "Basic " + Base64.encode(credentials.getBytes());
    }

    /**
     * Builds an Elasticsearch API URI for operating on a single event (e.g., index, get, etc.).
     *
     * @param topic Audit topic
     * @param eventId Event ID
     * @return URI
     */
    protected String buildEventUri(final String topic, final String eventId) {
        return buildBaseUri() + "/" + topic + "/" + eventId;
    }

    /**
     * Builds an Elasticsearch API URI for Bulk API.
     *
     * @return URI
     */
    protected String buildBulkUri() {
        if (bulkUri != null) {
            return bulkUri;
        }
        return buildBaseUri() + BULK;
    }

    /**
     * Builds an Elasticsearch API URI for Search API.
     *
     * @param topic The audit topic to search.
     * @param pageSize The number of results to return.
     * @param offset The number of results to skip.
     * @return The search uri.
     */
    protected String buildSearchUri(final String topic, final int pageSize, final int offset) {
        return buildBaseUri() + "/" + topic + SEARCH + "?size=" + pageSize + "&from=" + offset;
    }

    /**
     * Builds an Elasticsearch API base URI. The format is,
     * <pre>http[s]://host:port/indexName</pre>
     *
     * @return Base URI
     */
    protected String buildBaseUri() {
        if (baseUri != null) {
            return baseUri;
        }
        final ConnectionConfiguration connection = configuration.getConnection();
        return (connection.isUseSSL() ? "https" : "http") + "://" + connection.getHost() + ":" + connection.getPort()
                + "/" + indexName;
    }

    /**
     * Gets an {@code Exception} {@link Promise} containing an Elasticsearch HTTP response status and payload.
     *
     * @param indexName Index name
     * @param topic Event topic
     * @param resourceId Event ID
     * @param response HTTP response
     * @return {@code Exception} {@link Promise}
     */
    protected static ResourceException resourceException(
            final String indexName, final String topic, final String resourceId, final Response response) {
        if (response.getStatus().getCode() == ResourceException.NOT_FOUND) {
            return new NotFoundException("Object " + resourceId + " not found in " + indexName + "/" + topic);
        }
        final String message = "Elasticsearch response (" + indexName + "/" + topic + "/" + resourceId + "): "
                + response.getEntity();
        return newResourceException(response.getStatus().getCode(), message);
    }

    private Request createRequest(final String method, final String uri, final Object payload)
            throws URISyntaxException {
        final Request request = new Request();
        request.setMethod(method);
        request.setUri(uri);
        if (payload != null) {
            request.getHeaders().put(ContentTypeHeader.NAME, "application/json; charset=UTF-8");
            request.setEntity(payload);
        }
        if (basicAuthHeaderValue != null) {
            request.getHeaders().put("Authorization", basicAuthHeaderValue);
        }
        return request;
    }

    private HttpClientHandler defaultHttpClientHandler() {
        try {
            return new HttpClientHandler(
                    Options.defaultOptions()
                            .set(OPTION_LOADER, new Loader() {
                                @Override
                                public <S> S load(Class<S> service, Options options) {
                                    return service.cast(new AsyncHttpClientProvider());
                                }
                            }));
        } catch (HttpApplicationException e) {
            throw new RuntimeException("Error while building default HTTP Client", e);
        }
    }
}