View Javadoc
1   /*
2    * The contents of this file are subject to the terms of the Common Development and
3    * Distribution License (the License). You may not use this file except in compliance with the
4    * License.
5    *
6    * You can obtain a copy of the License at legal/CDDLv1.0.txt. See the License for the
7    * specific language governing permission and limitations under the License.
8    *
9    * When distributing Covered Software, include this CDDL Header Notice in each file and include
10   * the License file at legal/CDDLv1.0.txt. If applicable, add the following below the CDDL
11   * Header, with the fields enclosed by brackets [] replaced by your own identifying
12   * information: "Portions copyright [year] [name of copyright owner]".
13   *
14   * Copyright 2016 ForgeRock AS.
15   * Portions copyright 2021 Wren Security.
16   */
17  
18  package org.forgerock.audit.handlers.json;
19  
20  import static java.lang.Math.max;
21  import static java.nio.charset.StandardCharsets.UTF_8;
22  import static org.forgerock.audit.batch.CommonAuditBatchConfiguration.POLLING_INTERVAL;
23  import static org.forgerock.audit.handlers.json.JsonAuditEventHandler.OBJECT_MAPPER;
24  import static org.forgerock.audit.handlers.json.JsonAuditEventHandler.EVENT_ID_FIELD;
25  import static org.forgerock.audit.util.ElasticsearchUtil.normalizeJson;
26  import static org.forgerock.audit.util.ElasticsearchUtil.renameField;
27  import static org.forgerock.json.resource.ResourceResponse.FIELD_CONTENT_ID;
28  import static org.forgerock.util.Reject.checkNotNull;
29  import static org.forgerock.util.Utils.closeSilently;
30  
31  import java.io.Closeable;
32  import java.io.File;
33  import java.io.IOException;
34  import java.nio.ByteBuffer;
35  import java.nio.channels.FileChannel;
36  import java.nio.file.Files;
37  import java.nio.file.Path;
38  import java.nio.file.Paths;
39  import java.nio.file.StandardOpenOption;
40  import java.util.ArrayList;
41  import java.util.Collections;
42  import java.util.HashMap;
43  import java.util.HashSet;
44  import java.util.List;
45  import java.util.Map;
46  import java.util.Set;
47  import java.util.concurrent.ArrayBlockingQueue;
48  import java.util.concurrent.BlockingQueue;
49  import java.util.concurrent.Executors;
50  import java.util.concurrent.ScheduledExecutorService;
51  import java.util.concurrent.TimeUnit;
52  
53  import org.forgerock.audit.retention.FileNamingPolicy;
54  import org.forgerock.audit.retention.RetentionPolicy;
55  import org.forgerock.audit.rotation.RotatableObject;
56  import org.forgerock.audit.rotation.RotationHooks;
57  import org.forgerock.audit.rotation.RotationPolicy;
58  import org.forgerock.json.JsonValue;
59  import org.forgerock.util.Utils;
60  import org.forgerock.util.time.Duration;
61  import org.joda.time.DateTime;
62  import org.joda.time.DateTimeZone;
63  import org.slf4j.Logger;
64  import org.slf4j.LoggerFactory;
65  
66  /**
67   * Periodically writes JSON events to a file.
68   */
69  class JsonFileWriter {
70  
71      private static final Logger logger = LoggerFactory.getLogger(JsonFileWriter.class);
72  
73      private static final int MIN_QUEUE_SIZE = 100_000;
74  
75      static final String LOG_FILE_NAME_SUFFIX = "audit.json";
76  
77      private final boolean elasticsearchCompatible;
78      private final BlockingQueue<QueueEntry> queue;
79      private final ScheduledExecutorService scheduler;
80      private final QueueConsumer queueConsumer;
81      private final Duration writeInterval;
82  
83      /**
84       * Creates a {@link JsonFileWriter}. For arguments with minimum values, the minimum will be used
85       * without warning if the provided value is lower than that minimum.
86       *
87       * @param topics Supported topics
88       * @param configuration Configuration
89       * @param autoFlush {@code true} when data in queue should always be flushed on shutdown and {@code false} when
90       * it may be discarded
91       */
92      JsonFileWriter(final Set<String> topics, final JsonAuditEventHandlerConfiguration configuration,
93              final boolean autoFlush) {
94          elasticsearchCompatible = configuration.isElasticsearchCompatible();
95          queue = new ArrayBlockingQueue<>(max(configuration.getBuffering().getMaxSize(), MIN_QUEUE_SIZE));
96          scheduler = Executors.newScheduledThreadPool(1, Utils.newThreadFactory(null, "audit-json-%d", false));
97          writeInterval = parseWriteInterval(configuration);
98          // checking for events to write on disk happens at most once a second, since {@code run()}
99          // is called periodically compute how many iterations are needed beofre writing on file
100         queueConsumer = new QueueConsumer(LOG_FILE_NAME_SUFFIX, topics, configuration, autoFlush, queue, scheduler,
101                 (int) Math.max(1, 1_000_000 / writeInterval.to(TimeUnit.MICROSECONDS)));
102     }
103 
104     private Duration parseWriteInterval(final JsonAuditEventHandlerConfiguration configuration) {
105         final String writeIntervalString = configuration.getBuffering().getWriteInterval();
106         Duration writeInterval;
107         try {
108             writeInterval = Duration.duration(writeIntervalString);
109         } catch (Exception e) {
110             writeInterval = null;
111         }
112         if (writeInterval == null || writeInterval.getValue() <= 0) {
113             logger.info("writeInterval '{}' is invalid, so falling back to {}", writeIntervalString, POLLING_INTERVAL);
114             return POLLING_INTERVAL;
115         }
116         return writeInterval;
117     }
118 
119     /**
120      * Starts periodically writing JSON events to a file.
121      */
122     void startup() {
123         scheduler.scheduleAtFixedRate(queueConsumer, 0, writeInterval.to(TimeUnit.MILLISECONDS), TimeUnit.MILLISECONDS);
124     }
125 
126     /**
127      * Stops writing JSON events to a file, and awaits termination of pending queue tasks when {@code autoFlush}
128      * is enabled.
129      */
130     void shutdown() {
131         if (!scheduler.isShutdown()) {
132             queueConsumer.shutdown();
133         }
134     }
135 
136     /**
137      * Inserts the specified element at the tail of this queue, and blocks if this queue is full.
138      *
139      * @param topic Event topic
140      * @param event Event payload to index, where {@code _id} field is the identifier
141      * @throws InterruptedException thread interrupted while blocking on a full queue
142      * @throws IOException failed to serialize JSON
143      */
144     void put(final String topic, final JsonValue event) throws InterruptedException, IOException {
145         if (elasticsearchCompatible) {
146             // rename _id field to be _eventId, because _id is reserved by ElasticSearch
147             renameField(event, FIELD_CONTENT_ID, EVENT_ID_FIELD);
148             try {
149                 // apply ElasticSearch JSON normalization, if necessary
150                 final byte[] bytes = normalizeJson(event).getBytes(UTF_8);
151                 queue.put(new QueueEntry(topic, bytes));
152             } finally {
153                 // restore _id field, because original event is same instance as normalizedEvent
154                 renameField(event, EVENT_ID_FIELD, FIELD_CONTENT_ID);
155             }
156         } else {
157             queue.put(new QueueEntry(topic, OBJECT_MAPPER.writeValueAsBytes(event.getObject())));
158         }
159     }
160 
161     /**
162      * Requests an unscheduled rotation of the underlying JSON audit file.
163      * <p>
164      * Rotation is only possible when enabled in the {@link JsonAuditEventHandlerConfiguration configuration},
165      * and will happen after all preexisting events in the queue have been processed.
166      *
167      * @param topic Event topic
168      * @return {@code true} if rotation is enabled, and {@code false} otherwise
169      * @throws InterruptedException thread interrupted while blocking on a full queue
170      */
171     boolean rotateFile(final String topic) throws InterruptedException {
172         if (queueConsumer.isRotationEnabled()) {
173             queue.put(new QueueEntry(topic, QueueEntry.ROTATE_FILE_ENTRY));
174             return true;
175         }
176         return false;
177     }
178 
179     /**
180      * Requests an unscheduled buffer-flush of the underlying JSON audit file, which is useful for testing.
181      * <p>
182      * The flush will happen after all preexisting events in the queue have been processed.
183      *
184      * @param topic Event topic
185      * @throws InterruptedException thread interrupted while blocking on a full queue
186      */
187     void flushFileBuffer(final String topic) throws InterruptedException {
188         queue.put(new QueueEntry(topic, QueueEntry.FLUSH_FILE_ENTRY));
189     }
190 
191     /**
192      * Gets the current log-file for the given topic.
193      *
194      * @param topic Topic name (case-sensitive)
195      * @return {@link Path} or {@code null} if topic is unrecognised
196      */
197     Path getTopicFilePath(final String topic) {
198         final QueueConsumer.TopicEntry topicEntry = queueConsumer.topicEntryMap.get(topic);
199         return topicEntry == null ? null : topicEntry.filePath;
200     }
201 
202     /**
203      * A single audit-event entry.
204      */
205     private static class QueueEntry {
206 
207         static final byte[] ROTATE_FILE_ENTRY = new byte[0];
208         static final byte[] FLUSH_FILE_ENTRY = new byte[0];
209 
210         private final String topic;
211         private final byte[] event;
212 
213         /**
214          * Creates a new audit-event batch entry.
215          *
216          * @param topic Event topic
217          * @param event Event JSON payload
218          */
219         QueueEntry(final String topic, final byte[] event) {
220             this.topic = checkNotNull(topic);
221             this.event = checkNotNull(event);
222         }
223 
224         boolean isRotateEntry() {
225             return event == ROTATE_FILE_ENTRY;
226         }
227 
228         boolean isFlushEntry() {
229             return event == FLUSH_FILE_ENTRY;
230         }
231     }
232 
233     /**
234      * Consumer of the audit-event batch queue, which can be scheduled to run periodically. This class is not
235      * thread-safe, and is intended to be run by a single thread.
236      */
237     private static final class QueueConsumer implements Runnable {
238 
239         private static final int BATCH_SIZE = 5000;
240         private static final int OUTPUT_BUF_INITIAL_SIZE = 16 * 1024;
241         private static final byte[] NEWLINE_UTF_8_BYTES = "\n".getBytes(UTF_8);
242 
243         private final boolean flushOnShutdown;
244         private final boolean rotationEnabled;
245         private final boolean hasRotationOrRetentionPolicies;
246         private final List<RotationPolicy> rotationPolicies;
247         private final List<RetentionPolicy> retentionPolicies;
248         private final Set<File> filesToDelete;
249         private final BlockingQueue<QueueEntry> queue;
250         private final ScheduledExecutorService scheduler;
251         private final Map<String, TopicEntry> topicEntryMap;
252         private final List<QueueEntry> drainList;
253         private final int iterationsBeforeFlush;
254 
255         private volatile boolean shutdown;
256 
257         /**
258          * Creates a {@code QueueConsumer}.
259          *
260          * @param fileNameSuffix Log file-name suffix
261          * @param topics Supported topics
262          * @param configuration Configuration
263          * @param flushOnShutdown When {@code true}, the queue will be flushed on shutdown and when {@code false},
264          * items in the queue will be dropped
265          * @param queue Audit-event queue
266          * @param scheduler This runnable's scheduler
267          * @param iterationsBeforeFlush number of times {@code run()} is called before topic events are written on file
268          */
269         private QueueConsumer(final String fileNameSuffix, final Set<String> topics,
270                 final JsonAuditEventHandlerConfiguration configuration, final boolean flushOnShutdown,
271                 final BlockingQueue<QueueEntry> queue, final ScheduledExecutorService scheduler,
272                 final int iterationsBeforeFlush) {
273             this.queue = queue;
274             this.scheduler = scheduler;
275             this.flushOnShutdown = flushOnShutdown;
276             this.iterationsBeforeFlush = iterationsBeforeFlush;
277             drainList = new ArrayList<>(BATCH_SIZE);
278             rotationEnabled = configuration.getFileRotation().isRotationEnabled();
279             rotationPolicies = configuration.getFileRotation().buildRotationPolicies();
280             retentionPolicies = configuration.getFileRetention().buildRetentionPolicies();
281             hasRotationOrRetentionPolicies = (rotationEnabled && !rotationPolicies.isEmpty())
282                     || !retentionPolicies.isEmpty();
283             filesToDelete = new HashSet<>();
284 
285             // build map of topic files
286             final Map<String, TopicEntry> topicEntryMap = new HashMap<>();
287             for (final String topic : topics) {
288                 final String fileName = topic + '.' + fileNameSuffix;
289                 topicEntryMap.put(topic, new TopicEntry(fileName, configuration));
290             }
291             this.topicEntryMap = Collections.unmodifiableMap(topicEntryMap);
292         }
293 
294         /**
295          * Informs queue consumer that shutdown has been triggered, and when {@code flushOnShutdown} is enabled,
296          * blocks until all events have been flushed from the queue.
297          */
298         void shutdown() {
299             if (!shutdown) {
300                 shutdown = true;
301                 scheduler.shutdown();
302                 try {
303                     if (flushOnShutdown) {
304                         // flush requested, so block in an non-cancelable way
305                         boolean interrupted = false;
306                         while (!scheduler.isTerminated()) {
307                             try {
308                                 scheduler.awaitTermination(1L, TimeUnit.MINUTES);
309                                 logger.info("Awaiting audit event consumer termination.");
310                             } catch (InterruptedException e) {
311                                 interrupted = true;
312                             }
313                         }
314                         if (interrupted) {
315                             Thread.currentThread().interrupt();
316                         }
317                         // process remaining events and flush topic writers
318                         while (!queue.isEmpty()) {
319                             writeEvents();
320                         }
321                         for (final TopicEntry topicEntry : topicEntryMap.values()) {
322                             topicEntry.flush();
323                         }
324                     }
325                 } finally {
326                     closeSilently(topicEntryMap.values());
327                 }
328             }
329         }
330 
331         @Override
332         public void run() {
333             // following loop will run at least once, even if queue is empty, so that rotation policies will run
334             do {
335                 writeEvents();
336             } while (!queue.isEmpty() && !shutdown);
337         }
338 
339         private void writeEvents() {
340             drainList.clear();
341             try {
342                 // handle one batch of events
343                 final int n = queue.drainTo(drainList, BATCH_SIZE);
344                 for (int i = 0; i < n; ++i) {
345                     final QueueEntry entry = drainList.get(i);
346                     final TopicEntry topicEntry = topicEntryMap.get(entry.topic);
347                     if (topicEntry == null) {
348                         logger.warn("Unrecognised topic: " + entry.topic);
349                     } else {
350                         if (entry.isRotateEntry()) {
351                             topicEntry.rotateNow();
352                         } else if (entry.isFlushEntry()) {
353                             topicEntry.flush();
354                         } else {
355                             topicEntry.write(entry.event);
356                         }
357                     }
358                 }
359                 for (final TopicEntry topicEntry : topicEntryMap.values()) {
360                     // no new events, so flush all file buffers, to prevent appearance that events are stuck/lost
361                     if (topicEntry.currentIterationsWithoutEvents() >= iterationsBeforeFlush) {
362                         topicEntry.flush();
363                     }
364                 }
365 
366                 if (hasRotationOrRetentionPolicies) {
367                     // enforce rotation and/or retention policies for all topic files
368                     for (final TopicEntry topicEntry : topicEntryMap.values()) {
369                         topicEntry.rotateIfNeeded();
370                     }
371                 }
372             } catch (IOException e) {
373                 logger.error("JSON file write failed", e);
374             } catch (Exception e) {
375                 logger.error("Unexpected failure", e);
376             }
377         }
378 
379         /**
380          * Checks if rotation is enabled.
381          *
382          * @return {@code true} if rotation is enabled and {@code false} otherwise
383          */
384         boolean isRotationEnabled() {
385             return rotationEnabled;
386         }
387 
388         /**
389          * Represents state for a single topic audit-file.
390          */
391         private class TopicEntry implements RotatableObject, Closeable {
392             private static final int FILE_BUFFER_THRESHOLD = 8 * 1024;
393 
394             private final Path filePath;
395             private final FileNamingPolicy fileNamingPolicy;
396             private final ByteBufferOutputStream outputStream;
397             private DateTime lastRotationTime;
398             private FileChannel fileChannel;
399             private long positionInFile;
400             private int iterationsWithoutEventsCounter;
401 
402             TopicEntry(final String fileName, final JsonAuditEventHandlerConfiguration configuration) {
403                 try {
404                     outputStream = new ByteBufferOutputStream(ByteBuffer.allocateDirect(OUTPUT_BUF_INITIAL_SIZE));
405 
406                     final Path directoryPath = Paths.get(configuration.getLogDirectory());
407                     if (Files.notExists(directoryPath)) {
408                         Files.createDirectory(directoryPath);
409                     }
410                     filePath = directoryPath.resolve(fileName);
411                     openFileChannel();
412 
413                     final File currentFile = filePath.toFile();
414                     fileNamingPolicy = configuration.getFileRotation().buildTimeStampFileNamingPolicy(currentFile);
415 
416                     final long lastModified = currentFile.lastModified();
417                     this.lastRotationTime = lastModified > 0
418                             ? new DateTime(lastModified, DateTimeZone.UTC)
419                             : DateTime.now(DateTimeZone.UTC);
420                 } catch (IOException e) {
421                     throw new RuntimeException("Failed to create or open file", e);
422                 }
423             }
424 
425             void write(final byte[] bytes) throws IOException {
426                 // newline delimited JSON with UTF-8 character encoding
427                 outputStream.write(bytes);
428                 outputStream.write(NEWLINE_UTF_8_BYTES);
429                 if (outputStream.byteBuffer().position() >= FILE_BUFFER_THRESHOLD) {
430                     outputStream.byteBuffer().flip();
431                     try {
432                         if (Files.notExists(filePath)) {
433                             openFileChannel();
434                         }
435                         // write buffer to file
436                         positionInFile += fileChannel.write(outputStream.byteBuffer(), positionInFile);
437                     } finally {
438                         outputStream.clear();
439                     }
440                 }
441                 iterationsWithoutEventsCounter = 0;
442             }
443 
444             void flush() {
445                 if (outputStream.byteBuffer().position() != 0) {
446                     // write buffer to file
447                     outputStream.byteBuffer().flip();
448                     try {
449                         if (Files.notExists(filePath)) {
450                             openFileChannel();
451                         }
452                         // write buffer to file
453                         positionInFile += fileChannel.write(outputStream.byteBuffer(), positionInFile);
454                     } catch (IOException e) {
455                         logger.error("Failed to flush file buffer", e);
456                     } finally {
457                         outputStream.clear();
458                     }
459                 }
460                 iterationsWithoutEventsCounter = 0;
461             }
462 
463             @Override
464             public long getBytesWritten() {
465                 return positionInFile;
466             }
467 
468             @Override
469             public DateTime getLastRotationTime() {
470                 return lastRotationTime;
471             }
472 
473             @Override
474             public void rotateIfNeeded() throws IOException {
475                 if (rotationEnabled && !rotationPolicies.isEmpty()) {
476                     for (final RotationPolicy rotationPolicy : rotationPolicies) {
477                         if (rotationPolicy.shouldRotateFile(this)) {
478                             rotateNow();
479                             break;
480                         }
481                     }
482                 }
483                 if (!retentionPolicies.isEmpty()) {
484                     filesToDelete.clear();
485                     for (final RetentionPolicy retentionPolicy : retentionPolicies) {
486                         filesToDelete.addAll(retentionPolicy.deleteFiles(fileNamingPolicy));
487                     }
488                     if (!filesToDelete.isEmpty()) {
489                         for (final File file : filesToDelete) {
490                             if (!file.delete() && logger.isWarnEnabled()) {
491                                 logger.warn("Could not delete file {}", file.getAbsolutePath());
492                             }
493                         }
494                     }
495                 }
496             }
497 
498             /**
499              * Rotates the underlying JSON audit file.
500              *
501              * @throws IOException error rotating file
502              */
503             void rotateNow() throws IOException {
504                 // close and rename current file
505                 fileChannel.close();
506                 final Path archivedFilePath = fileNamingPolicy.getNextName().toPath();
507                 Files.move(filePath, archivedFilePath);
508                 // create new file
509                 openFileChannel();
510                 lastRotationTime = DateTime.now(DateTimeZone.UTC);
511             }
512 
513             private void openFileChannel() throws IOException {
514                 fileChannel = FileChannel.open(filePath, StandardOpenOption.CREATE, StandardOpenOption.WRITE);
515                 positionInFile = fileChannel.size();
516             }
517 
518             @Override
519             public void close() throws IOException {
520                 fileChannel.close();
521             }
522 
523             @Override
524             public void registerRotationHooks(RotationHooks rotationHooks) {
525                 // not implemented
526             }
527 
528             int currentIterationsWithoutEvents() {
529                 return ++iterationsWithoutEventsCounter;
530             }
531         }
532     }
533 
534 }