1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package org.forgerock.audit.handlers.jms;
17
18 import java.util.ArrayList;
19 import java.util.Collections;
20 import java.util.List;
21 import java.util.concurrent.BlockingQueue;
22 import java.util.concurrent.ExecutorService;
23 import java.util.concurrent.Executors;
24 import java.util.concurrent.LinkedBlockingQueue;
25 import java.util.concurrent.ThreadFactory;
26 import java.util.concurrent.TimeUnit;
27
28 import org.forgerock.json.resource.ResourceException;
29 import org.forgerock.util.Reject;
30 import org.slf4j.Logger;
31 import org.slf4j.LoggerFactory;
32
33 import static org.forgerock.audit.batch.CommonAuditBatchConfiguration.POLLING_TIMEOUT;
34 import static org.forgerock.audit.batch.CommonAuditBatchConfiguration.POLLING_TIMEOUT_UNIT;
35
36
37
38
39
40
41 public abstract class BatchPublisher<T> implements Publisher<T> {
42 private static final Logger logger = LoggerFactory.getLogger(BatchPublisher.class);
43
44 private final BlockingQueue<T> queue;
45 private final ExecutorService executorService;
46 private final long insertTimeoutSec;
47 private final long shutdownTimeoutSec;
48 private volatile boolean stopRequested;
49 private final int maxBatchedEvents;
50
51
52
53
54
55
56
57 public BatchPublisher(final String name, final BatchPublisherConfiguration configuration) {
58 Reject.ifNull(configuration, "Batch configuration can't be null.");
59 Reject.ifFalse(configuration.getThreadCount() > 0, "ThreadCount must be greater than 0");
60 Reject.ifFalse(configuration.getCapacity() > 0, "Capacity must be greater than 0");
61 Reject.ifFalse(configuration.getMaxBatchedEvents() > 0, "MaxBatchedEvents must be greater than 0");
62 this.queue = new LinkedBlockingQueue<>(configuration.getCapacity());
63 this.maxBatchedEvents = configuration.getMaxBatchedEvents();
64 this.insertTimeoutSec = configuration.getInsertTimeoutSec();
65 this.shutdownTimeoutSec = configuration.getShutdownTimeoutSec();
66 this.stopRequested = false;
67 this.executorService = Executors.newFixedThreadPool(configuration.getThreadCount(), new ThreadFactory() {
68 @Override
69 public Thread newThread(Runnable runnable) {
70 return new Thread(runnable, name);
71 }
72 });
73 }
74
75
76
77
78
79
80
81 protected abstract void startupPublisher() throws ResourceException;
82
83
84
85
86
87
88
89 protected abstract void shutdownPublisher() throws ResourceException;
90
91
92
93
94
95
96 protected abstract void publishMessages(List<T> messages);
97
98
99
100
101
102
103
104 @Override
105 public final void startup() throws ResourceException {
106 stopRequested = false;
107 this.executorService.execute(new PublishTask());
108 startupPublisher();
109 }
110
111
112
113
114
115
116 @Override
117 public final void shutdown() throws ResourceException {
118 stopRequested = true;
119 executorService.shutdown();
120 boolean interrupted = false;
121 while (!executorService.isTerminated()) {
122 try {
123 executorService.awaitTermination(shutdownTimeoutSec, TimeUnit.SECONDS);
124 } catch (InterruptedException e) {
125 interrupted = true;
126 }
127 }
128 shutdownPublisher();
129 if (interrupted) {
130 Thread.currentThread().interrupt();
131 }
132 }
133
134
135
136
137
138
139 @Override
140 public final void publish(T message) {
141 boolean interrupted = false;
142 while (!stopRequested) {
143
144 try {
145 if (queue.offer(message, insertTimeoutSec, TimeUnit.SECONDS)) {
146 break;
147 } else {
148 logger.info(getClass() + " was blocked from queueing. Perhaps more worker threads are needed.");
149 }
150 } catch (InterruptedException e) {
151
152
153 interrupted = true;
154 }
155 }
156 if (interrupted) {
157 Thread.currentThread().interrupt();
158 }
159 }
160
161
162
163
164
165
166
167
168 private class PublishTask implements Runnable {
169
170
171
172
173
174
175
176
177
178 @Override
179 public void run() {
180 List<T> drainList = new ArrayList<>(maxBatchedEvents);
181
182 boolean interrupted = false;
183 while (!stopRequested || !queue.isEmpty()) {
184 try {
185 queue.drainTo(drainList, maxBatchedEvents);
186 if (drainList.isEmpty()) {
187 T message = queue.poll(POLLING_TIMEOUT, POLLING_TIMEOUT_UNIT);
188 if (message != null) {
189 publishMessages(Collections.singletonList(message));
190 }
191 } else {
192 publishMessages(drainList);
193 drainList.clear();
194 }
195 } catch (InterruptedException ex) {
196
197
198 interrupted = true;
199 }
200 }
201 if (interrupted) {
202 Thread.currentThread().interrupt();
203 }
204 }
205 }
206 }