View Javadoc
1   /*
2    * The contents of this file are subject to the terms of the Common Development and
3    * Distribution License (the License). You may not use this file except in compliance with the
4    * License.
5    *
6    * You can obtain a copy of the License at legal/CDDLv1.0.txt. See the License for the
7    * specific language governing permission and limitations under the License.
8    *
9    * When distributing Covered Software, include this CDDL Header Notice in each file and include
10   * the License file at legal/CDDLv1.0.txt. If applicable, add the following below the CDDL
11   * Header, with the fields enclosed by brackets [] replaced by your own identifying
12   * information: "Portions copyright [year] [name of copyright owner]".
13   *
14   * Copyright 2016 ForgeRock AS.
15   */
16  package org.forgerock.audit.handlers.jms;
17  
18  import java.util.ArrayList;
19  import java.util.Collections;
20  import java.util.List;
21  import java.util.concurrent.BlockingQueue;
22  import java.util.concurrent.ExecutorService;
23  import java.util.concurrent.Executors;
24  import java.util.concurrent.LinkedBlockingQueue;
25  import java.util.concurrent.ThreadFactory;
26  import java.util.concurrent.TimeUnit;
27  
28  import org.forgerock.json.resource.ResourceException;
29  import org.forgerock.util.Reject;
30  import org.slf4j.Logger;
31  import org.slf4j.LoggerFactory;
32  
33  import static org.forgerock.audit.batch.CommonAuditBatchConfiguration.POLLING_TIMEOUT;
34  import static org.forgerock.audit.batch.CommonAuditBatchConfiguration.POLLING_TIMEOUT_UNIT;
35  
36  /**
37   * Generic publisher that will queue anything for batch processing.
38   *
39   * @param <T> This is the type of object that will be queued before publishing.
40   */
41  public abstract class BatchPublisher<T> implements Publisher<T> {
42      private static final Logger logger = LoggerFactory.getLogger(BatchPublisher.class);
43  
44      private final BlockingQueue<T> queue;
45      private final ExecutorService executorService;
46      private final long insertTimeoutSec;
47      private final long shutdownTimeoutSec;
48      private volatile boolean stopRequested;
49      private final int maxBatchedEvents;
50  
51      /**
52       * This constructs the thread pool of worker threads.  The pool is not executed until {@link #startup()}.
53       *
54       * @param name Name given to the thread pool worker threads.
55       * @param configuration queue management and thread pool configuration settings.
56       */
57      public BatchPublisher(final String name, final BatchPublisherConfiguration configuration) {
58          Reject.ifNull(configuration, "Batch configuration can't be null.");
59          Reject.ifFalse(configuration.getThreadCount() > 0, "ThreadCount must be greater than 0");
60          Reject.ifFalse(configuration.getCapacity() > 0, "Capacity must be greater than 0");
61          Reject.ifFalse(configuration.getMaxBatchedEvents() > 0, "MaxBatchedEvents must be greater than 0");
62          this.queue = new LinkedBlockingQueue<>(configuration.getCapacity());
63          this.maxBatchedEvents = configuration.getMaxBatchedEvents();
64          this.insertTimeoutSec = configuration.getInsertTimeoutSec();
65          this.shutdownTimeoutSec = configuration.getShutdownTimeoutSec();
66          this.stopRequested = false;
67          this.executorService = Executors.newFixedThreadPool(configuration.getThreadCount(), new ThreadFactory() {
68              @Override
69              public Thread newThread(Runnable runnable) {
70                  return new Thread(runnable, name);
71              }
72          });
73      }
74  
75      /**
76       * This is invoked by {@link #startup()}.  This should be implemented to initialize any resources that need
77       * to be started when the publisher is started. For example, opening shared connections to remote services.
78       *
79       * @throws ResourceException if there is trouble starting the publisher.
80       */
81      protected abstract void startupPublisher() throws ResourceException;
82  
83      /**
84       * This is invoked by {@link #shutdown()}.  This should be implemented to clean up any resources that were
85       * initialized in startup. For exmaple, closing the connections to remote services.
86       *
87       * @throws ResourceException if there is trouble shutting down the publisher.
88       */
89      protected abstract void shutdownPublisher() throws ResourceException;
90  
91      /**
92       * This is invoked by the worker threads to have the passed in messages published immediately.
93       *
94       * @param messages the messages to publish immediately.
95       */
96      protected abstract void publishMessages(List<T> messages);
97  
98      /**
99       * This first initializes the worker threads that monitor the queue of items to publish, and then calls
100      * {@link #startupPublisher()}.
101      *
102      * @throws ResourceException If there is trouble starting up the publisher or starting the worker threads.
103      */
104     @Override
105     public final void startup() throws ResourceException {
106         stopRequested = false;
107         this.executorService.execute(new PublishTask());
108         startupPublisher();
109     }
110 
111     /**
112      * This shutdowns the worker threads, and then calls {@link #shutdownPublisher()}.
113      *
114      * @throws ResourceException if there is trouble shutting down the publisher or stopping the worker threads.
115      */
116     @Override
117     public final void shutdown() throws ResourceException {
118         stopRequested = true;
119         executorService.shutdown();
120         boolean interrupted = false;
121         while (!executorService.isTerminated()) {
122             try {
123                 executorService.awaitTermination(shutdownTimeoutSec, TimeUnit.SECONDS);
124             } catch (InterruptedException e) {
125                 interrupted = true;
126             }
127         }
128         shutdownPublisher();
129         if (interrupted) {
130             Thread.currentThread().interrupt();
131         }
132     }
133 
134     /**
135      * Offers the message to the queue.  If the offer isn't accepted for 1 minute, the message is lost.
136      *
137      * @param message the message to queue.
138      */
139     @Override
140     public final void publish(T message) {
141         boolean interrupted = false;
142         while (!stopRequested) {
143             // Put request on queue for worker thread
144             try {
145                 if (queue.offer(message, insertTimeoutSec, TimeUnit.SECONDS)) {
146                     break;
147                 } else {
148                     logger.info(getClass() + " was blocked from queueing. Perhaps more worker threads are needed.");
149                 }
150             } catch (InterruptedException e) {
151                 // We expect this to happen. Just ignore it and hopefully
152                 // drop out in the next try.
153                 interrupted = true;
154             }
155         }
156         if (interrupted) {
157             Thread.currentThread().interrupt();
158         }
159     }
160 
161     /**
162      * This runnable defines the logic of the worker threads that process the queue.
163      *
164      * @see BlockingQueue#drainTo(java.util.Collection, int)
165      * @see BlockingQueue#poll(long, TimeUnit)
166      * @see Executors#newFixedThreadPool(int, ThreadFactory)
167      */
168     private class PublishTask implements Runnable {
169 
170         /**
171          * While the queue isn't empty this will drain the queue into a list and process them in a single call to
172          * {@link #publishMessages(List)}. <br/>
173          * If the drain results in an empty list, then this will poll for a single item and process that item as a
174          * singleton batch. <br/>
175          * If the poll timeouts ({@link BatchPublisherConfiguration#pollTimeoutSec }), and the queue is still
176          * empty, then the run will exit.<br/>
177          */
178         @Override
179         public void run() {
180             List<T> drainList = new ArrayList<>(maxBatchedEvents);
181 
182             boolean interrupted = false;
183             while (!stopRequested || !queue.isEmpty()) {
184                 try {
185                     queue.drainTo(drainList, maxBatchedEvents);
186                     if (drainList.isEmpty()) {
187                         T message = queue.poll(POLLING_TIMEOUT, POLLING_TIMEOUT_UNIT);
188                         if (message != null) {
189                             publishMessages(Collections.singletonList(message));
190                         }
191                     } else {
192                         publishMessages(drainList);
193                         drainList.clear();
194                     }
195                 } catch (InterruptedException ex) {
196                     // Ignore. We'll rerun the loop
197                     // and presumably fall out.
198                     interrupted = true;
199                 }
200             }
201             if (interrupted) {
202                 Thread.currentThread().interrupt();
203             }
204         }
205     }
206 }