AsynchronousTextWriter.java
/*
* The contents of this file are subject to the terms of the Common Development and
* Distribution License (the License). You may not use this file except in compliance with the
* License.
*
* You can obtain a copy of the License at legal/CDDLv1.0.txt. See the License for the
* specific language governing permission and limitations under the License.
*
* When distributing Covered Software, include this CDDL Header Notice in each file and include
* the License file at legal/CDDLv1.0.txt. If applicable, add the following below the CDDL
* Header, with the fields enclosed by brackets [] replaced by your own identifying
* information: "Portions copyright [year] [name of copyright owner]".
*
* Copyright 2006-2008 Sun Microsystems, Inc.
* Portions Copyright 2013-2015 ForgeRock AS.
*/
package org.forgerock.audit.events.handlers.writers;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import org.forgerock.util.Reject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.forgerock.audit.batch.CommonAuditBatchConfiguration.POLLING_TIMEOUT;
import static org.forgerock.audit.batch.CommonAuditBatchConfiguration.POLLING_TIMEOUT_UNIT;
/**
* A Text Writer which writes log records asynchronously to character-based stream.
* <p>
* The records are buffered in a queue and written asynchronously. If maximum CAPACITY of the queue is
* reached, then calls to {@code write()} method are blocked. This prevent OOM errors while allowing
* good write performances.
*/
public class AsynchronousTextWriter implements TextWriter {
private static final Logger logger = LoggerFactory.getLogger(AsynchronousTextWriter.class);
/** Maximum number of messages that can be queued before producers start to block. */
private static final int CAPACITY = 5000;
/** The wrapped Text Writer. */
private final TextWriter writer;
/** Queue to store unpublished records. */
private final BlockingQueue<String> queue;
/** Single threaded executor which runs the WriterTask. */
private final ExecutorService executorService;
/** Flag for determining if the wrapped TextWriter should be flushed after each event is written. */
private final boolean autoFlush;
/** Flag for notifying the WriterTask to exit. */
private volatile boolean stopRequested;
/**
* Construct a new AsynchronousTextWriter wrapper.
*
* @param name
* the name of the thread.
* @param autoFlush
* indicates if the underlying writer should be flushed after the queue is flushed.
* @param writer
* a character stream used for output.
*/
public AsynchronousTextWriter(final String name, final boolean autoFlush, final TextWriter writer) {
Reject.ifNull(writer);
this.autoFlush = autoFlush;
this.writer = writer;
this.queue = new LinkedBlockingQueue<>(CAPACITY);
this.stopRequested = false;
this.executorService = Executors.newSingleThreadExecutor(new ThreadFactory() {
@Override
public Thread newThread(Runnable runnable) {
return new Thread(runnable, name);
}
});
executorService.execute(new WriterTask());
}
/**
* The publisher thread is responsible for emptying the queue of log records waiting to published.
*/
private class WriterTask implements Runnable {
/**
* Runs until queue is empty AND we've been asked to terminate.
*/
@Override
public void run() {
List<String> drainList = new ArrayList<>(CAPACITY);
boolean interrupted = false;
while (!stopRequested || !queue.isEmpty()) {
try {
queue.drainTo(drainList, CAPACITY);
if (drainList.isEmpty()) {
String message = queue.poll(POLLING_TIMEOUT, POLLING_TIMEOUT_UNIT);
if (message != null) {
writeMessage(message);
if (autoFlush) {
flush();
}
}
} else {
for (String message : drainList) {
writeMessage(message);
}
drainList.clear();
if (autoFlush) {
flush();
}
}
} catch (InterruptedException ex) {
// Ignore. We'll rerun the loop
// and presumably fall out.
interrupted = true;
}
}
if (interrupted) {
Thread.currentThread().interrupt();
}
}
}
private void writeMessage(String message) {
try {
writer.write(message);
} catch (IOException e) {
logger.error("Error when writing a message, message size: " + message.length(), e);
}
}
/**
* Write the log record asynchronously.
*
* @param record
* the log record to write.
*/
@Override
public void write(String record) throws IOException {
boolean interrupted = false;
boolean enqueued = false;
while (!stopRequested) {
// Put request on queue for writer
try {
queue.put(record);
enqueued = true;
break;
} catch (InterruptedException e) {
// We expect this to happen. Just ignore it and hopefully
// drop out in the next try.
interrupted = true;
}
}
if (interrupted) {
Thread.currentThread().interrupt();
}
// Inform caller if this writer has been shutdown
if (!enqueued) {
throw new IOException("Writer closed");
}
}
@Override
public void flush() {
try {
writer.flush();
} catch (IOException e) {
logger.error("Error when flushing the writer", e);
}
}
/** {@inheritDoc} */
@Override
public long getBytesWritten() {
return writer.getBytesWritten();
}
/**
* Retrieves the wrapped writer.
*
* @return The wrapped writer used by this asynchronous writer.
*/
public TextWriter getWrappedWriter() {
return writer;
}
/** {@inheritDoc} */
@Override
public void shutdown() {
shutdown(true);
}
/**
* Releases any resources held by the writer.
*
* @param shutdownWrapped
* If the wrapped writer should be closed as well.
*/
public void shutdown(boolean shutdownWrapped) {
stopRequested = true;
// Wait for writer thread to terminate
executorService.shutdown();
boolean interrupted = false;
while (!executorService.isTerminated()) {
try {
executorService.awaitTermination(1, TimeUnit.MINUTES);
} catch (InterruptedException e) {
interrupted = true;
}
}
// Shutdown the wrapped writer.
if (shutdownWrapped) {
writer.shutdown();
}
if (interrupted) {
Thread.currentThread().interrupt();
}
}
}