RotatableWriter.java

/*
 * The contents of this file are subject to the terms of the Common Development and
 * Distribution License (the License). You may not use this file except in compliance with the
 * License.
 *
 * You can obtain a copy of the License at legal/CDDLv1.0.txt. See the License for the
 * specific language governing permission and limitations under the License.
 *
 * When distributing Covered Software, include this CDDL Header Notice in each file and include
 * the License file at legal/CDDLv1.0.txt. If applicable, add the following below the CDDL
 * Header, with the fields enclosed by brackets [] replaced by your own identifying
 * information: "Portions copyright [year] [name of copyright owner]".
 *
 * Copyright 2015-2016 ForgeRock AS.
 */

package org.forgerock.audit.events.handlers.writers;

import static java.util.concurrent.TimeUnit.MILLISECONDS;

import java.io.BufferedWriter;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.nio.charset.StandardCharsets;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;

import org.forgerock.audit.events.handlers.FileBasedEventHandlerConfiguration;
import org.forgerock.audit.retention.FileNamingPolicy;
import org.forgerock.audit.retention.RetentionPolicy;
import org.forgerock.audit.rotation.RotatableObject;
import org.forgerock.audit.rotation.RotationContext;
import org.forgerock.audit.rotation.RotationHooks;
import org.forgerock.audit.rotation.RotationPolicy;
import org.forgerock.util.annotations.VisibleForTesting;
import org.forgerock.util.time.Duration;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Creates an {@link RotatableWriter} that supports file rotation and retention.
 */
public class RotatableWriter implements TextWriter, RotatableObject {

    private static final Logger logger = LoggerFactory.getLogger(RotatableWriter.class);
    private static final Duration FIVE_SECONDS = Duration.duration("5s");

    private final List<RotationPolicy> rotationPolicies;
    private final List<RetentionPolicy> retentionPolicies;
    private final FileNamingPolicy fileNamingPolicy;
    private ScheduledExecutorService rotator;
    private DateTime lastRotationTime;
    private final boolean rotationEnabled;
    private final File file;
    private RotationHooks rotationHooks = new RotationHooks.NoOpRotatationHooks();
    private final AtomicBoolean isRotating = new AtomicBoolean(false);
    /** The underlying output stream. */
    private MeteredStream meteredStream;
    /** The underlying buffered writer using the output stream. */
    private BufferedWriter writer;
    private final ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock();
    private final RolloverLifecycleHook rolloverLifecycleHook;

    /**
     * Constructs a {@link RotatableWriter} given an initial file to manage rotation/retention, and
     * a {@link FileBasedEventHandlerConfiguration}.
     * @param file The initial file to manage rotation/retention.
     * @param configuration The configuration of the rotation and retention policies.
     * @param append Whether to append to the rotatable file or not.
     * @throws IOException If a problem occurs.
     */
    public RotatableWriter(final File file, final FileBasedEventHandlerConfiguration configuration,
            final boolean append) throws IOException {
        this(file, configuration, append, configuration.getFileRotation().buildTimeStampFileNamingPolicy(file));
    }

    /**
     * Constructs a {@link RotatableWriter} given an initial file to manage rotation/retention, a
     * a {@link FileBasedEventHandlerConfiguration} and a {@link RolloverLifecycleHook}.
     *
     * @param file The initial file to manage rotation/retention.
     * @param configuration The configuration of the rotation and retention policies.
     * @param append Whether to append to the rotatable file or not.
     * @param rolloverLifecycleHook Hook to use before and after rotation/retention checks.
     * @throws IOException If a problem occurs.
     */
    public RotatableWriter(final File file, final FileBasedEventHandlerConfiguration configuration,
            final boolean append, final RolloverLifecycleHook rolloverLifecycleHook) throws IOException {
        this(file, configuration, append, configuration.getFileRotation().buildTimeStampFileNamingPolicy(file),
                rolloverLifecycleHook);
    }

    /**
     * This constructor allows tests to set an alternative FileNamingPolicy as TimeStampFileNamingPolicy lists files
     * for deletion by their last modified timestamps but these timestamps are only accurate to the nearest second.
     */
    @VisibleForTesting
    RotatableWriter(final File file, final FileBasedEventHandlerConfiguration configuration,
                           final boolean append, final FileNamingPolicy fileNamingPolicy) throws IOException {
        this(file, configuration, append, fileNamingPolicy, NOOP_ROLLOVER_LIFECYCLE_HOOK);
    }

    /** Constructor with all possible parameters. */
    private RotatableWriter(final File file, final FileBasedEventHandlerConfiguration configuration,
            final boolean append, final FileNamingPolicy fileNamingPolicy,
            final RolloverLifecycleHook rolloverLifecycleHook) throws IOException {
        this.file = file;
        this.fileNamingPolicy = fileNamingPolicy;
        this.rotationEnabled = configuration.getFileRotation().isRotationEnabled();
        final long lastModified = file.lastModified();
        this.lastRotationTime = lastModified > 0
                ? new DateTime(file.lastModified(), DateTimeZone.UTC)
                : DateTime.now(DateTimeZone.UTC);
        this.rolloverLifecycleHook = rolloverLifecycleHook;
        this.writer = constructWriter(file, append);
        retentionPolicies = configuration.getFileRetention().buildRetentionPolicies();
        rotationPolicies = configuration.getFileRotation().buildRotationPolicies();
        scheduleRotationAndRetentionChecks(configuration);
    }

    /**
     * Rotate the log file if any of the configured rotation policies determine that rotation is required.
     *
     * @throws IOException If unable to rotate the log file.
     */
    @Override
    public void rotateIfNeeded() throws IOException {
        if (!rotationEnabled || isRotating.get()) {
            return;
        }
        readWriteLock.writeLock().lock();
        try {
            for (RotationPolicy rotationPolicy : rotationPolicies) {
                if (rotationPolicy.shouldRotateFile(this)) {
                    if (logger.isTraceEnabled()) {
                        logger.trace("Must rotate: {}", file.getAbsolutePath());
                    }
                    isRotating.set(true);
                    if (rotate()) {
                        if (logger.isTraceEnabled()) {
                            logger.trace("Finished rotation for: {}", file.getAbsolutePath());
                        }
                    }
                    break;
                }
            }
        } finally {
            readWriteLock.writeLock().unlock();
            isRotating.set(false);
        }
    }

    /** Delete files if they need to be deleted as per enabled retention policies. */
    private void deleteFilesIfNeeded() throws IOException {
        readWriteLock.writeLock().lock();
        try {
            Set<File> filesToDelete = checkRetention(); // return the files to delete, but do not delete them
            if (!filesToDelete.isEmpty()) {
                deleteFiles(filesToDelete);
            }
        } finally {
            readWriteLock.writeLock().unlock();
        }
    }

    private boolean rotate() throws IOException {
        boolean rotationHappened = false;
        RotationContext context = new RotationContext();
        context.setWriter(writer);
        File currentFile = fileNamingPolicy.getInitialName();
        context.setInitialFile(currentFile);
        if (currentFile.exists()) {
            File newFile = fileNamingPolicy.getNextName();
            context.setNextFile(newFile);
            rotationHooks.preRotationAction(context);
            writer.close();
            if (logger.isTraceEnabled()) {
                logger.trace("Renaming {} to {}", currentFile.getAbsolutePath(), newFile.getAbsolutePath());
            }
            if (currentFile.renameTo(newFile)) {
                rotationHappened = true;
                if (currentFile.createNewFile()) {
                    writer = constructWriter(currentFile, true);
                    context.setWriter(writer);
                    rotationHooks.postRotationAction(context);
                } else {
                    logger.error("Unable to resume writing to audit file {}; further events will not be logged",
                            currentFile.toString());
                }
            } else {
                logger.error("Unable to rename the audit file {}; further events will continue to be logged to "
                        + "the current file", currentFile.toString());
                writer = constructWriter(currentFile, true);
            }
            lastRotationTime = DateTime.now(DateTimeZone.UTC);
        }
        return rotationHappened;
    }

    private Set<File> checkRetention() throws IOException {
        Set<File> filesToDelete = new HashSet<>();
        for (RetentionPolicy retentionPolicy : retentionPolicies) {
            filesToDelete.addAll(retentionPolicy.deleteFiles(fileNamingPolicy));
        }
        return filesToDelete;
    }

    private void deleteFiles(final Set<File> files) {
        for (final File file : files) {
            if (logger.isInfoEnabled()) {
                logger.info("Deleting file {}", file.getAbsolutePath());
            }
            if (!file.delete()) {
                if (logger.isWarnEnabled()) {
                    logger.warn("Could not delete file {}", file.getAbsolutePath());
                }
            }
        }
    }

    /**
     * {@inheritDoc}
     */
    @Override
    public long getBytesWritten() {
        logger.trace("bytes written={}", meteredStream.getBytesWritten());
        return meteredStream.getBytesWritten();
    }

    /**
     * {@inheritDoc}
     */
    @Override
    public DateTime getLastRotationTime() {
        return lastRotationTime;
    }

    /**
     * {@inheritDoc}
     */
    @Override
    public void close() throws IOException {
        if (rotator != null) {
            boolean interrupted = false;
            rotator.shutdown();
            try {
                while (!rotator.awaitTermination(500, MILLISECONDS)) {
                    logger.debug("Waiting to terminate the rotator thread.");
                }
            } catch (InterruptedException ex) {
                logger.error("Unable to terminate the rotator thread", ex);
                interrupted = true;
            } finally {
                if (interrupted) {
                    Thread.currentThread().interrupt();
                }
            }
        }
        writer.close();
    }

    @Override
    public void shutdown() {
        try {
            close();
        } catch (IOException e) {
            logger.error("Error when performing shutdown", e);
        }
    }

    /**
     * {@inheritDoc}
     */
    @Override
    public void registerRotationHooks(final RotationHooks rotationHooks) {
        this.rotationHooks = rotationHooks;
    }

    @Override
    public void write(String str) throws IOException {
        ReadLock lock = readWriteLock.readLock();
        try {
            lock.lock();
            logger.trace("Actually writing to file: {}", str);
            writer.write(str);
        } finally {
            lock.unlock();
        }
        rotateIfNeeded();
    }

    /**
     * Forces a rotation of the writer.
     *
     * @return {@code true} if rotation was done, {@code false} otherwise.
     * @throws IOException
     *          If an error occurs
     */
    public boolean forceRotation() throws IOException {
        readWriteLock.writeLock().lock();
        try {
            isRotating.set(true);
            return rotate();
        } finally {
            isRotating.set(false);
            readWriteLock.writeLock().unlock();
        }
    }

    @Override
    public void flush() throws IOException {
        writer.flush();
    }

    private BufferedWriter constructWriter(File csvFile, boolean append)
            throws IOException {
        FileOutputStream stream = new FileOutputStream(csvFile, append);
        meteredStream = new MeteredStream(stream, file.length());
        OutputStreamWriter osw = new OutputStreamWriter(meteredStream, StandardCharsets.UTF_8);
        return new BufferedWriter(osw);
    }

    /**
     * Schedule checks for rotations and retention policies.
     * <p>
     * The check interval is provided by the RotationRetentionCheckInterval property, which must have
     * a non-zero value if at least one policy is enabled.
     */
    private void scheduleRotationAndRetentionChecks(FileBasedEventHandlerConfiguration configuration)
            throws IOException {
        final Duration rotationCheckInterval = parseDuration("rotation and retention check interval",
                configuration.getRotationRetentionCheckInterval(), FIVE_SECONDS);

        if (!rotationPolicies.isEmpty() || !retentionPolicies.isEmpty()) {
            if (rotationCheckInterval.isUnlimited() || rotationCheckInterval.isZero()) {
                throw new IOException("Rotation and retention check interval set to an invalid value: "
                        + rotationCheckInterval);
            }
            rotator = Executors.newScheduledThreadPool(1);
            rotator.scheduleAtFixedRate(
                    new Runnable() {
                        @Override
                        public void run() {
                            rolloverLifecycleHook.beforeRollingOver();
                            try {
                                try {
                                    rotateIfNeeded();
                                } catch (Exception e) {
                                    logger.error("Failure when applying a rotation policy to file {}",
                                            fileNamingPolicy.getInitialName(), e);
                                }
                                try {
                                    deleteFilesIfNeeded();
                                } catch (Exception e) {
                                    logger.error("Failure when applying a retention policy to file {}",
                                            fileNamingPolicy.getInitialName(), e);
                                }
                            } finally {
                                rolloverLifecycleHook.afterRollingOver();
                            }
                        }
                    },
                    rotationCheckInterval.to(TimeUnit.MILLISECONDS),
                    rotationCheckInterval.to(TimeUnit.MILLISECONDS),
                    TimeUnit.MILLISECONDS);
        }
    }

    private Duration parseDuration(String description, String duration, Duration defaultValue) {
        try {
            return Duration.duration(duration);
        } catch (IllegalArgumentException e) {
            logger.warn("Invalid {} value: '{}'", description, duration);
            return defaultValue;
        }
    }

    @VisibleForTesting
    List<RotationPolicy> getRotationPolicies() {
        return rotationPolicies;
    }

    /**
     * A RotationRetentionCheckHook that does nothing.
     */
    public static final RolloverLifecycleHook NOOP_ROLLOVER_LIFECYCLE_HOOK =
        new RolloverLifecycleHook() {
            @Override
            public void beforeRollingOver() {
                // nothing to do
            }

            @Override
            public void afterRollingOver() {
                // nothing to do
            }
    };

    /**
     * Callback hooks to allow custom action to be taken before and after the checks for rotation and
     * retention is performed.
     */
    public interface RolloverLifecycleHook {

        /**
         * This method is called before the rotation and retention checks are done.
         */
        void beforeRollingOver();

        /**
         * This method is called after the rotation and retention checks are done.
         */
        void afterRollingOver();
    }

}