1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.forgerock.audit.handlers.json;
19
20 import static java.lang.Math.max;
21 import static java.nio.charset.StandardCharsets.UTF_8;
22 import static org.forgerock.audit.batch.CommonAuditBatchConfiguration.POLLING_INTERVAL;
23 import static org.forgerock.audit.handlers.json.JsonAuditEventHandler.OBJECT_MAPPER;
24 import static org.forgerock.audit.handlers.json.JsonAuditEventHandler.EVENT_ID_FIELD;
25 import static org.forgerock.audit.util.ElasticsearchUtil.normalizeJson;
26 import static org.forgerock.audit.util.ElasticsearchUtil.renameField;
27 import static org.forgerock.json.resource.ResourceResponse.FIELD_CONTENT_ID;
28 import static org.forgerock.util.Reject.checkNotNull;
29 import static org.forgerock.util.Utils.closeSilently;
30
31 import java.io.Closeable;
32 import java.io.File;
33 import java.io.IOException;
34 import java.nio.ByteBuffer;
35 import java.nio.channels.FileChannel;
36 import java.nio.file.Files;
37 import java.nio.file.Path;
38 import java.nio.file.Paths;
39 import java.nio.file.StandardOpenOption;
40 import java.util.ArrayList;
41 import java.util.Collections;
42 import java.util.HashMap;
43 import java.util.HashSet;
44 import java.util.List;
45 import java.util.Map;
46 import java.util.Set;
47 import java.util.concurrent.ArrayBlockingQueue;
48 import java.util.concurrent.BlockingQueue;
49 import java.util.concurrent.Executors;
50 import java.util.concurrent.ScheduledExecutorService;
51 import java.util.concurrent.TimeUnit;
52
53 import org.forgerock.audit.retention.FileNamingPolicy;
54 import org.forgerock.audit.retention.RetentionPolicy;
55 import org.forgerock.audit.rotation.RotatableObject;
56 import org.forgerock.audit.rotation.RotationHooks;
57 import org.forgerock.audit.rotation.RotationPolicy;
58 import org.forgerock.json.JsonValue;
59 import org.forgerock.util.Utils;
60 import org.forgerock.util.time.Duration;
61 import org.joda.time.DateTime;
62 import org.joda.time.DateTimeZone;
63 import org.slf4j.Logger;
64 import org.slf4j.LoggerFactory;
65
66
67
68
69 class JsonFileWriter {
70
/** Logger shared by this writer and its nested helper classes. */
private static final Logger logger = LoggerFactory.getLogger(JsonFileWriter.class);

/** Smallest capacity the event queue is created with, whatever maximum the configuration asks for. */
private static final int MIN_QUEUE_SIZE = 100_000;

/** Suffix joined to each topic name with a {@code '.'} to form that topic's log file name. */
static final String LOG_FILE_NAME_SUFFIX = "audit.json";

/** When {@code true}, {@link #put} serializes events through the Elasticsearch normalization path. */
private final boolean elasticsearchCompatible;
/** Bounded hand-off between callers of this writer and the background {@link QueueConsumer}. */
private final BlockingQueue<QueueEntry> queue;
/** Single-threaded scheduler that periodically runs {@link #queueConsumer}. */
private final ScheduledExecutorService scheduler;
/** Task that drains {@link #queue} and writes the entries to the per-topic files. */
private final QueueConsumer queueConsumer;
/** Delay between two scheduled runs of {@link #queueConsumer}. */
private final Duration writeInterval;

/**
 * Creates the writer and its queue consumer. Nothing is scheduled until {@link #startup()} is called.
 *
 * @param topics
 *         topic names; the consumer creates one log file entry per topic
 * @param configuration
 *         supplies the Elasticsearch-compatibility flag and the buffering settings (queue size and
 *         write interval), and is handed on to the queue consumer
 * @param autoFlush
 *         handed to the queue consumer as its flush-on-shutdown flag
 */
JsonFileWriter(final Set<String> topics, final JsonAuditEventHandlerConfiguration configuration,
        final boolean autoFlush) {
    elasticsearchCompatible = configuration.isElasticsearchCompatible();
    queue = new ArrayBlockingQueue<>(max(configuration.getBuffering().getMaxSize(), MIN_QUEUE_SIZE));
    scheduler = Executors.newScheduledThreadPool(1, Utils.newThreadFactory(null, "audit-json-%d", false));
    writeInterval = parseWriteInterval(configuration);

    // Number of write intervals that fit into one second, never less than one. The interval is clamped
    // to at least 1 microsecond first: a positive sub-microsecond interval converts to 0 and would
    // otherwise make the division below throw ArithmeticException.
    final long intervalMicros = max(1L, writeInterval.to(TimeUnit.MICROSECONDS));
    final int iterationsBeforeFlush = (int) max(1L, 1_000_000L / intervalMicros);

    queueConsumer = new QueueConsumer(LOG_FILE_NAME_SUFFIX, topics, configuration, autoFlush, queue, scheduler,
            iterationsBeforeFlush);
}
103
104 private Duration parseWriteInterval(final JsonAuditEventHandlerConfiguration configuration) {
105 final String writeIntervalString = configuration.getBuffering().getWriteInterval();
106 Duration writeInterval;
107 try {
108 writeInterval = Duration.duration(writeIntervalString);
109 } catch (Exception e) {
110 writeInterval = null;
111 }
112 if (writeInterval == null || writeInterval.getValue() <= 0) {
113 logger.info("writeInterval '{}' is invalid, so falling back to {}", writeIntervalString, POLLING_INTERVAL);
114 return POLLING_INTERVAL;
115 }
116 return writeInterval;
117 }
118
119
120
121
122 void startup() {
123 scheduler.scheduleAtFixedRate(queueConsumer, 0, writeInterval.to(TimeUnit.MILLISECONDS), TimeUnit.MILLISECONDS);
124 }
125
126
127
128
129
130 void shutdown() {
131 if (!scheduler.isShutdown()) {
132 queueConsumer.shutdown();
133 }
134 }
135
136
137
138
139
140
141
142
143
144 void put(final String topic, final JsonValue event) throws InterruptedException, IOException {
145 if (elasticsearchCompatible) {
146
147 renameField(event, FIELD_CONTENT_ID, EVENT_ID_FIELD);
148 try {
149
150 final byte[] bytes = normalizeJson(event).getBytes(UTF_8);
151 queue.put(new QueueEntry(topic, bytes));
152 } finally {
153
154 renameField(event, EVENT_ID_FIELD, FIELD_CONTENT_ID);
155 }
156 } else {
157 queue.put(new QueueEntry(topic, OBJECT_MAPPER.writeValueAsBytes(event.getObject())));
158 }
159 }
160
161
162
163
164
165
166
167
168
169
170
171 boolean rotateFile(final String topic) throws InterruptedException {
172 if (queueConsumer.isRotationEnabled()) {
173 queue.put(new QueueEntry(topic, QueueEntry.ROTATE_FILE_ENTRY));
174 return true;
175 }
176 return false;
177 }
178
179
180
181
182
183
184
185
186
187 void flushFileBuffer(final String topic) throws InterruptedException {
188 queue.put(new QueueEntry(topic, QueueEntry.FLUSH_FILE_ENTRY));
189 }
190
191
192
193
194
195
196
197 Path getTopicFilePath(final String topic) {
198 final QueueConsumer.TopicEntry topicEntry = queueConsumer.topicEntryMap.get(topic);
199 return topicEntry == null ? null : topicEntry.filePath;
200 }
201
202
203
204
205 private static class QueueEntry {
206
207 static final byte[] ROTATE_FILE_ENTRY = new byte[0];
208 static final byte[] FLUSH_FILE_ENTRY = new byte[0];
209
210 private final String topic;
211 private final byte[] event;
212
213
214
215
216
217
218
219 QueueEntry(final String topic, final byte[] event) {
220 this.topic = checkNotNull(topic);
221 this.event = checkNotNull(event);
222 }
223
224 boolean isRotateEntry() {
225 return event == ROTATE_FILE_ENTRY;
226 }
227
228 boolean isFlushEntry() {
229 return event == FLUSH_FILE_ENTRY;
230 }
231 }
232
233
234
235
236
237 private static final class QueueConsumer implements Runnable {
238
/** Maximum number of queue entries taken off the queue in one {@code writeEvents()} pass. */
private static final int BATCH_SIZE = 5000;
/** Initial capacity, in bytes, of each topic's output buffer. */
private static final int OUTPUT_BUF_INITIAL_SIZE = 16 * 1024;
/** Line terminator written after every event. */
private static final byte[] NEWLINE_UTF_8_BYTES = "\n".getBytes(UTF_8);

/** Whether {@code shutdown()} waits for the scheduler and writes out what is still queued. */
private final boolean flushOnShutdown;
/** Rotation switch taken from the file-rotation configuration. */
private final boolean rotationEnabled;
/** {@code true} when there is any usable rotation policy or any retention policy. */
private final boolean hasRotationOrRetentionPolicies;
private final List<RotationPolicy> rotationPolicies;
private final List<RetentionPolicy> retentionPolicies;
/** Scratch set reused when collecting the files that retention policies want deleted. */
private final Set<File> filesToDelete;
private final BlockingQueue<QueueEntry> queue;
private final ScheduledExecutorService scheduler;
/** Unmodifiable topic-name to file-state mapping, built once in the constructor. */
private final Map<String, TopicEntry> topicEntryMap;
/** Scratch list the queue is drained into; reused across passes. */
private final List<QueueEntry> drainList;
/** Number of consecutive passes without events after which a topic's buffer is flushed. */
private final int iterationsBeforeFlush;

/** Set by {@code shutdown()}; makes {@code run()} stop looping. */
private volatile boolean shutdown;

/**
 * Creates the consumer and one {@link TopicEntry} per topic.
 *
 * @param fileNameSuffix
 *         suffix joined to each topic name with a {@code '.'} to form the file name
 * @param topics
 *         topic names
 * @param configuration
 *         source of the rotation and retention settings; also passed to every topic entry
 * @param flushOnShutdown
 *         whether shutdown should write out pending data
 * @param queue
 *         queue to consume from
 * @param scheduler
 *         scheduler running this consumer; shut down by {@code shutdown()}
 * @param iterationsBeforeFlush
 *         idle passes to tolerate before flushing a topic's buffer
 */
private QueueConsumer(final String fileNameSuffix, final Set<String> topics,
        final JsonAuditEventHandlerConfiguration configuration, final boolean flushOnShutdown,
        final BlockingQueue<QueueEntry> queue, final ScheduledExecutorService scheduler,
        final int iterationsBeforeFlush) {
    this.queue = queue;
    this.scheduler = scheduler;
    this.flushOnShutdown = flushOnShutdown;
    this.iterationsBeforeFlush = iterationsBeforeFlush;
    this.drainList = new ArrayList<>(BATCH_SIZE);

    this.rotationEnabled = configuration.getFileRotation().isRotationEnabled();
    this.rotationPolicies = configuration.getFileRotation().buildRotationPolicies();
    this.retentionPolicies = configuration.getFileRetention().buildRetentionPolicies();

    final boolean rotationConfigured = this.rotationEnabled && !this.rotationPolicies.isEmpty();
    final boolean retentionConfigured = !this.retentionPolicies.isEmpty();
    this.hasRotationOrRetentionPolicies = rotationConfigured || retentionConfigured;
    this.filesToDelete = new HashSet<>();

    // one file-backed entry per topic, named "<topic>.<suffix>"
    final Map<String, TopicEntry> entriesByTopic = new HashMap<>();
    for (final String topicName : topics) {
        entriesByTopic.put(topicName, new TopicEntry(topicName + '.' + fileNameSuffix, configuration));
    }
    this.topicEntryMap = Collections.unmodifiableMap(entriesByTopic);
}
293
294
295
296
297
298 void shutdown() {
299 if (!shutdown) {
300 shutdown = true;
301 scheduler.shutdown();
302 try {
303 if (flushOnShutdown) {
304
305 boolean interrupted = false;
306 while (!scheduler.isTerminated()) {
307 try {
308 scheduler.awaitTermination(1L, TimeUnit.MINUTES);
309 logger.info("Awaiting audit event consumer termination.");
310 } catch (InterruptedException e) {
311 interrupted = true;
312 }
313 }
314 if (interrupted) {
315 Thread.currentThread().interrupt();
316 }
317
318 while (!queue.isEmpty()) {
319 writeEvents();
320 }
321 for (final TopicEntry topicEntry : topicEntryMap.values()) {
322 topicEntry.flush();
323 }
324 }
325 } finally {
326 closeSilently(topicEntryMap.values());
327 }
328 }
329 }
330
331 @Override
332 public void run() {
333
334 do {
335 writeEvents();
336 } while (!queue.isEmpty() && !shutdown);
337 }
338
339 private void writeEvents() {
340 drainList.clear();
341 try {
342
343 final int n = queue.drainTo(drainList, BATCH_SIZE);
344 for (int i = 0; i < n; ++i) {
345 final QueueEntry entry = drainList.get(i);
346 final TopicEntry topicEntry = topicEntryMap.get(entry.topic);
347 if (topicEntry == null) {
348 logger.warn("Unrecognised topic: " + entry.topic);
349 } else {
350 if (entry.isRotateEntry()) {
351 topicEntry.rotateNow();
352 } else if (entry.isFlushEntry()) {
353 topicEntry.flush();
354 } else {
355 topicEntry.write(entry.event);
356 }
357 }
358 }
359 for (final TopicEntry topicEntry : topicEntryMap.values()) {
360
361 if (topicEntry.currentIterationsWithoutEvents() >= iterationsBeforeFlush) {
362 topicEntry.flush();
363 }
364 }
365
366 if (hasRotationOrRetentionPolicies) {
367
368 for (final TopicEntry topicEntry : topicEntryMap.values()) {
369 topicEntry.rotateIfNeeded();
370 }
371 }
372 } catch (IOException e) {
373 logger.error("JSON file write failed", e);
374 } catch (Exception e) {
375 logger.error("Unexpected failure", e);
376 }
377 }
378
379
380
381
382
383
384 boolean isRotationEnabled() {
385 return rotationEnabled;
386 }
387
388
389
390
391 private class TopicEntry implements RotatableObject, Closeable {
392 private static final int FILE_BUFFER_THRESHOLD = 8 * 1024;
393
394 private final Path filePath;
395 private final FileNamingPolicy fileNamingPolicy;
396 private final ByteBufferOutputStream outputStream;
397 private DateTime lastRotationTime;
398 private FileChannel fileChannel;
399 private long positionInFile;
400 private int iterationsWithoutEventsCounter;
401
402 TopicEntry(final String fileName, final JsonAuditEventHandlerConfiguration configuration) {
403 try {
404 outputStream = new ByteBufferOutputStream(ByteBuffer.allocateDirect(OUTPUT_BUF_INITIAL_SIZE));
405
406 final Path directoryPath = Paths.get(configuration.getLogDirectory());
407 if (Files.notExists(directoryPath)) {
408 Files.createDirectory(directoryPath);
409 }
410 filePath = directoryPath.resolve(fileName);
411 openFileChannel();
412
413 final File currentFile = filePath.toFile();
414 fileNamingPolicy = configuration.getFileRotation().buildTimeStampFileNamingPolicy(currentFile);
415
416 final long lastModified = currentFile.lastModified();
417 this.lastRotationTime = lastModified > 0
418 ? new DateTime(lastModified, DateTimeZone.UTC)
419 : DateTime.now(DateTimeZone.UTC);
420 } catch (IOException e) {
421 throw new RuntimeException("Failed to create or open file", e);
422 }
423 }
424
425 void write(final byte[] bytes) throws IOException {
426
427 outputStream.write(bytes);
428 outputStream.write(NEWLINE_UTF_8_BYTES);
429 if (outputStream.byteBuffer().position() >= FILE_BUFFER_THRESHOLD) {
430 outputStream.byteBuffer().flip();
431 try {
432 if (Files.notExists(filePath)) {
433 openFileChannel();
434 }
435
436 positionInFile += fileChannel.write(outputStream.byteBuffer(), positionInFile);
437 } finally {
438 outputStream.clear();
439 }
440 }
441 iterationsWithoutEventsCounter = 0;
442 }
443
444 void flush() {
445 if (outputStream.byteBuffer().position() != 0) {
446
447 outputStream.byteBuffer().flip();
448 try {
449 if (Files.notExists(filePath)) {
450 openFileChannel();
451 }
452
453 positionInFile += fileChannel.write(outputStream.byteBuffer(), positionInFile);
454 } catch (IOException e) {
455 logger.error("Failed to flush file buffer", e);
456 } finally {
457 outputStream.clear();
458 }
459 }
460 iterationsWithoutEventsCounter = 0;
461 }
462
463 @Override
464 public long getBytesWritten() {
465 return positionInFile;
466 }
467
468 @Override
469 public DateTime getLastRotationTime() {
470 return lastRotationTime;
471 }
472
473 @Override
474 public void rotateIfNeeded() throws IOException {
475 if (rotationEnabled && !rotationPolicies.isEmpty()) {
476 for (final RotationPolicy rotationPolicy : rotationPolicies) {
477 if (rotationPolicy.shouldRotateFile(this)) {
478 rotateNow();
479 break;
480 }
481 }
482 }
483 if (!retentionPolicies.isEmpty()) {
484 filesToDelete.clear();
485 for (final RetentionPolicy retentionPolicy : retentionPolicies) {
486 filesToDelete.addAll(retentionPolicy.deleteFiles(fileNamingPolicy));
487 }
488 if (!filesToDelete.isEmpty()) {
489 for (final File file : filesToDelete) {
490 if (!file.delete() && logger.isWarnEnabled()) {
491 logger.warn("Could not delete file {}", file.getAbsolutePath());
492 }
493 }
494 }
495 }
496 }
497
498
499
500
501
502
503 void rotateNow() throws IOException {
504
505 fileChannel.close();
506 final Path archivedFilePath = fileNamingPolicy.getNextName().toPath();
507 Files.move(filePath, archivedFilePath);
508
509 openFileChannel();
510 lastRotationTime = DateTime.now(DateTimeZone.UTC);
511 }
512
513 private void openFileChannel() throws IOException {
514 fileChannel = FileChannel.open(filePath, StandardOpenOption.CREATE, StandardOpenOption.WRITE);
515 positionInFile = fileChannel.size();
516 }
517
518 @Override
519 public void close() throws IOException {
520 fileChannel.close();
521 }
522
523 @Override
524 public void registerRotationHooks(RotationHooks rotationHooks) {
525
526 }
527
528 int currentIterationsWithoutEvents() {
529 return ++iterationsWithoutEventsCounter;
530 }
531 }
532 }
533
534 }