JsonFileWriter.java

/*
 * The contents of this file are subject to the terms of the Common Development and
 * Distribution License (the License). You may not use this file except in compliance with the
 * License.
 *
 * You can obtain a copy of the License at legal/CDDLv1.0.txt. See the License for the
 * specific language governing permission and limitations under the License.
 *
 * When distributing Covered Software, include this CDDL Header Notice in each file and include
 * the License file at legal/CDDLv1.0.txt. If applicable, add the following below the CDDL
 * Header, with the fields enclosed by brackets [] replaced by your own identifying
 * information: "Portions copyright [year] [name of copyright owner]".
 *
 * Copyright 2016 ForgeRock AS.
 * Portions copyright 2021 Wren Security.
 */

package org.forgerock.audit.handlers.json;

import static java.lang.Math.max;
import static java.nio.charset.StandardCharsets.UTF_8;
import static org.forgerock.audit.batch.CommonAuditBatchConfiguration.POLLING_INTERVAL;
import static org.forgerock.audit.handlers.json.JsonAuditEventHandler.OBJECT_MAPPER;
import static org.forgerock.audit.handlers.json.JsonAuditEventHandler.EVENT_ID_FIELD;
import static org.forgerock.audit.util.ElasticsearchUtil.normalizeJson;
import static org.forgerock.audit.util.ElasticsearchUtil.renameField;
import static org.forgerock.json.resource.ResourceResponse.FIELD_CONTENT_ID;
import static org.forgerock.util.Reject.checkNotNull;
import static org.forgerock.util.Utils.closeSilently;

import java.io.Closeable;
import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import org.forgerock.audit.retention.FileNamingPolicy;
import org.forgerock.audit.retention.RetentionPolicy;
import org.forgerock.audit.rotation.RotatableObject;
import org.forgerock.audit.rotation.RotationHooks;
import org.forgerock.audit.rotation.RotationPolicy;
import org.forgerock.json.JsonValue;
import org.forgerock.util.Utils;
import org.forgerock.util.time.Duration;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Periodically writes JSON events to a file.
 */
class JsonFileWriter {

    /** Class-wide logger, also used by the nested consumer classes. */
    private static final Logger logger = LoggerFactory.getLogger(JsonFileWriter.class);

    /** Smallest queue capacity that is ever used; a smaller configured maximum is raised to this value. */
    private static final int MIN_QUEUE_SIZE = 100_000;

    /** Suffix handed to the queue consumer, which names each topic file {@code <topic>.<suffix>}. */
    static final String LOG_FILE_NAME_SUFFIX = "audit.json";

    /** When {@code true}, {@link #put} renames the {@code _id} field and normalizes the JSON before queueing. */
    private final boolean elasticsearchCompatible;
    /** Bounded queue shared between the producers ({@code put}, {@code rotateFile}, ...) and the consumer. */
    private final BlockingQueue<QueueEntry> queue;
    /** Scheduled pool created with a single thread, on which {@link #queueConsumer} is run periodically. */
    private final ScheduledExecutorService scheduler;
    /** Drains {@link #queue} and owns the per-topic files. */
    private final QueueConsumer queueConsumer;
    /** Interval at which the consumer is scheduled; falls back to the default polling interval when invalid. */
    private final Duration writeInterval;

    /**
     * Creates a {@link JsonFileWriter}. For arguments with minimum values, the minimum will be used
     * without warning if the provided value is lower than that minimum.
     *
     * @param topics Supported topics
     * @param configuration Configuration
     * @param autoFlush {@code true} when data in queue should always be flushed on shutdown and {@code false} when
     * it may be discarded
     */
    JsonFileWriter(final Set<String> topics, final JsonAuditEventHandlerConfiguration configuration,
            final boolean autoFlush) {
        elasticsearchCompatible = configuration.isElasticsearchCompatible();
        queue = new ArrayBlockingQueue<>(max(configuration.getBuffering().getMaxSize(), MIN_QUEUE_SIZE));
        scheduler = Executors.newScheduledThreadPool(1, Utils.newThreadFactory(null, "audit-json-%d", false));
        writeInterval = parseWriteInterval(configuration);
        // checking for events to write on disk happens at most once a second, since {@code run()}
        // is called periodically compute how many iterations are needed beofre writing on file
        queueConsumer = new QueueConsumer(LOG_FILE_NAME_SUFFIX, topics, configuration, autoFlush, queue, scheduler,
                (int) Math.max(1, 1_000_000 / writeInterval.to(TimeUnit.MICROSECONDS)));
    }

    /**
     * Reads the buffering write-interval from the configuration.
     *
     * @param configuration Configuration holding the textual write-interval
     * @return the parsed interval, or {@code POLLING_INTERVAL} when the configured value cannot be parsed or
     * is not strictly positive
     */
    private Duration parseWriteInterval(final JsonAuditEventHandlerConfiguration configuration) {
        final String rawInterval = configuration.getBuffering().getWriteInterval();
        Duration parsed = null;
        try {
            parsed = Duration.duration(rawInterval);
        } catch (Exception ignored) {
            // unparsable value: 'parsed' stays null and the fallback below applies
        }
        if (parsed != null && parsed.getValue() > 0) {
            return parsed;
        }
        logger.info("writeInterval '{}' is invalid, so falling back to {}", rawInterval, POLLING_INTERVAL);
        return POLLING_INTERVAL;
    }

    /**
     * Starts periodically writing JSON events to a file.
     * <p>
     * The consumer is scheduled at a fixed rate with no initial delay. The period is the configured write
     * interval expressed in milliseconds, but never less than one millisecond.
     */
    void startup() {
        // A sub-millisecond interval truncates to 0 ms, and scheduleAtFixedRate rejects a non-positive
        // period with an IllegalArgumentException, so clamp to the smallest representable period.
        final long periodMillis = Math.max(1L, writeInterval.to(TimeUnit.MILLISECONDS));
        scheduler.scheduleAtFixedRate(queueConsumer, 0, periodMillis, TimeUnit.MILLISECONDS);
    }

    /**
     * Stops writing JSON events to a file. The call is delegated to the queue consumer unless the scheduler
     * has already been shut down; the consumer awaits termination of pending queue tasks when
     * {@code autoFlush} is enabled.
     */
    void shutdown() {
        if (scheduler.isShutdown()) {
            // already stopped, nothing left to do
            return;
        }
        queueConsumer.shutdown();
    }

    /**
     * Serializes the event and appends it to the tail of the queue, blocking while the queue is full.
     * <p>
     * In Elasticsearch-compatible mode the {@code _id} field is temporarily renamed on the passed-in event
     * while it is serialized, and is always renamed back before this method returns.
     *
     * @param topic Event topic
     * @param event Event payload to index, where {@code _id} field is the identifier
     * @throws InterruptedException thread interrupted while blocking on a full queue
     * @throws IOException failed to serialize JSON
     */
    void put(final String topic, final JsonValue event) throws InterruptedException, IOException {
        if (!elasticsearchCompatible) {
            // plain mode: serialize the underlying object as-is
            final byte[] serialized = OBJECT_MAPPER.writeValueAsBytes(event.getObject());
            queue.put(new QueueEntry(topic, serialized));
            return;
        }

        // _id is reserved by ElasticSearch, so it is stored as _eventId instead
        renameField(event, FIELD_CONTENT_ID, EVENT_ID_FIELD);
        try {
            // apply ElasticSearch JSON normalization, if necessary
            final byte[] normalized = normalizeJson(event).getBytes(UTF_8);
            queue.put(new QueueEntry(topic, normalized));
        } finally {
            // the rename above mutated the caller's instance, so undo it whatever happened
            renameField(event, EVENT_ID_FIELD, FIELD_CONTENT_ID);
        }
    }

    /**
     * Requests an unscheduled rotation of the underlying JSON audit file.
     * <p>
     * The request travels through the same queue as the events, so it is handled only after every entry
     * queued before it. Nothing is queued when rotation is disabled in the
     * {@link JsonAuditEventHandlerConfiguration configuration}.
     *
     * @param topic Event topic
     * @return {@code true} if rotation is enabled, and {@code false} otherwise
     * @throws InterruptedException thread interrupted while blocking on a full queue
     */
    boolean rotateFile(final String topic) throws InterruptedException {
        if (!queueConsumer.isRotationEnabled()) {
            return false;
        }
        final QueueEntry rotateRequest = new QueueEntry(topic, QueueEntry.ROTATE_FILE_ENTRY);
        queue.put(rotateRequest);
        return true;
    }

    /**
     * Requests an unscheduled buffer-flush of the underlying JSON audit file, which is useful for testing.
     * <p>
     * The request is queued behind the entries already waiting, so the flush happens once those have been
     * processed.
     *
     * @param topic Event topic
     * @throws InterruptedException thread interrupted while blocking on a full queue
     */
    void flushFileBuffer(final String topic) throws InterruptedException {
        final QueueEntry flushRequest = new QueueEntry(topic, QueueEntry.FLUSH_FILE_ENTRY);
        queue.put(flushRequest);
    }

    /**
     * Looks up the path of the log-file currently used for the given topic.
     *
     * @param topic Topic name (case-sensitive)
     * @return {@link Path} or {@code null} if topic is unrecognised
     */
    Path getTopicFilePath(final String topic) {
        final QueueConsumer.TopicEntry entry = queueConsumer.topicEntryMap.get(topic);
        if (entry != null) {
            return entry.filePath;
        }
        return null;
    }

    /**
     * One element of the consumer queue: either a serialized audit event, or a control request (rotate or
     * flush) for a topic.
     */
    private static class QueueEntry {

        /** Sentinel payload marking a rotate request; recognised by reference, never by content. */
        static final byte[] ROTATE_FILE_ENTRY = new byte[0];
        /** Sentinel payload marking a flush request; recognised by reference, never by content. */
        static final byte[] FLUSH_FILE_ENTRY = new byte[0];

        private final String topic;
        private final byte[] event;

        /**
         * Creates a new queue entry.
         *
         * @param topic Event topic, must not be {@code null}
         * @param event Event JSON payload or one of the sentinel arrays, must not be {@code null}
         */
        QueueEntry(final String topic, final byte[] event) {
            this.topic = checkNotNull(topic);
            this.event = checkNotNull(event);
        }

        /**
         * Tells whether this entry is a rotate request.
         *
         * @return {@code true} only when the payload is the very {@link #ROTATE_FILE_ENTRY} instance
         */
        boolean isRotateEntry() {
            return ROTATE_FILE_ENTRY == event;
        }

        /**
         * Tells whether this entry is a flush request.
         *
         * @return {@code true} only when the payload is the very {@link #FLUSH_FILE_ENTRY} instance
         */
        boolean isFlushEntry() {
            return FLUSH_FILE_ENTRY == event;
        }
    }

    /**
     * Consumer of the audit-event batch queue, which can be scheduled to run periodically. This class is not
     * thread-safe, and is intended to be run by a single thread.
     */
    private static final class QueueConsumer implements Runnable {

        /** Maximum number of queue entries drained per batch; also the initial capacity of {@link #drainList}. */
        private static final int BATCH_SIZE = 5000;
        /** Capacity, in bytes, of the direct buffer each topic's output stream is created with. */
        private static final int OUTPUT_BUF_INITIAL_SIZE = 16 * 1024;
        /** Line delimiter written after every event. */
        private static final byte[] NEWLINE_UTF_8_BYTES = "\n".getBytes(UTF_8);

        /** Whether {@code shutdown()} waits for the scheduler and then drains and flushes what is left. */
        private final boolean flushOnShutdown;
        /** Rotation switch taken from the file-rotation configuration. */
        private final boolean rotationEnabled;
        /** {@code true} when rotation is enabled with at least one policy, or any retention policy exists. */
        private final boolean hasRotationOrRetentionPolicies;
        private final List<RotationPolicy> rotationPolicies;
        private final List<RetentionPolicy> retentionPolicies;
        /** Scratch set, cleared and refilled on each retention pass of a topic. */
        private final Set<File> filesToDelete;
        private final BlockingQueue<QueueEntry> queue;
        private final ScheduledExecutorService scheduler;
        /** Unmodifiable mapping from topic name to that topic's file state. */
        private final Map<String, TopicEntry> topicEntryMap;
        /** Reused holder for the batch of entries currently being processed. */
        private final List<QueueEntry> drainList;
        /** Number of consecutive event-less passes after which a topic's buffer is flushed. */
        private final int iterationsBeforeFlush;

        /** Set once by {@code shutdown()}; read by {@code run()} to stop looping over the queue. */
        private volatile boolean shutdown;

        /**
         * Creates a {@code QueueConsumer} and one {@link TopicEntry} per supported topic. Each topic is backed
         * by a file named {@code <topic>.<fileNameSuffix>}.
         *
         * @param fileNameSuffix Log file-name suffix
         * @param topics Supported topics
         * @param configuration Configuration
         * @param flushOnShutdown When {@code true}, the queue will be flushed on shutdown and when {@code false},
         * items in the queue will be dropped
         * @param queue Audit-event queue
         * @param scheduler This runnable's scheduler
         * @param iterationsBeforeFlush number of times {@code run()} is called before topic events are written on file
         */
        private QueueConsumer(final String fileNameSuffix, final Set<String> topics,
                final JsonAuditEventHandlerConfiguration configuration, final boolean flushOnShutdown,
                final BlockingQueue<QueueEntry> queue, final ScheduledExecutorService scheduler,
                final int iterationsBeforeFlush) {
            // collaborators and plain settings
            this.queue = queue;
            this.scheduler = scheduler;
            this.flushOnShutdown = flushOnShutdown;
            this.iterationsBeforeFlush = iterationsBeforeFlush;
            this.drainList = new ArrayList<>(BATCH_SIZE);

            // rotation and retention settings
            this.rotationEnabled = configuration.getFileRotation().isRotationEnabled();
            this.rotationPolicies = configuration.getFileRotation().buildRotationPolicies();
            this.retentionPolicies = configuration.getFileRetention().buildRetentionPolicies();
            final boolean rotationPoliciesActive = this.rotationEnabled && !this.rotationPolicies.isEmpty();
            this.hasRotationOrRetentionPolicies = rotationPoliciesActive || !this.retentionPolicies.isEmpty();
            this.filesToDelete = new HashSet<>();

            // one file per topic, exposed through a read-only view
            final Map<String, TopicEntry> entriesByTopic = new HashMap<>();
            for (final String topic : topics) {
                entriesByTopic.put(topic, new TopicEntry(topic + '.' + fileNameSuffix, configuration));
            }
            this.topicEntryMap = Collections.unmodifiableMap(entriesByTopic);
        }

        /**
         * Informs queue consumer that shutdown has been triggered, and when {@code flushOnShutdown} is enabled,
         * blocks until all events have been flushed from the queue.
         * <p>
         * Only the first call has an effect. The topic files are closed in every case, whether or not the
         * flush was requested or succeeded. An interrupt received while waiting does not abort the wait; the
         * interrupt status is restored once the scheduler has terminated.
         */
        void shutdown() {
            if (shutdown) {
                // shutdown was already requested
                return;
            }
            shutdown = true;
            scheduler.shutdown();
            try {
                if (flushOnShutdown) {
                    // flush requested, so block in a non-cancelable way
                    boolean interrupted = false;
                    while (!scheduler.isTerminated()) {
                        try {
                            // only report that we are still waiting when the wait actually timed out
                            if (!scheduler.awaitTermination(1L, TimeUnit.MINUTES)) {
                                logger.info("Awaiting audit event consumer termination.");
                            }
                        } catch (InterruptedException e) {
                            interrupted = true;
                        }
                    }
                    if (interrupted) {
                        Thread.currentThread().interrupt();
                    }
                    // the consumer thread is gone, so process remaining events on the calling thread
                    while (!queue.isEmpty()) {
                        writeEvents();
                    }
                    // push whatever is still buffered in memory out to the files
                    for (final TopicEntry topicEntry : topicEntryMap.values()) {
                        topicEntry.flush();
                    }
                }
            } finally {
                closeSilently(topicEntryMap.values());
            }
        }

        /**
         * Processes one batch unconditionally, then keeps processing batches while the queue still holds
         * entries and shutdown has not been requested.
         */
        @Override
        public void run() {
            // the first pass always happens, even on an empty queue, so that rotation policies will run
            writeEvents();
            while (!queue.isEmpty() && !shutdown) {
                writeEvents();
            }
        }

        /**
         * Handles one batch of at most {@code BATCH_SIZE} queue entries: writes events, honours rotate and
         * flush requests, flushes topics that have been idle for {@code iterationsBeforeFlush} passes, and
         * finally enforces rotation and retention policies.
         * <p>
         * An {@link IOException} is logged and confined to the entry (or topic) that caused it, so the
         * remainder of the drained batch and the other topics are still processed. This method never throws.
         */
        private void writeEvents() {
            drainList.clear();
            try {
                // handle one batch of events
                queue.drainTo(drainList, BATCH_SIZE);
                for (final QueueEntry entry : drainList) {
                    final TopicEntry topicEntry = topicEntryMap.get(entry.topic);
                    if (topicEntry == null) {
                        logger.warn("Unrecognised topic: {}", entry.topic);
                        continue;
                    }
                    try {
                        if (entry.isRotateEntry()) {
                            topicEntry.rotateNow();
                        } else if (entry.isFlushEntry()) {
                            topicEntry.flush();
                        } else {
                            topicEntry.write(entry.event);
                        }
                    } catch (IOException e) {
                        // the entries were already removed from the queue, so giving up here would drop
                        // the rest of the batch; log and carry on with the next entry instead
                        logger.error("JSON file write failed", e);
                    }
                }

                for (final TopicEntry topicEntry : topicEntryMap.values()) {
                    // no new events, so flush all file buffers, to prevent appearance that events are stuck/lost
                    if (topicEntry.currentIterationsWithoutEvents() >= iterationsBeforeFlush) {
                        topicEntry.flush();
                    }
                }

                if (hasRotationOrRetentionPolicies) {
                    // enforce rotation and/or retention policies for all topic files
                    for (final TopicEntry topicEntry : topicEntryMap.values()) {
                        try {
                            topicEntry.rotateIfNeeded();
                        } catch (IOException e) {
                            // a failure on one topic must not skip the policies of the others
                            logger.error("JSON file write failed", e);
                        }
                    }
                }
            } catch (Exception e) {
                logger.error("Unexpected failure", e);
            } finally {
                // do not keep the processed payloads reachable until the next pass
                drainList.clear();
            }
        }

        /**
         * Reports the rotation switch that was read from the file-rotation configuration at construction.
         *
         * @return {@code true} if rotation is enabled and {@code false} otherwise
         */
        boolean isRotationEnabled() {
            return this.rotationEnabled;
        }

        /**
         * Represents state for a single topic audit-file: the file path and channel, an in-memory buffer that
         * collects events before they are written, the write position in the file, and rotation bookkeeping.
         */
        private class TopicEntry implements RotatableObject, Closeable {
            /** Buffered size, in bytes, at which {@link #write(byte[])} pushes the buffer out to the file. */
            private static final int FILE_BUFFER_THRESHOLD = 8 * 1024;

            private final Path filePath;
            private final FileNamingPolicy fileNamingPolicy;
            private final ByteBufferOutputStream outputStream;
            private DateTime lastRotationTime;
            private FileChannel fileChannel;
            /** Offset at which the next buffer is written; bytes still held in the buffer are not counted. */
            private long positionInFile;
            private int iterationsWithoutEventsCounter;

            /**
             * Opens (creating it if necessary) the file for one topic inside the configured log directory.
             * A missing log directory is created together with any missing parent directories.
             *
             * @param fileName Name of the topic file within the log directory
             * @param configuration Configuration providing the log directory and the file-naming policy
             * @throws RuntimeException if the directory or file cannot be created or opened
             */
            TopicEntry(final String fileName, final JsonAuditEventHandlerConfiguration configuration) {
                try {
                    outputStream = new ByteBufferOutputStream(ByteBuffer.allocateDirect(OUTPUT_BUF_INITIAL_SIZE));

                    final Path directoryPath = Paths.get(configuration.getLogDirectory());
                    if (Files.notExists(directoryPath)) {
                        // createDirectories also builds missing parents, which createDirectory refuses to do
                        Files.createDirectories(directoryPath);
                    }
                    filePath = directoryPath.resolve(fileName);
                    openFileChannel();

                    final File currentFile = filePath.toFile();
                    fileNamingPolicy = configuration.getFileRotation().buildTimeStampFileNamingPolicy(currentFile);

                    // a pre-existing file counts as last rotated when it was last modified
                    final long lastModified = currentFile.lastModified();
                    this.lastRotationTime = lastModified > 0
                            ? new DateTime(lastModified, DateTimeZone.UTC)
                            : DateTime.now(DateTimeZone.UTC);
                } catch (IOException e) {
                    throw new RuntimeException("Failed to create or open file", e);
                }
            }

            /**
             * Buffers one event followed by a newline, and writes the buffer to the file once it holds at
             * least {@code FILE_BUFFER_THRESHOLD} bytes.
             *
             * @param bytes Serialized event
             * @throws IOException buffering the event or writing the buffer to the file failed
             */
            void write(final byte[] bytes) throws IOException {
                // newline delimited JSON with UTF-8 character encoding
                outputStream.write(bytes);
                outputStream.write(NEWLINE_UTF_8_BYTES);
                if (outputStream.byteBuffer().position() >= FILE_BUFFER_THRESHOLD) {
                    writeBufferToFile();
                }
                iterationsWithoutEventsCounter = 0;
            }

            /**
             * Writes any buffered bytes to the file and resets the idle counter. A write failure is logged
             * rather than thrown.
             */
            void flush() {
                if (outputStream.byteBuffer().position() != 0) {
                    try {
                        writeBufferToFile();
                    } catch (IOException e) {
                        logger.error("Failed to flush file buffer", e);
                    }
                }
                iterationsWithoutEventsCounter = 0;
            }

            /**
             * Writes the whole buffer to the file at {@code positionInFile}, then empties the buffer whether
             * or not the write succeeded. The file is re-created first if it no longer exists.
             *
             * @throws IOException re-opening the file or writing to it failed
             */
            private void writeBufferToFile() throws IOException {
                final ByteBuffer buffer = outputStream.byteBuffer();
                buffer.flip();
                try {
                    if (Files.notExists(filePath)) {
                        openFileChannel();
                    }
                    // a single write call is allowed to consume only part of the buffer, so loop until
                    // everything is on its way to the file; otherwise the clear() below would drop the rest
                    while (buffer.hasRemaining()) {
                        positionInFile += fileChannel.write(buffer, positionInFile);
                    }
                } finally {
                    outputStream.clear();
                }
            }

            /**
             * {@inheritDoc}
             * <p>
             * This is the tracked write position in the current file; buffered bytes are not included.
             */
            @Override
            public long getBytesWritten() {
                return positionInFile;
            }

            /**
             * {@inheritDoc}
             */
            @Override
            public DateTime getLastRotationTime() {
                return lastRotationTime;
            }

            /**
             * Rotates the file when rotation is enabled and any rotation policy asks for it, then deletes
             * the files selected by the retention policies. A file that cannot be deleted is only logged.
             *
             * @throws IOException error rotating file
             */
            @Override
            public void rotateIfNeeded() throws IOException {
                if (rotationEnabled && !rotationPolicies.isEmpty()) {
                    for (final RotationPolicy rotationPolicy : rotationPolicies) {
                        if (rotationPolicy.shouldRotateFile(this)) {
                            // one rotation per pass is enough, whichever policy asked for it
                            rotateNow();
                            break;
                        }
                    }
                }
                if (!retentionPolicies.isEmpty()) {
                    filesToDelete.clear();
                    for (final RetentionPolicy retentionPolicy : retentionPolicies) {
                        filesToDelete.addAll(retentionPolicy.deleteFiles(fileNamingPolicy));
                    }
                    for (final File file : filesToDelete) {
                        if (!file.delete() && logger.isWarnEnabled()) {
                            logger.warn("Could not delete file {}", file.getAbsolutePath());
                        }
                    }
                }
            }

            /**
             * Rotates the underlying JSON audit file: buffered events are written out, the file is closed and
             * renamed to the next archive name, and a fresh file is opened under the original path.
             * <p>
             * If the rename fails, the channel is re-opened on the existing file before the failure is
             * rethrown, so that later writes are not made against a closed channel.
             *
             * @throws IOException error rotating file
             */
            void rotateNow() throws IOException {
                // events accepted before the rotation belong in the file that is about to be archived
                flush();
                // close and rename current file
                fileChannel.close();
                try {
                    final Path archivedFilePath = fileNamingPolicy.getNextName().toPath();
                    Files.move(filePath, archivedFilePath);
                } catch (IOException | RuntimeException e) {
                    try {
                        openFileChannel();
                    } catch (IOException reopenFailure) {
                        e.addSuppressed(reopenFailure);
                    }
                    throw e;
                }
                // create new file
                openFileChannel();
                lastRotationTime = DateTime.now(DateTimeZone.UTC);
            }

            /**
             * Opens the channel for {@code filePath}, creating the file if needed, and positions subsequent
             * writes at the end of the file. A previously held channel that is still open is closed, so that
             * re-opening after the file disappeared does not leak the old channel.
             *
             * @throws IOException the file could not be opened or its size could not be read
             */
            private void openFileChannel() throws IOException {
                final FileChannel staleChannel = fileChannel;
                fileChannel = FileChannel.open(filePath, StandardOpenOption.CREATE, StandardOpenOption.WRITE);
                positionInFile = fileChannel.size();
                if (staleChannel != null && staleChannel.isOpen()) {
                    try {
                        staleChannel.close();
                    } catch (IOException e) {
                        // the replacement channel is already in use, so this is not fatal
                        logger.debug("Failed to close stale file channel", e);
                    }
                }
            }

            /**
             * Closes the file channel. Bytes still held in the buffer are not written by this method.
             *
             * @throws IOException closing the channel failed
             */
            @Override
            public void close() throws IOException {
                fileChannel.close();
            }

            @Override
            public void registerRotationHooks(RotationHooks rotationHooks) {
                // not implemented
            }

            /**
             * Increments the idle counter and returns the new value. The counter is reset by
             * {@link #write(byte[])} and {@link #flush()}.
             *
             * @return number of calls since the counter was last reset, including this one
             */
            int currentIterationsWithoutEvents() {
                return ++iterationsWithoutEventsCounter;
            }
        }
    }

}