View Javadoc
1   /*
2    * The contents of this file are subject to the terms of the Common Development and
3    * Distribution License (the License). You may not use this file except in compliance with the
4    * License.
5    *
6    * You can obtain a copy of the License at legal/CDDLv1.0.txt. See the License for the
7    * specific language governing permission and limitations under the License.
8    *
9    * When distributing Covered Software, include this CDDL Header Notice in each file and include
10   * the License file at legal/CDDLv1.0.txt. If applicable, add the following below the CDDL
11   * Header, with the fields enclosed by brackets [] replaced by your own identifying
12   * information: "Portions copyright [year] [name of copyright owner]".
13   *
14   * Copyright 2013 Cybernetica AS
15   * Portions copyright 2014-2015 ForgeRock AS.
16   */
17  package org.forgerock.audit.handlers.syslog;
18  
19  import org.forgerock.util.Reject;
20  import org.slf4j.Logger;
21  import org.slf4j.LoggerFactory;
22  
23  import java.io.IOException;
24  import java.nio.charset.StandardCharsets;
25  import java.util.ArrayList;
26  import java.util.Arrays;
27  import java.util.List;
28  import java.util.concurrent.BlockingQueue;
29  import java.util.concurrent.ExecutorService;
30  import java.util.concurrent.Executors;
31  import java.util.concurrent.LinkedBlockingQueue;
32  import java.util.concurrent.ThreadFactory;
33  import java.util.concurrent.TimeUnit;
34  
35  import static org.forgerock.audit.batch.CommonAuditBatchConfiguration.POLLING_TIMEOUT;
36  import static org.forgerock.audit.batch.CommonAuditBatchConfiguration.POLLING_TIMEOUT_UNIT;
37  
38  /**
39   * SyslogPublisher that offloads message transmission to a separate thread.
40   */
41  class AsynchronousSyslogPublisher implements SyslogPublisher {
42  
43      private static final Logger logger = LoggerFactory.getLogger(AsynchronousSyslogPublisher.class);
44  
45      /** Maximum number of messages that can be queued before producers start to block. */
46      private static final int CAPACITY = 5000;
47  
48      /** SyslogConnection through which buffered messages are sent. */
49      private final SyslogConnection connection;
50      /** Queue to store unpublished records. */
51      private final BlockingQueue<byte[]> queue;
52      /** Single threaded executor which runs the WriterTask. */
53      private final ExecutorService executorService;
54      /** Flag for notifying the WriterTask to exit. */
55      private volatile boolean stopRequested;
56  
57      /**
58       * Construct a new BufferedSyslogPublisher.
59       *
60       * @param name
61       *            the name of the thread.
62       * @param connection
63       *            a SyslogConnection used for output.
64       */
65      AsynchronousSyslogPublisher(final String name, final SyslogConnection connection) {
66          Reject.ifNull(connection);
67          this.connection = connection;
68          this.queue = new LinkedBlockingQueue<>(CAPACITY);
69          this.stopRequested = false;
70          this.executorService = Executors.newSingleThreadExecutor(new ThreadFactory() {
71              @Override
72              public Thread newThread(Runnable runnable) {
73                  return new Thread(runnable, name);
74              }
75          });
76          executorService.execute(new WriterTask());
77      }
78  
79      @Override
80      public void publishMessage(String syslogMessage) throws IOException {
81          boolean interrupted = false;
82          while (!stopRequested) {
83              // Put request on queue for writer
84              try {
85                  queue.put(syslogMessage.getBytes(StandardCharsets.UTF_8));
86                  break;
87              } catch (InterruptedException e) {
88                  // We expect this to happen. Just ignore it and hopefully
89                  // drop out in the next try.
90                  interrupted = true;
91              }
92          }
93          if (interrupted) {
94              Thread.currentThread().interrupt();
95          }
96      }
97  
98      @Override
99      public void close() {
100         stopRequested = true;
101 
102         executorService.shutdown();
103         boolean interrupted = false;
104         while (!executorService.isTerminated()) {
105             try {
106                 executorService.awaitTermination(1, TimeUnit.MINUTES);
107             } catch (InterruptedException e) {
108                 interrupted = true;
109             }
110         }
111 
112         // Close the wrapped publisher.
113         connection.close();
114 
115         if (interrupted) {
116             Thread.currentThread().interrupt();
117         }
118     }
119 
120     private void publishBufferedMessages(List<byte[]> syslogMessages) {
121         for (byte[] syslogMessage : syslogMessages) {
122             try {
123                 connection.reconnect();
124                 connection.send(syslogMessage);
125             } catch (IOException ex) {
126                 logger.error("Error when writing a message, message size: " + syslogMessage.length, ex);
127                 connection.close();
128             }
129         }
130         try {
131             connection.flush();
132         } catch (IOException ex) {
133             logger.error("Error when flushing the connection", ex);
134         }
135     }
136 
137     /**
138      * The publisher thread is responsible for emptying the queue of log records waiting to published.
139      */
140     private class WriterTask implements Runnable {
141 
142         /**
143          * Runs until queue is empty AND we've been asked to terminate.
144          */
145         @Override
146         public void run() {
147             List<byte[]> drainList = new ArrayList<>(CAPACITY);
148 
149             boolean interrupted = false;
150             while (!stopRequested || !queue.isEmpty()) {
151                 try {
152                     queue.drainTo(drainList, CAPACITY);
153                     if (drainList.isEmpty()) {
154                         byte[] message = queue.poll(POLLING_TIMEOUT, POLLING_TIMEOUT_UNIT);
155                         if (message != null) {
156                             publishBufferedMessages(Arrays.asList(message));
157                         }
158                     } else {
159                         publishBufferedMessages(drainList);
160                         drainList.clear();
161                     }
162                 } catch (InterruptedException ex) {
163                     // Ignore. We'll rerun the loop
164                     // and presumably fall out.
165                     interrupted = true;
166                 }
167             }
168             if (interrupted) {
169                 Thread.currentThread().interrupt();
170             }
171         }
172     }
173 
174 }