SplunkAuditEventHandler.java
/*
* The contents of this file are subject to the terms of the Common Development and
* Distribution License (the License). You may not use this file except in compliance with the
* License.
*
* You can obtain a copy of the License at legal/CDDLv1.0.txt. See the License for the
* specific language governing permission and limitations under the License.
*
* When distributing Covered Software, include this CDDL Header Notice in each file and include
* the License file at legal/CDDLv1.0.txt. If applicable, add the following below the CDDL
* Header, with the fields enclosed by brackets [] replaced by your own identifying
* information: "Portions copyright [year] [name of copyright owner]".
*
* Copyright 2016 ForgeRock AS.
*/
package org.forgerock.audit.handlers.splunk;
import static org.wrensecurity.guava.common.base.Strings.isNullOrEmpty;
import static org.forgerock.http.handler.HttpClientHandler.OPTION_LOADER;
import static org.forgerock.json.resource.Responses.newResourceResponse;
import static org.forgerock.util.CloseSilentlyFunction.closeSilently;
import static org.forgerock.util.promise.Promises.newExceptionPromise;
import java.io.IOException;
import java.net.URISyntaxException;
import java.util.UUID;
import org.forgerock.audit.Audit;
import org.forgerock.audit.events.EventTopicsMetaData;
import org.forgerock.audit.events.handlers.AuditEventHandlerBase;
import org.forgerock.audit.events.handlers.buffering.BatchConsumer;
import org.forgerock.audit.events.handlers.buffering.BatchException;
import org.forgerock.audit.events.handlers.buffering.BatchPublisher;
import org.forgerock.audit.events.handlers.buffering.BatchPublisherFactory;
import org.forgerock.audit.events.handlers.buffering.BatchPublisherFactoryImpl;
import org.forgerock.audit.handlers.splunk.SplunkAuditEventHandlerConfiguration.BufferingConfiguration;
import org.forgerock.audit.handlers.splunk.SplunkAuditEventHandlerConfiguration.ConnectionConfiguration;
import org.forgerock.http.Client;
import org.forgerock.http.HttpApplicationException;
import org.forgerock.http.apache.async.AsyncHttpClientProvider;
import org.forgerock.http.handler.HttpClientHandler;
import org.forgerock.http.header.ContentTypeHeader;
import org.forgerock.http.protocol.Request;
import org.forgerock.http.protocol.Response;
import org.forgerock.http.protocol.Responses;
import org.forgerock.http.spi.Loader;
import org.forgerock.json.JsonValue;
import org.forgerock.json.resource.NotSupportedException;
import org.forgerock.json.resource.QueryRequest;
import org.forgerock.json.resource.QueryResourceHandler;
import org.forgerock.json.resource.QueryResponse;
import org.forgerock.json.resource.ResourceException;
import org.forgerock.json.resource.ResourceResponse;
import org.forgerock.json.resource.ServiceUnavailableException;
import org.forgerock.services.context.Context;
import org.forgerock.util.Function;
import org.forgerock.util.Options;
import org.forgerock.util.promise.Promise;
import org.forgerock.util.time.Duration;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
/**
* Audit event handler that writes out to Splunk's HTTP event collector RAW endpoint.
*/
public final class SplunkAuditEventHandler extends AuditEventHandlerBase implements BatchConsumer {
/*
* Value is used to initialize the size of buffers, but if the value
* is too low, the buffers will automatically resize as needed.
*/
private static final int BATCH_INDEX_AVERAGE_PER_EVENT_PAYLOAD_SIZE = 1280;
/*
* The Elasticsearch {@link AuditEventHandler} <b>always</b> flushes
* events in the batch queue on shutdown or configuration change.
*/
private static final boolean ALWAYS_FLUSH_BATCH_QUEUE = true;
/*
* Using {@link ObjectMapper} in favour over {@link JsonValue#toString}
* as this is considered to produce more reliable json.
*/
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
private final SplunkAuditEventHandlerConfiguration configuration;
private final Client client;
private final HttpClientHandler defaultHttpClientHandler;
private final String channelId;
private final BatchPublisher batchPublisher;
private final String serviceUrl;
/**
* Constructs a new Splunk audit event handler.
*
* @param configuration
* the Splunk audit event handler configuration
* @param eventTopicsMetaData
* topic meta data
* @param publisherFactory
* the batch publisher factory or {@code null}
* @param client
* HTTP client or {@code null}
*/
public SplunkAuditEventHandler(
final SplunkAuditEventHandlerConfiguration configuration, final EventTopicsMetaData eventTopicsMetaData,
@Audit BatchPublisherFactory publisherFactory, final @Audit Client client) {
super(configuration.getName(), eventTopicsMetaData, configuration.getTopics(), configuration.isEnabled());
this.configuration = configuration;
if (client == null) {
this.defaultHttpClientHandler = defaultHttpClientHandler();
this.client = new Client(defaultHttpClientHandler);
} else {
this.defaultHttpClientHandler = null;
this.client = client;
}
channelId = UUID.randomUUID().toString();
final ConnectionConfiguration connection = configuration.getConnection();
serviceUrl = connection.isUseSSL() ? "https://" : "http://"
+ configuration.getConnection().getHost()
+ ':'
+ configuration.getConnection().getPort()
+ "/services/collector/raw";
final BufferingConfiguration bufferingConfiguration = configuration.getBuffering();
final Duration writeInterval = isNullOrEmpty(bufferingConfiguration.getWriteInterval()) ? null
: Duration.duration(bufferingConfiguration.getWriteInterval());
if (publisherFactory == null) {
publisherFactory = new BatchPublisherFactoryImpl();
}
batchPublisher = publisherFactory.newBufferedPublisher(this)
.capacity(bufferingConfiguration.getMaxSize())
.writeInterval(writeInterval)
.maxBatchEvents(bufferingConfiguration.getMaxBatchedEvents())
.averagePerEventPayloadSize(BATCH_INDEX_AVERAGE_PER_EVENT_PAYLOAD_SIZE)
.autoFlush(ALWAYS_FLUSH_BATCH_QUEUE)
.build();
}
@Override
public void startup() throws ResourceException {
batchPublisher.startup();
}
@Override
public void shutdown() throws ResourceException {
batchPublisher.shutdown();
if (defaultHttpClientHandler != null) {
try {
defaultHttpClientHandler.close();
} catch (IOException e) {
throw ResourceException.newResourceException(ResourceException.INTERNAL_ERROR,
"An error occurred while closing the default HTTP client handler", e);
}
}
}
@Override
public Promise<ResourceResponse, ResourceException> publishEvent(final Context context,
final String topic, final JsonValue event) {
final String resourceId = event.get(ResourceResponse.FIELD_CONTENT_ID).asString();
if (!batchPublisher.offer(topic, event)) {
return new ServiceUnavailableException(
"Splunk batch buffer full, dropping audit event " + topic + "/" + resourceId).asPromise();
}
return newResourceResponse(resourceId, null, event).asPromise();
}
@Override
public Promise<ResourceResponse, ResourceException> readEvent(final Context context,
final String topic, final String resourceId) {
return new NotSupportedException(
"Read operations are currently not supported by the Splunk handler").asPromise();
}
@Override
public Promise<QueryResponse, ResourceException> queryEvents(final Context context,
final String topic, final QueryRequest query, final QueryResourceHandler handler) {
return new NotSupportedException(
"Query operations are currently not supported by the Splunk handler").asPromise();
}
@Override
public void addToBatch(final String topic, final JsonValue event,
final StringBuilder payload) throws BatchException {
event.put("_topic", topic);
try {
final String eventJsonString = OBJECT_MAPPER.writeValueAsString(event.getObject());
payload.append(eventJsonString).append('\n');
} catch (final JsonProcessingException e) {
throw new BatchException("Unable to parse event object to JSON", e);
} finally {
event.remove("_topic");
}
}
@Override
public Promise<Void, BatchException> publishBatch(final String payload) {
final Request request = new Request();
request.setMethod("POST");
try {
request.setUri(serviceUrl);
} catch (URISyntaxException e) {
return newExceptionPromise(new BatchException("Incorrect URI " + serviceUrl, e));
}
request.getHeaders().put(ContentTypeHeader.NAME, "application/json; charset=UTF-8");
request.getHeaders().put("Authorization", "Splunk " + configuration.getAuthzToken());
request.getHeaders().put("X-Splunk-Request-Channel", channelId);
request.setEntity(payload);
return client.send(request).then(
closeSilently(new Function<Response, Void, BatchException>() {
@Override
public Void apply(final Response response) throws BatchException {
if (!response.getStatus().isSuccessful()) {
throw new BatchException("Publishing to Splunk failed: " + response.getEntity());
}
return null;
}
}), Responses.<Void, BatchException>noopExceptionFunction());
}
private HttpClientHandler defaultHttpClientHandler() {
try {
return new HttpClientHandler(
Options.defaultOptions()
.set(OPTION_LOADER, new Loader() {
@Override
public <S> S load(Class<S> service, Options options) {
return service.cast(new AsyncHttpClientProvider());
}
}));
} catch (HttpApplicationException e) {
throw new RuntimeException("Error while building default HTTP Client", e);
}
}
}