View Javadoc
1   /*
2    * The contents of this file are subject to the terms of the Common Development and
3    * Distribution License (the License). You may not use this file except in compliance with the
4    * License.
5    *
6    * You can obtain a copy of the License at legal/CDDLv1.0.txt. See the License for the
7    * specific language governing permission and limitations under the License.
8    *
9    * When distributing Covered Software, include this CDDL Header Notice in each file and include
10   * the License file at legal/CDDLv1.0.txt. If applicable, add the following below the CDDL
11   * Header, with the fields enclosed by brackets [] replaced by your own identifying
12   * information: "Portions copyright [year] [name of copyright owner]".
13   *
14   * Copyright 2016 ForgeRock AS.
15   */
16  package org.forgerock.audit.handlers.splunk;
17  
18  import static org.wrensecurity.guava.common.base.Strings.isNullOrEmpty;
19  import static org.forgerock.http.handler.HttpClientHandler.OPTION_LOADER;
20  import static org.forgerock.json.resource.Responses.newResourceResponse;
21  import static org.forgerock.util.CloseSilentlyFunction.closeSilently;
22  import static org.forgerock.util.promise.Promises.newExceptionPromise;
23  
24  import java.io.IOException;
25  import java.net.URISyntaxException;
26  import java.util.UUID;
27  
28  import org.forgerock.audit.Audit;
29  import org.forgerock.audit.events.EventTopicsMetaData;
30  import org.forgerock.audit.events.handlers.AuditEventHandlerBase;
31  import org.forgerock.audit.events.handlers.buffering.BatchConsumer;
32  import org.forgerock.audit.events.handlers.buffering.BatchException;
33  import org.forgerock.audit.events.handlers.buffering.BatchPublisher;
34  import org.forgerock.audit.events.handlers.buffering.BatchPublisherFactory;
35  import org.forgerock.audit.events.handlers.buffering.BatchPublisherFactoryImpl;
36  import org.forgerock.audit.handlers.splunk.SplunkAuditEventHandlerConfiguration.BufferingConfiguration;
37  import org.forgerock.audit.handlers.splunk.SplunkAuditEventHandlerConfiguration.ConnectionConfiguration;
38  import org.forgerock.http.Client;
39  import org.forgerock.http.HttpApplicationException;
40  import org.forgerock.http.apache.async.AsyncHttpClientProvider;
41  import org.forgerock.http.handler.HttpClientHandler;
42  import org.forgerock.http.header.ContentTypeHeader;
43  import org.forgerock.http.protocol.Request;
44  import org.forgerock.http.protocol.Response;
45  import org.forgerock.http.protocol.Responses;
46  import org.forgerock.http.spi.Loader;
47  import org.forgerock.json.JsonValue;
48  import org.forgerock.json.resource.NotSupportedException;
49  import org.forgerock.json.resource.QueryRequest;
50  import org.forgerock.json.resource.QueryResourceHandler;
51  import org.forgerock.json.resource.QueryResponse;
52  import org.forgerock.json.resource.ResourceException;
53  import org.forgerock.json.resource.ResourceResponse;
54  import org.forgerock.json.resource.ServiceUnavailableException;
55  import org.forgerock.services.context.Context;
56  import org.forgerock.util.Function;
57  import org.forgerock.util.Options;
58  import org.forgerock.util.promise.Promise;
59  import org.forgerock.util.time.Duration;
60  
61  import com.fasterxml.jackson.core.JsonProcessingException;
62  import com.fasterxml.jackson.databind.ObjectMapper;
63  
64  /**
65   * Audit event handler that writes out to Splunk's HTTP event collector RAW endpoint.
66   */
67  public final class SplunkAuditEventHandler extends AuditEventHandlerBase implements BatchConsumer {
68  
69      /*
70       * Value is used to initialize the size of buffers, but if the value
71       * is too low, the buffers will automatically resize as needed.
72       */
73      private static final int BATCH_INDEX_AVERAGE_PER_EVENT_PAYLOAD_SIZE = 1280;
74  
75      /*
76       * The Elasticsearch {@link AuditEventHandler} <b>always</b> flushes
77       * events in the batch queue on shutdown or  configuration change.
78       */
79      private static final boolean ALWAYS_FLUSH_BATCH_QUEUE = true;
80  
81      /*
82       * Using {@link ObjectMapper} in favour over {@link JsonValue#toString}
83       * as this is considered to produce more reliable json.
84       */
85      private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
86  
87      private final SplunkAuditEventHandlerConfiguration configuration;
88      private final Client client;
89      private final HttpClientHandler defaultHttpClientHandler;
90      private final String channelId;
91      private final BatchPublisher batchPublisher;
92      private final String serviceUrl;
93  
94      /**
95       * Constructs a new Splunk audit event handler.
96       *
97       * @param configuration
98       *         the Splunk audit event handler configuration
99       * @param eventTopicsMetaData
100      *         topic meta data
101      * @param publisherFactory
102      *         the batch publisher factory or {@code null}
103      * @param client
104      *         HTTP client or {@code null}
105      */
106     public SplunkAuditEventHandler(
107             final SplunkAuditEventHandlerConfiguration configuration, final EventTopicsMetaData eventTopicsMetaData,
108             @Audit BatchPublisherFactory publisherFactory, final @Audit Client client) {
109         super(configuration.getName(), eventTopicsMetaData, configuration.getTopics(), configuration.isEnabled());
110 
111         this.configuration = configuration;
112         if (client == null) {
113             this.defaultHttpClientHandler = defaultHttpClientHandler();
114             this.client = new Client(defaultHttpClientHandler);
115         } else {
116             this.defaultHttpClientHandler = null;
117             this.client = client;
118         }
119         channelId = UUID.randomUUID().toString();
120 
121         final ConnectionConfiguration connection = configuration.getConnection();
122         serviceUrl = connection.isUseSSL() ? "https://" : "http://"
123                 + configuration.getConnection().getHost()
124                 + ':'
125                 + configuration.getConnection().getPort()
126                 + "/services/collector/raw";
127 
128         final BufferingConfiguration bufferingConfiguration = configuration.getBuffering();
129         final Duration writeInterval = isNullOrEmpty(bufferingConfiguration.getWriteInterval()) ? null
130                 : Duration.duration(bufferingConfiguration.getWriteInterval());
131 
132         if (publisherFactory == null) {
133             publisherFactory = new BatchPublisherFactoryImpl();
134         }
135         batchPublisher = publisherFactory.newBufferedPublisher(this)
136                 .capacity(bufferingConfiguration.getMaxSize())
137                 .writeInterval(writeInterval)
138                 .maxBatchEvents(bufferingConfiguration.getMaxBatchedEvents())
139                 .averagePerEventPayloadSize(BATCH_INDEX_AVERAGE_PER_EVENT_PAYLOAD_SIZE)
140                 .autoFlush(ALWAYS_FLUSH_BATCH_QUEUE)
141                 .build();
142     }
143 
144     @Override
145     public void startup() throws ResourceException {
146         batchPublisher.startup();
147     }
148 
149     @Override
150     public void shutdown() throws ResourceException {
151         batchPublisher.shutdown();
152         if (defaultHttpClientHandler != null) {
153             try {
154                 defaultHttpClientHandler.close();
155             } catch (IOException e) {
156                 throw ResourceException.newResourceException(ResourceException.INTERNAL_ERROR,
157                         "An error occurred while closing the default HTTP client handler", e);
158             }
159         }
160     }
161 
162     @Override
163     public Promise<ResourceResponse, ResourceException> publishEvent(final Context context,
164             final String topic, final JsonValue event) {
165         final String resourceId = event.get(ResourceResponse.FIELD_CONTENT_ID).asString();
166 
167         if (!batchPublisher.offer(topic, event)) {
168             return new ServiceUnavailableException(
169                     "Splunk batch buffer full, dropping audit event " + topic + "/" + resourceId).asPromise();
170         }
171 
172         return newResourceResponse(resourceId, null, event).asPromise();
173     }
174 
175     @Override
176     public Promise<ResourceResponse, ResourceException> readEvent(final Context context,
177             final String topic, final String resourceId) {
178         return new NotSupportedException(
179                 "Read operations are currently not supported by the Splunk handler").asPromise();
180     }
181 
182     @Override
183     public Promise<QueryResponse, ResourceException> queryEvents(final Context context,
184             final String topic, final QueryRequest query, final QueryResourceHandler handler) {
185         return new NotSupportedException(
186                 "Query operations are currently not supported by the Splunk handler").asPromise();
187     }
188 
189     @Override
190     public void addToBatch(final String topic, final JsonValue event,
191             final StringBuilder payload) throws BatchException {
192         event.put("_topic", topic);
193 
194         try {
195             final String eventJsonString = OBJECT_MAPPER.writeValueAsString(event.getObject());
196             payload.append(eventJsonString).append('\n');
197         } catch (final JsonProcessingException e) {
198             throw new BatchException("Unable to parse event object to JSON", e);
199         } finally {
200             event.remove("_topic");
201         }
202     }
203 
204     @Override
205     public Promise<Void, BatchException> publishBatch(final String payload) {
206         final Request request = new Request();
207         request.setMethod("POST");
208 
209         try {
210             request.setUri(serviceUrl);
211         } catch (URISyntaxException e) {
212             return newExceptionPromise(new BatchException("Incorrect URI " + serviceUrl, e));
213         }
214 
215         request.getHeaders().put(ContentTypeHeader.NAME, "application/json; charset=UTF-8");
216         request.getHeaders().put("Authorization", "Splunk " + configuration.getAuthzToken());
217         request.getHeaders().put("X-Splunk-Request-Channel", channelId);
218         request.setEntity(payload);
219 
220         return client.send(request).then(
221                 closeSilently(new Function<Response, Void, BatchException>() {
222 
223                     @Override
224                     public Void apply(final Response response) throws BatchException {
225                         if (!response.getStatus().isSuccessful()) {
226                             throw new BatchException("Publishing to Splunk failed: " + response.getEntity());
227                         }
228 
229                         return null;
230                     }
231 
232                 }), Responses.<Void, BatchException>noopExceptionFunction());
233     }
234 
235     private HttpClientHandler defaultHttpClientHandler() {
236         try {
237             return new HttpClientHandler(
238                     Options.defaultOptions()
239                             .set(OPTION_LOADER, new Loader() {
240                                 @Override
241                                 public <S> S load(Class<S> service, Options options) {
242                                     return service.cast(new AsyncHttpClientProvider());
243                                 }
244                             }));
245         } catch (HttpApplicationException e) {
246             throw new RuntimeException("Error while building default HTTP Client", e);
247         }
248     }
249 }