1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package org.forgerock.audit.handlers.splunk;
17
18 import static org.wrensecurity.guava.common.base.Strings.isNullOrEmpty;
19 import static org.forgerock.http.handler.HttpClientHandler.OPTION_LOADER;
20 import static org.forgerock.json.resource.Responses.newResourceResponse;
21 import static org.forgerock.util.CloseSilentlyFunction.closeSilently;
22 import static org.forgerock.util.promise.Promises.newExceptionPromise;
23
24 import java.io.IOException;
25 import java.net.URISyntaxException;
26 import java.util.UUID;
27
28 import org.forgerock.audit.Audit;
29 import org.forgerock.audit.events.EventTopicsMetaData;
30 import org.forgerock.audit.events.handlers.AuditEventHandlerBase;
31 import org.forgerock.audit.events.handlers.buffering.BatchConsumer;
32 import org.forgerock.audit.events.handlers.buffering.BatchException;
33 import org.forgerock.audit.events.handlers.buffering.BatchPublisher;
34 import org.forgerock.audit.events.handlers.buffering.BatchPublisherFactory;
35 import org.forgerock.audit.events.handlers.buffering.BatchPublisherFactoryImpl;
36 import org.forgerock.audit.handlers.splunk.SplunkAuditEventHandlerConfiguration.BufferingConfiguration;
37 import org.forgerock.audit.handlers.splunk.SplunkAuditEventHandlerConfiguration.ConnectionConfiguration;
38 import org.forgerock.http.Client;
39 import org.forgerock.http.HttpApplicationException;
40 import org.forgerock.http.apache.async.AsyncHttpClientProvider;
41 import org.forgerock.http.handler.HttpClientHandler;
42 import org.forgerock.http.header.ContentTypeHeader;
43 import org.forgerock.http.protocol.Request;
44 import org.forgerock.http.protocol.Response;
45 import org.forgerock.http.protocol.Responses;
46 import org.forgerock.http.spi.Loader;
47 import org.forgerock.json.JsonValue;
48 import org.forgerock.json.resource.NotSupportedException;
49 import org.forgerock.json.resource.QueryRequest;
50 import org.forgerock.json.resource.QueryResourceHandler;
51 import org.forgerock.json.resource.QueryResponse;
52 import org.forgerock.json.resource.ResourceException;
53 import org.forgerock.json.resource.ResourceResponse;
54 import org.forgerock.json.resource.ServiceUnavailableException;
55 import org.forgerock.services.context.Context;
56 import org.forgerock.util.Function;
57 import org.forgerock.util.Options;
58 import org.forgerock.util.promise.Promise;
59 import org.forgerock.util.time.Duration;
60
61 import com.fasterxml.jackson.core.JsonProcessingException;
62 import com.fasterxml.jackson.databind.ObjectMapper;
63
64
65
66
67 public final class SplunkAuditEventHandler extends AuditEventHandlerBase implements BatchConsumer {
68
69
70
71
72
73 private static final int BATCH_INDEX_AVERAGE_PER_EVENT_PAYLOAD_SIZE = 1280;
74
75
76
77
78
79 private static final boolean ALWAYS_FLUSH_BATCH_QUEUE = true;
80
81
82
83
84
85 private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
86
87 private final SplunkAuditEventHandlerConfiguration configuration;
88 private final Client client;
89 private final HttpClientHandler defaultHttpClientHandler;
90 private final String channelId;
91 private final BatchPublisher batchPublisher;
92 private final String serviceUrl;
93
94
95
96
97
98
99
100
101
102
103
104
105
106 public SplunkAuditEventHandler(
107 final SplunkAuditEventHandlerConfiguration configuration, final EventTopicsMetaData eventTopicsMetaData,
108 @Audit BatchPublisherFactory publisherFactory, final @Audit Client client) {
109 super(configuration.getName(), eventTopicsMetaData, configuration.getTopics(), configuration.isEnabled());
110
111 this.configuration = configuration;
112 if (client == null) {
113 this.defaultHttpClientHandler = defaultHttpClientHandler();
114 this.client = new Client(defaultHttpClientHandler);
115 } else {
116 this.defaultHttpClientHandler = null;
117 this.client = client;
118 }
119 channelId = UUID.randomUUID().toString();
120
121 final ConnectionConfiguration connection = configuration.getConnection();
122 serviceUrl = connection.isUseSSL() ? "https://" : "http://"
123 + configuration.getConnection().getHost()
124 + ':'
125 + configuration.getConnection().getPort()
126 + "/services/collector/raw";
127
128 final BufferingConfiguration bufferingConfiguration = configuration.getBuffering();
129 final Duration writeInterval = isNullOrEmpty(bufferingConfiguration.getWriteInterval()) ? null
130 : Duration.duration(bufferingConfiguration.getWriteInterval());
131
132 if (publisherFactory == null) {
133 publisherFactory = new BatchPublisherFactoryImpl();
134 }
135 batchPublisher = publisherFactory.newBufferedPublisher(this)
136 .capacity(bufferingConfiguration.getMaxSize())
137 .writeInterval(writeInterval)
138 .maxBatchEvents(bufferingConfiguration.getMaxBatchedEvents())
139 .averagePerEventPayloadSize(BATCH_INDEX_AVERAGE_PER_EVENT_PAYLOAD_SIZE)
140 .autoFlush(ALWAYS_FLUSH_BATCH_QUEUE)
141 .build();
142 }
143
144 @Override
145 public void startup() throws ResourceException {
146 batchPublisher.startup();
147 }
148
149 @Override
150 public void shutdown() throws ResourceException {
151 batchPublisher.shutdown();
152 if (defaultHttpClientHandler != null) {
153 try {
154 defaultHttpClientHandler.close();
155 } catch (IOException e) {
156 throw ResourceException.newResourceException(ResourceException.INTERNAL_ERROR,
157 "An error occurred while closing the default HTTP client handler", e);
158 }
159 }
160 }
161
162 @Override
163 public Promise<ResourceResponse, ResourceException> publishEvent(final Context context,
164 final String topic, final JsonValue event) {
165 final String resourceId = event.get(ResourceResponse.FIELD_CONTENT_ID).asString();
166
167 if (!batchPublisher.offer(topic, event)) {
168 return new ServiceUnavailableException(
169 "Splunk batch buffer full, dropping audit event " + topic + "/" + resourceId).asPromise();
170 }
171
172 return newResourceResponse(resourceId, null, event).asPromise();
173 }
174
175 @Override
176 public Promise<ResourceResponse, ResourceException> readEvent(final Context context,
177 final String topic, final String resourceId) {
178 return new NotSupportedException(
179 "Read operations are currently not supported by the Splunk handler").asPromise();
180 }
181
182 @Override
183 public Promise<QueryResponse, ResourceException> queryEvents(final Context context,
184 final String topic, final QueryRequest query, final QueryResourceHandler handler) {
185 return new NotSupportedException(
186 "Query operations are currently not supported by the Splunk handler").asPromise();
187 }
188
189 @Override
190 public void addToBatch(final String topic, final JsonValue event,
191 final StringBuilder payload) throws BatchException {
192 event.put("_topic", topic);
193
194 try {
195 final String eventJsonString = OBJECT_MAPPER.writeValueAsString(event.getObject());
196 payload.append(eventJsonString).append('\n');
197 } catch (final JsonProcessingException e) {
198 throw new BatchException("Unable to parse event object to JSON", e);
199 } finally {
200 event.remove("_topic");
201 }
202 }
203
204 @Override
205 public Promise<Void, BatchException> publishBatch(final String payload) {
206 final Request request = new Request();
207 request.setMethod("POST");
208
209 try {
210 request.setUri(serviceUrl);
211 } catch (URISyntaxException e) {
212 return newExceptionPromise(new BatchException("Incorrect URI " + serviceUrl, e));
213 }
214
215 request.getHeaders().put(ContentTypeHeader.NAME, "application/json; charset=UTF-8");
216 request.getHeaders().put("Authorization", "Splunk " + configuration.getAuthzToken());
217 request.getHeaders().put("X-Splunk-Request-Channel", channelId);
218 request.setEntity(payload);
219
220 return client.send(request).then(
221 closeSilently(new Function<Response, Void, BatchException>() {
222
223 @Override
224 public Void apply(final Response response) throws BatchException {
225 if (!response.getStatus().isSuccessful()) {
226 throw new BatchException("Publishing to Splunk failed: " + response.getEntity());
227 }
228
229 return null;
230 }
231
232 }), Responses.<Void, BatchException>noopExceptionFunction());
233 }
234
235 private HttpClientHandler defaultHttpClientHandler() {
236 try {
237 return new HttpClientHandler(
238 Options.defaultOptions()
239 .set(OPTION_LOADER, new Loader() {
240 @Override
241 public <S> S load(Class<S> service, Options options) {
242 return service.cast(new AsyncHttpClientProvider());
243 }
244 }));
245 } catch (HttpApplicationException e) {
246 throw new RuntimeException("Error while building default HTTP Client", e);
247 }
248 }
249 }