1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 package org.forgerock.audit.handlers.syslog;
18
19 import org.forgerock.util.Reject;
20 import org.slf4j.Logger;
21 import org.slf4j.LoggerFactory;
22
23 import java.io.IOException;
24 import java.nio.charset.StandardCharsets;
25 import java.util.ArrayList;
26 import java.util.Arrays;
27 import java.util.List;
28 import java.util.concurrent.BlockingQueue;
29 import java.util.concurrent.ExecutorService;
30 import java.util.concurrent.Executors;
31 import java.util.concurrent.LinkedBlockingQueue;
32 import java.util.concurrent.ThreadFactory;
33 import java.util.concurrent.TimeUnit;
34
35 import static org.forgerock.audit.batch.CommonAuditBatchConfiguration.POLLING_TIMEOUT;
36 import static org.forgerock.audit.batch.CommonAuditBatchConfiguration.POLLING_TIMEOUT_UNIT;
37
38
39
40
41 class AsynchronousSyslogPublisher implements SyslogPublisher {
42
43 private static final Logger logger = LoggerFactory.getLogger(AsynchronousSyslogPublisher.class);
44
45
46 private static final int CAPACITY = 5000;
47
48
49 private final SyslogConnection connection;
50
51 private final BlockingQueue<byte[]> queue;
52
53 private final ExecutorService executorService;
54
55 private volatile boolean stopRequested;
56
57
58
59
60
61
62
63
64
65 AsynchronousSyslogPublisher(final String name, final SyslogConnection connection) {
66 Reject.ifNull(connection);
67 this.connection = connection;
68 this.queue = new LinkedBlockingQueue<>(CAPACITY);
69 this.stopRequested = false;
70 this.executorService = Executors.newSingleThreadExecutor(new ThreadFactory() {
71 @Override
72 public Thread newThread(Runnable runnable) {
73 return new Thread(runnable, name);
74 }
75 });
76 executorService.execute(new WriterTask());
77 }
78
79 @Override
80 public void publishMessage(String syslogMessage) throws IOException {
81 boolean interrupted = false;
82 while (!stopRequested) {
83
84 try {
85 queue.put(syslogMessage.getBytes(StandardCharsets.UTF_8));
86 break;
87 } catch (InterruptedException e) {
88
89
90 interrupted = true;
91 }
92 }
93 if (interrupted) {
94 Thread.currentThread().interrupt();
95 }
96 }
97
98 @Override
99 public void close() {
100 stopRequested = true;
101
102 executorService.shutdown();
103 boolean interrupted = false;
104 while (!executorService.isTerminated()) {
105 try {
106 executorService.awaitTermination(1, TimeUnit.MINUTES);
107 } catch (InterruptedException e) {
108 interrupted = true;
109 }
110 }
111
112
113 connection.close();
114
115 if (interrupted) {
116 Thread.currentThread().interrupt();
117 }
118 }
119
120 private void publishBufferedMessages(List<byte[]> syslogMessages) {
121 for (byte[] syslogMessage : syslogMessages) {
122 try {
123 connection.reconnect();
124 connection.send(syslogMessage);
125 } catch (IOException ex) {
126 logger.error("Error when writing a message, message size: " + syslogMessage.length, ex);
127 connection.close();
128 }
129 }
130 try {
131 connection.flush();
132 } catch (IOException ex) {
133 logger.error("Error when flushing the connection", ex);
134 }
135 }
136
137
138
139
140 private class WriterTask implements Runnable {
141
142
143
144
145 @Override
146 public void run() {
147 List<byte[]> drainList = new ArrayList<>(CAPACITY);
148
149 boolean interrupted = false;
150 while (!stopRequested || !queue.isEmpty()) {
151 try {
152 queue.drainTo(drainList, CAPACITY);
153 if (drainList.isEmpty()) {
154 byte[] message = queue.poll(POLLING_TIMEOUT, POLLING_TIMEOUT_UNIT);
155 if (message != null) {
156 publishBufferedMessages(Arrays.asList(message));
157 }
158 } else {
159 publishBufferedMessages(drainList);
160 drainList.clear();
161 }
162 } catch (InterruptedException ex) {
163
164
165 interrupted = true;
166 }
167 }
168 if (interrupted) {
169 Thread.currentThread().interrupt();
170 }
171 }
172 }
173
174 }