View Javadoc
1   /*
2    * The contents of this file are subject to the terms of the Common Development and
3    * Distribution License (the License). You may not use this file except in compliance with the
4    * License.
5    *
6    * You can obtain a copy of the License at legal/CDDLv1.0.txt. See the License for the
7    * specific language governing permission and limitations under the License.
8    *
9    * When distributing Covered Software, include this CDDL Header Notice in each file and include
10   * the License file at legal/CDDLv1.0.txt. If applicable, add the following below the CDDL
11   * Header, with the fields enclosed by brackets [] replaced by your own identifying
12   * information: "Portions copyright [year] [name of copyright owner]".
13   *
14   * Copyright 2016 ForgeRock AS.
15   */
16  
17  package org.forgerock.audit.handlers.jms;
18  
19  import static org.forgerock.audit.util.ResourceExceptionsUtil.*;
20  import static org.forgerock.json.JsonValue.*;
21  import static org.forgerock.json.resource.Responses.newResourceResponse;
22  
23  import javax.inject.Inject;
24  import javax.jms.JMSException;
25  import javax.jms.MessageProducer;
26  import javax.jms.Session;
27  import java.util.Collections;
28  import java.util.List;
29  
30  import com.fasterxml.jackson.core.JsonProcessingException;
31  import com.fasterxml.jackson.databind.ObjectMapper;
32  import org.forgerock.audit.Audit;
33  import org.forgerock.audit.events.EventTopicsMetaData;
34  import org.forgerock.audit.events.handlers.AuditEventHandlerBase;
35  import org.forgerock.json.JsonValue;
36  import org.forgerock.json.resource.InternalServerErrorException;
37  import org.forgerock.json.resource.NotSupportedException;
38  import org.forgerock.json.resource.QueryRequest;
39  import org.forgerock.json.resource.QueryResourceHandler;
40  import org.forgerock.json.resource.QueryResponse;
41  import org.forgerock.json.resource.ResourceException;
42  import org.forgerock.json.resource.ResourceResponse;
43  import org.forgerock.services.context.Context;
44  import org.forgerock.util.promise.Promise;
45  import org.slf4j.Logger;
46  import org.slf4j.LoggerFactory;
47  
48  /**
49   * Publishes Audit events on a JMS Topic.
50   */
51  public class JmsAuditEventHandler extends AuditEventHandlerBase {
52      private static final Logger LOGGER = LoggerFactory.getLogger(JmsAuditEventHandler.class);
53      private static final ObjectMapper MAPPER = new ObjectMapper();
54  
55      private final JmsResourceManager jmsResourceManager;
56      private final Publisher<JsonValue> publisher;
57  
58      /**
59       * Creates a new AuditEventHandler instance that publishes JMS messages on a JMS Topic for each Audit event.
60       *
61       * @param jmsContextManager optional injected {@link JmsContextManager}.
62       * @param configuration Configuration parameters that can be adjusted by system administrators.
63       * @param eventTopicsMetaData Meta-data for all audit event topics.
64       * @throws ResourceException If JMS connections cannot be established.
65       */
66      @Inject
67      public JmsAuditEventHandler(
68              @Audit final JmsContextManager jmsContextManager,
69              final JmsAuditEventHandlerConfiguration configuration,
70              final EventTopicsMetaData eventTopicsMetaData) throws ResourceException {
71  
72          super(configuration.getName(), eventTopicsMetaData, configuration.getTopics(), configuration.isEnabled());
73  
74          publisher = buildPublisher(configuration);
75          this.jmsResourceManager =
76                  jmsContextManager == null
77                          ? new JmsResourceManager(
78                                  configuration, new JndiJmsContextManager(configuration.getJndi()))
79                          : new JmsResourceManager(configuration, jmsContextManager);
80          LOGGER.debug("Successfully configured JMS audit event handler.");
81      }
82  
83      /**
84       * Factory method for publisher.
85       *
86       * @param configuration used to determine if a batched publisher is needed or not.
87       * @return the constructed publisher.
88       */
89      Publisher<JsonValue> buildPublisher(JmsAuditEventHandlerConfiguration configuration) {
90          return configuration.getBatch().isBatchEnabled()
91                  ? new JmsBatchPublisher(configuration.getBatch())
92                  : new JmsPublisher();
93      }
94  
95      /**
96       * Creates the JMS Topic and ConnectionFactory from the context configuration settings and opens the JMS connection.
97       */
98      @Override
99      public void startup() throws ResourceException {
100         publisher.startup();
101         LOGGER.debug("JMS audit event handler is started.");
102     }
103 
104     /**
105      * Closes the JMS connection.
106      */
107     @Override
108     public void shutdown() throws ResourceException {
109         publisher.shutdown();
110         LOGGER.debug("JMS audit event handler is shutdown.");
111     }
112 
113     /**
114      * Converts the audit event into a JMS TextMessage and then publishes the message on the configured jmsTopic.
115      *
116      * @param context The context chain that initiated the event.
117      * @param auditTopic The Audit Topic for which the auditEvent was created for. (Not to be confused with a JMS Topic)
118      * @param auditEvent The event to convert to a JMS TextMessage and publish on the JMS Topic.
119      * @return a promise with either a response or an exception
120      */
121     public Promise<ResourceResponse, ResourceException> publishEvent(Context context, String auditTopic,
122             JsonValue auditEvent) {
123         try {
124             publisher.publish(json(object(
125                     field("auditTopic", auditTopic),
126                     field("event", auditEvent.getObject())
127             )));
128 
129             // Return the auditEvent as the response.
130             return newResourceResponse(
131                     auditEvent.get(ResourceResponse.FIELD_CONTENT_ID).asString(),
132                     null,
133                     auditEvent).asPromise();
134 
135         } catch (Exception ex) {
136             return adapt(ex).asPromise();
137         }
138     }
139 
140 
141     /**
142      * Publishes the list of messages using a single producer.
143      *
144      * @param messages the messages to send.
145      * @throws InternalServerErrorException if unable to publish jms messages.
146      */
147     private void publishJmsMessagesWithRetry(List<JsonValue> messages) throws InternalServerErrorException {
148         try {
149             publishJmsMessages(messages);
150         } catch (JMSException e) {
151             LOGGER.debug("Retrying publish", e);
152             try {
153                 resetConnection();
154                 publishJmsMessages(messages);
155             } catch (JMSException | ResourceException ex) {
156                 final String message = "Unable to publish JMS messages, messages are likely lost";
157                 LOGGER.error(message, e);
158                 throw new InternalServerErrorException(message, e);
159             }
160         }
161     }
162 
163     /**
164      * Publishes the list of messages using a single producer.
165      *
166      * @param messages the messages to send.
167      * @throws JMSException if unable to publish jms messages and a retry is possible.
168      *         InternalServerErrorException if unable to publish jms messages and a retry is not possible.
169      */
170     private void publishJmsMessages(List<JsonValue> messages) throws JMSException, InternalServerErrorException {
171         Session session = null;
172         try {
173             session = jmsResourceManager.createSession();
174             MessageProducer producer = null;
175             try {
176                 producer = jmsResourceManager.createProducer(session);
177                 for (JsonValue message : messages) {
178                     String text = MAPPER.writeValueAsString(message.getObject());
179                     producer.send(session.createTextMessage(text));
180                 }
181             } finally {
182                 if (null != producer) {
183                     producer.close();
184                 }
185             }
186         } catch (JMSException e) {
187             LOGGER.debug("Failed to publish messages", e);
188             throw e;
189         } catch (JsonProcessingException e) {
190             final String message = "Unable to publish JMS messages, messages are likely lost";
191             LOGGER.error(message, e);
192             throw new InternalServerErrorException(message, e);
193         } finally {
194             if (null != session) {
195                 session.close();
196             }
197         }
198     }
199 
200     /**
201      * Returns NotSupportedException as query is not implemented for JMS.
202      * <br/>
203      * {@inheritDoc}
204      * @return NotSupportedException as query is not implemented for JMS.
205      */
206     @Override
207     public Promise<QueryResponse, ResourceException> queryEvents(
208             Context context,
209             String topic,
210             QueryRequest queryRequest,
211             QueryResourceHandler queryResourceHandler) {
212         return notSupported(queryRequest).asPromise();
213     }
214 
215     /**
216      * Returns NotSupportedException as read is not implemented for JMS.
217      * <br/>
218      * {@inheritDoc}
219      * @return NotSupportedException as read is not implemented for JMS.
220      */
221     @Override
222     public Promise<ResourceResponse, ResourceException> readEvent(Context context, String topic, String resourceId) {
223         return new NotSupportedException("read operations are not supported").asPromise();
224     }
225 
226     /**
227      * Implementation of the BatchPublisher to handle publishing groups of audit event data to JMS.
228      */
229     private class JmsBatchPublisher extends BatchPublisher<JsonValue> {
230 
231         /**
232          * Constructor that passes the configuration to {@link BatchPublisher}.
233          *
234          * @param configuration config of the publisher.
235          */
236         public JmsBatchPublisher(BatchPublisherConfiguration configuration) {
237             super("JmsBatchPublisher", configuration);
238         }
239 
240         @Override
241         public void startupPublisher() throws ResourceException {
242             openJmsConnection();
243         }
244 
245         @Override
246         public void shutdownPublisher() throws ResourceException {
247             closeJmsConnection();
248         }
249 
250         @Override
251         protected void publishMessages(List<JsonValue> messages) {
252             try {
253                 publishJmsMessagesWithRetry(messages);
254             } catch (InternalServerErrorException e) {
255                 // do nothing
256             }
257         }
258     }
259 
260     /**
261      * Implementation of the Publisher to handle publishing singleton audit event data to JMS.
262      */
263     private class JmsPublisher implements Publisher<JsonValue> {
264 
265         @Override
266         public void startup() throws ResourceException {
267             openJmsConnection();
268         }
269 
270         @Override
271         public void shutdown() throws ResourceException {
272             closeJmsConnection();
273         }
274 
275         @Override
276         public void publish(JsonValue message) throws ResourceException {
277             publishJmsMessagesWithRetry(Collections.singletonList(message));
278         }
279     }
280 
281     private void openJmsConnection() throws InternalServerErrorException {
282         try {
283             jmsResourceManager.openConnection();
284         } catch (JMSException e) {
285             throw new InternalServerErrorException("trouble opening connection", e);
286         }
287     }
288 
289     private void closeJmsConnection() throws InternalServerErrorException {
290         try {
291             jmsResourceManager.closeConnection();
292         } catch (JMSException e) {
293             throw new InternalServerErrorException("trouble closing connection", e);
294         }
295     }
296 
297     private void resetConnection() throws InternalServerErrorException {
298         closeJmsConnection();
299         openJmsConnection();
300     }
301 }