AsynchronousSyslogPublisher.java

/*
 * The contents of this file are subject to the terms of the Common Development and
 * Distribution License (the License). You may not use this file except in compliance with the
 * License.
 *
 * You can obtain a copy of the License at legal/CDDLv1.0.txt. See the License for the
 * specific language governing permission and limitations under the License.
 *
 * When distributing Covered Software, include this CDDL Header Notice in each file and include
 * the License file at legal/CDDLv1.0.txt. If applicable, add the following below the CDDL
 * Header, with the fields enclosed by brackets [] replaced by your own identifying
 * information: "Portions copyright [year] [name of copyright owner]".
 *
 * Copyright 2013 Cybernetica AS
 * Portions copyright 2014-2015 ForgeRock AS.
 */
package org.forgerock.audit.handlers.syslog;

import org.forgerock.util.Reject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;

import static org.forgerock.audit.batch.CommonAuditBatchConfiguration.POLLING_TIMEOUT;
import static org.forgerock.audit.batch.CommonAuditBatchConfiguration.POLLING_TIMEOUT_UNIT;

/**
 * SyslogPublisher that offloads message transmission to a separate thread.
 */
class AsynchronousSyslogPublisher implements SyslogPublisher {

    private static final Logger logger = LoggerFactory.getLogger(AsynchronousSyslogPublisher.class);

    /** Maximum number of messages that can be queued before producers start to block. */
    private static final int CAPACITY = 5000;

    /** SyslogConnection through which buffered messages are sent. */
    private final SyslogConnection connection;
    /** Queue to store unpublished records. */
    private final BlockingQueue<byte[]> queue;
    /** Single threaded executor which runs the WriterTask. */
    private final ExecutorService executorService;
    /** Flag for notifying the WriterTask to exit. */
    private volatile boolean stopRequested;

    /**
     * Construct a new BufferedSyslogPublisher.
     *
     * @param name
     *            the name of the thread.
     * @param connection
     *            a SyslogConnection used for output.
     */
    AsynchronousSyslogPublisher(final String name, final SyslogConnection connection) {
        Reject.ifNull(connection);
        this.connection = connection;
        this.queue = new LinkedBlockingQueue<>(CAPACITY);
        this.stopRequested = false;
        this.executorService = Executors.newSingleThreadExecutor(new ThreadFactory() {
            @Override
            public Thread newThread(Runnable runnable) {
                return new Thread(runnable, name);
            }
        });
        executorService.execute(new WriterTask());
    }

    @Override
    public void publishMessage(String syslogMessage) throws IOException {
        boolean interrupted = false;
        while (!stopRequested) {
            // Put request on queue for writer
            try {
                queue.put(syslogMessage.getBytes(StandardCharsets.UTF_8));
                break;
            } catch (InterruptedException e) {
                // We expect this to happen. Just ignore it and hopefully
                // drop out in the next try.
                interrupted = true;
            }
        }
        if (interrupted) {
            Thread.currentThread().interrupt();
        }
    }

    @Override
    public void close() {
        stopRequested = true;

        executorService.shutdown();
        boolean interrupted = false;
        while (!executorService.isTerminated()) {
            try {
                executorService.awaitTermination(1, TimeUnit.MINUTES);
            } catch (InterruptedException e) {
                interrupted = true;
            }
        }

        // Close the wrapped publisher.
        connection.close();

        if (interrupted) {
            Thread.currentThread().interrupt();
        }
    }

    private void publishBufferedMessages(List<byte[]> syslogMessages) {
        for (byte[] syslogMessage : syslogMessages) {
            try {
                connection.reconnect();
                connection.send(syslogMessage);
            } catch (IOException ex) {
                logger.error("Error when writing a message, message size: " + syslogMessage.length, ex);
                connection.close();
            }
        }
        try {
            connection.flush();
        } catch (IOException ex) {
            logger.error("Error when flushing the connection", ex);
        }
    }

    /**
     * The publisher thread is responsible for emptying the queue of log records waiting to published.
     */
    private class WriterTask implements Runnable {

        /**
         * Runs until queue is empty AND we've been asked to terminate.
         */
        @Override
        public void run() {
            List<byte[]> drainList = new ArrayList<>(CAPACITY);

            boolean interrupted = false;
            while (!stopRequested || !queue.isEmpty()) {
                try {
                    queue.drainTo(drainList, CAPACITY);
                    if (drainList.isEmpty()) {
                        byte[] message = queue.poll(POLLING_TIMEOUT, POLLING_TIMEOUT_UNIT);
                        if (message != null) {
                            publishBufferedMessages(Arrays.asList(message));
                        }
                    } else {
                        publishBufferedMessages(drainList);
                        drainList.clear();
                    }
                } catch (InterruptedException ex) {
                    // Ignore. We'll rerun the loop
                    // and presumably fall out.
                    interrupted = true;
                }
            }
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }

}