1 /*
2 * The contents of this file are subject to the terms of the Common Development and
3 * Distribution License (the License). You may not use this file except in compliance with the
4 * License.
5 *
6 * You can obtain a copy of the License at legal/CDDLv1.0.txt. See the License for the
7 * specific language governing permission and limitations under the License.
8 *
9 * When distributing Covered Software, include this CDDL Header Notice in each file and include
10 * the License file at legal/CDDLv1.0.txt. If applicable, add the following below the CDDL
11 * Header, with the fields enclosed by brackets [] replaced by your own identifying
12 * information: "Portions copyright [year] [name of copyright owner]".
13 *
14 * Copyright 2016 ForgeRock AS.
15 */
16 package org.forgerock.audit.handlers.jms;
17
18 import java.util.ArrayList;
19 import java.util.Collections;
20 import java.util.List;
21 import java.util.concurrent.BlockingQueue;
22 import java.util.concurrent.ExecutorService;
23 import java.util.concurrent.Executors;
24 import java.util.concurrent.LinkedBlockingQueue;
25 import java.util.concurrent.ThreadFactory;
26 import java.util.concurrent.TimeUnit;
27
28 import org.forgerock.json.resource.ResourceException;
29 import org.forgerock.util.Reject;
30 import org.slf4j.Logger;
31 import org.slf4j.LoggerFactory;
32
33 import static org.forgerock.audit.batch.CommonAuditBatchConfiguration.POLLING_TIMEOUT;
34 import static org.forgerock.audit.batch.CommonAuditBatchConfiguration.POLLING_TIMEOUT_UNIT;
35
36 /**
37 * Generic publisher that will queue anything for batch processing.
38 *
39 * @param <T> This is the type of object that will be queued before publishing.
40 */
41 public abstract class BatchPublisher<T> implements Publisher<T> {
42 private static final Logger logger = LoggerFactory.getLogger(BatchPublisher.class);
43
44 private final BlockingQueue<T> queue;
45 private final ExecutorService executorService;
46 private final long insertTimeoutSec;
47 private final long shutdownTimeoutSec;
48 private volatile boolean stopRequested;
49 private final int maxBatchedEvents;
50
51 /**
52 * This constructs the thread pool of worker threads. The pool is not executed until {@link #startup()}.
53 *
54 * @param name Name given to the thread pool worker threads.
55 * @param configuration queue management and thread pool configuration settings.
56 */
57 public BatchPublisher(final String name, final BatchPublisherConfiguration configuration) {
58 Reject.ifNull(configuration, "Batch configuration can't be null.");
59 Reject.ifFalse(configuration.getThreadCount() > 0, "ThreadCount must be greater than 0");
60 Reject.ifFalse(configuration.getCapacity() > 0, "Capacity must be greater than 0");
61 Reject.ifFalse(configuration.getMaxBatchedEvents() > 0, "MaxBatchedEvents must be greater than 0");
62 this.queue = new LinkedBlockingQueue<>(configuration.getCapacity());
63 this.maxBatchedEvents = configuration.getMaxBatchedEvents();
64 this.insertTimeoutSec = configuration.getInsertTimeoutSec();
65 this.shutdownTimeoutSec = configuration.getShutdownTimeoutSec();
66 this.stopRequested = false;
67 this.executorService = Executors.newFixedThreadPool(configuration.getThreadCount(), new ThreadFactory() {
68 @Override
69 public Thread newThread(Runnable runnable) {
70 return new Thread(runnable, name);
71 }
72 });
73 }
74
75 /**
76 * This is invoked by {@link #startup()}. This should be implemented to initialize any resources that need
77 * to be started when the publisher is started. For example, opening shared connections to remote services.
78 *
79 * @throws ResourceException if there is trouble starting the publisher.
80 */
81 protected abstract void startupPublisher() throws ResourceException;
82
83 /**
84 * This is invoked by {@link #shutdown()}. This should be implemented to clean up any resources that were
85 * initialized in startup. For exmaple, closing the connections to remote services.
86 *
87 * @throws ResourceException if there is trouble shutting down the publisher.
88 */
89 protected abstract void shutdownPublisher() throws ResourceException;
90
91 /**
92 * This is invoked by the worker threads to have the passed in messages published immediately.
93 *
94 * @param messages the messages to publish immediately.
95 */
96 protected abstract void publishMessages(List<T> messages);
97
98 /**
99 * This first initializes the worker threads that monitor the queue of items to publish, and then calls
100 * {@link #startupPublisher()}.
101 *
102 * @throws ResourceException If there is trouble starting up the publisher or starting the worker threads.
103 */
104 @Override
105 public final void startup() throws ResourceException {
106 stopRequested = false;
107 this.executorService.execute(new PublishTask());
108 startupPublisher();
109 }
110
111 /**
112 * This shutdowns the worker threads, and then calls {@link #shutdownPublisher()}.
113 *
114 * @throws ResourceException if there is trouble shutting down the publisher or stopping the worker threads.
115 */
116 @Override
117 public final void shutdown() throws ResourceException {
118 stopRequested = true;
119 executorService.shutdown();
120 boolean interrupted = false;
121 while (!executorService.isTerminated()) {
122 try {
123 executorService.awaitTermination(shutdownTimeoutSec, TimeUnit.SECONDS);
124 } catch (InterruptedException e) {
125 interrupted = true;
126 }
127 }
128 shutdownPublisher();
129 if (interrupted) {
130 Thread.currentThread().interrupt();
131 }
132 }
133
134 /**
135 * Offers the message to the queue. If the offer isn't accepted for 1 minute, the message is lost.
136 *
137 * @param message the message to queue.
138 */
139 @Override
140 public final void publish(T message) {
141 boolean interrupted = false;
142 while (!stopRequested) {
143 // Put request on queue for worker thread
144 try {
145 if (queue.offer(message, insertTimeoutSec, TimeUnit.SECONDS)) {
146 break;
147 } else {
148 logger.info(getClass() + " was blocked from queueing. Perhaps more worker threads are needed.");
149 }
150 } catch (InterruptedException e) {
151 // We expect this to happen. Just ignore it and hopefully
152 // drop out in the next try.
153 interrupted = true;
154 }
155 }
156 if (interrupted) {
157 Thread.currentThread().interrupt();
158 }
159 }
160
161 /**
162 * This runnable defines the logic of the worker threads that process the queue.
163 *
164 * @see BlockingQueue#drainTo(java.util.Collection, int)
165 * @see BlockingQueue#poll(long, TimeUnit)
166 * @see Executors#newFixedThreadPool(int, ThreadFactory)
167 */
168 private class PublishTask implements Runnable {
169
170 /**
171 * While the queue isn't empty this will drain the queue into a list and process them in a single call to
172 * {@link #publishMessages(List)}. <br/>
173 * If the drain results in an empty list, then this will poll for a single item and process that item as a
174 * singleton batch. <br/>
175 * If the poll timeouts ({@link BatchPublisherConfiguration#pollTimeoutSec }), and the queue is still
176 * empty, then the run will exit.<br/>
177 */
178 @Override
179 public void run() {
180 List<T> drainList = new ArrayList<>(maxBatchedEvents);
181
182 boolean interrupted = false;
183 while (!stopRequested || !queue.isEmpty()) {
184 try {
185 queue.drainTo(drainList, maxBatchedEvents);
186 if (drainList.isEmpty()) {
187 T message = queue.poll(POLLING_TIMEOUT, POLLING_TIMEOUT_UNIT);
188 if (message != null) {
189 publishMessages(Collections.singletonList(message));
190 }
191 } else {
192 publishMessages(drainList);
193 drainList.clear();
194 }
195 } catch (InterruptedException ex) {
196 // Ignore. We'll rerun the loop
197 // and presumably fall out.
198 interrupted = true;
199 }
200 }
201 if (interrupted) {
202 Thread.currentThread().interrupt();
203 }
204 }
205 }
206 }