/*
 * The contents of this file are subject to the terms of the Common Development and
 * Distribution License (the License). You may not use this file except in compliance with the
 * License.
 *
 * You can obtain a copy of the License at legal/CDDLv1.0.txt. See the License for the
 * specific language governing permission and limitations under the License.
 *
 * When distributing Covered Software, include this CDDL Header Notice in each file and include
 * the License file at legal/CDDLv1.0.txt. If applicable, add the following below the CDDL
 * Header, with the fields enclosed by brackets [] replaced by your own identifying
 * information: "Portions copyright [year] [name of copyright owner]".
 *
 * Copyright 2016 ForgeRock AS.
 */
16  
17  package org.forgerock.audit.handlers.jms;
18  
19  import static org.forgerock.audit.util.ResourceExceptionsUtil.adapt;
20  import static org.forgerock.audit.util.ResourceExceptionsUtil.notSupported;
21  import static org.forgerock.json.JsonValue.field;
22  import static org.forgerock.json.JsonValue.json;
23  import static org.forgerock.json.JsonValue.object;
24  import static org.forgerock.json.resource.Responses.newResourceResponse;
25  
26  import com.fasterxml.jackson.core.JsonProcessingException;
27  import com.fasterxml.jackson.databind.ObjectMapper;
28  import jakarta.inject.Inject;
29  import jakarta.jms.JMSException;
30  import jakarta.jms.MessageProducer;
31  import jakarta.jms.Session;
32  import java.util.Collections;
33  import java.util.List;
34  import org.forgerock.audit.Audit;
35  import org.forgerock.audit.events.EventTopicsMetaData;
36  import org.forgerock.audit.events.handlers.AuditEventHandlerBase;
37  import org.forgerock.json.JsonValue;
38  import org.forgerock.json.resource.InternalServerErrorException;
39  import org.forgerock.json.resource.NotSupportedException;
40  import org.forgerock.json.resource.QueryRequest;
41  import org.forgerock.json.resource.QueryResourceHandler;
42  import org.forgerock.json.resource.QueryResponse;
43  import org.forgerock.json.resource.ResourceException;
44  import org.forgerock.json.resource.ResourceResponse;
45  import org.forgerock.services.context.Context;
46  import org.forgerock.util.promise.Promise;
47  import org.slf4j.Logger;
48  import org.slf4j.LoggerFactory;
49  
50  /**
51   * Publishes Audit events on a JMS Topic.
52   */
53  public class JmsAuditEventHandler extends AuditEventHandlerBase {
54      private static final Logger LOGGER = LoggerFactory.getLogger(JmsAuditEventHandler.class);
55      private static final ObjectMapper MAPPER = new ObjectMapper();
56  
57      private final JmsResourceManager jmsResourceManager;
58      private final Publisher<JsonValue> publisher;
59  
60      /**
61       * Creates a new AuditEventHandler instance that publishes JMS messages on a JMS Topic for each Audit event.
62       *
63       * @param jmsContextManager optional injected {@link JmsContextManager}.
64       * @param configuration Configuration parameters that can be adjusted by system administrators.
65       * @param eventTopicsMetaData Meta-data for all audit event topics.
66       * @throws ResourceException If JMS connections cannot be established.
67       */
68      @Inject
69      public JmsAuditEventHandler(
70              @Audit final JmsContextManager jmsContextManager,
71              final JmsAuditEventHandlerConfiguration configuration,
72              final EventTopicsMetaData eventTopicsMetaData) throws ResourceException {
73  
74          super(configuration.getName(), eventTopicsMetaData, configuration.getTopics(), configuration.isEnabled());
75  
76          publisher = buildPublisher(configuration);
77          this.jmsResourceManager =
78                  jmsContextManager == null
79                          ? new JmsResourceManager(
80                                  configuration, new JndiJmsContextManager(configuration.getJndi()))
81                          : new JmsResourceManager(configuration, jmsContextManager);
82          LOGGER.debug("Successfully configured JMS audit event handler.");
83      }
84  
85      /**
86       * Factory method for publisher.
87       *
88       * @param configuration used to determine if a batched publisher is needed or not.
89       * @return the constructed publisher.
90       */
91      Publisher<JsonValue> buildPublisher(JmsAuditEventHandlerConfiguration configuration) {
92          return configuration.getBatch().isBatchEnabled()
93                  ? new JmsBatchPublisher(configuration.getBatch())
94                  : new JmsPublisher();
95      }
96  
97      /**
98       * Creates the JMS Topic and ConnectionFactory from the context configuration settings and opens the JMS connection.
99       */
100     @Override
101     public void startup() throws ResourceException {
102         publisher.startup();
103         LOGGER.debug("JMS audit event handler is started.");
104     }
105 
106     /**
107      * Closes the JMS connection.
108      */
109     @Override
110     public void shutdown() throws ResourceException {
111         publisher.shutdown();
112         LOGGER.debug("JMS audit event handler is shutdown.");
113     }
114 
115     /**
116      * Converts the audit event into a JMS TextMessage and then publishes the message on the configured jmsTopic.
117      *
118      * @param context The context chain that initiated the event.
119      * @param auditTopic The Audit Topic for which the auditEvent was created for. (Not to be confused with a JMS Topic)
120      * @param auditEvent The event to convert to a JMS TextMessage and publish on the JMS Topic.
121      * @return a promise with either a response or an exception
122      */
123     @Override
124     public Promise<ResourceResponse, ResourceException> publishEvent(Context context, String auditTopic,
125             JsonValue auditEvent) {
126         try {
127             publisher.publish(json(object(
128                     field("auditTopic", auditTopic),
129                     field("event", auditEvent.getObject())
130             )));
131 
132             // Return the auditEvent as the response.
133             return newResourceResponse(
134                     auditEvent.get(ResourceResponse.FIELD_CONTENT_ID).asString(),
135                     null,
136                     auditEvent).asPromise();
137 
138         } catch (Exception ex) {
139             return adapt(ex).asPromise();
140         }
141     }
142 
143 
144     /**
145      * Publishes the list of messages using a single producer.
146      *
147      * @param messages the messages to send.
148      * @throws InternalServerErrorException if unable to publish jms messages.
149      */
150     private void publishJmsMessagesWithRetry(List<JsonValue> messages) throws InternalServerErrorException {
151         try {
152             publishJmsMessages(messages);
153         } catch (JMSException e) {
154             LOGGER.debug("Retrying publish", e);
155             try {
156                 resetConnection();
157                 publishJmsMessages(messages);
158             } catch (JMSException | ResourceException ex) {
159                 final String message = "Unable to publish JMS messages, messages are likely lost";
160                 LOGGER.error(message, e);
161                 throw new InternalServerErrorException(message, e);
162             }
163         }
164     }
165 
166     /**
167      * Publishes the list of messages using a single producer.
168      *
169      * @param messages the messages to send.
170      * @throws JMSException if unable to publish jms messages and a retry is possible.
171      *         InternalServerErrorException if unable to publish jms messages and a retry is not possible.
172      */
173     private void publishJmsMessages(List<JsonValue> messages) throws JMSException, InternalServerErrorException {
174         Session session = null;
175         try {
176             session = jmsResourceManager.createSession();
177             MessageProducer producer = null;
178             try {
179                 producer = jmsResourceManager.createProducer(session);
180                 for (JsonValue message : messages) {
181                     String text = MAPPER.writeValueAsString(message.getObject());
182                     producer.send(session.createTextMessage(text));
183                 }
184             } finally {
185                 if (null != producer) {
186                     producer.close();
187                 }
188             }
189         } catch (JMSException e) {
190             LOGGER.debug("Failed to publish messages", e);
191             throw e;
192         } catch (JsonProcessingException e) {
193             final String message = "Unable to publish JMS messages, messages are likely lost";
194             LOGGER.error(message, e);
195             throw new InternalServerErrorException(message, e);
196         } finally {
197             if (null != session) {
198                 session.close();
199             }
200         }
201     }
202 
203     /**
204      * Returns NotSupportedException as query is not implemented for JMS.
205      * <br/>
206      * {@inheritDoc}
207      * @return NotSupportedException as query is not implemented for JMS.
208      */
209     @Override
210     public Promise<QueryResponse, ResourceException> queryEvents(
211             Context context,
212             String topic,
213             QueryRequest queryRequest,
214             QueryResourceHandler queryResourceHandler) {
215         return notSupported(queryRequest).asPromise();
216     }
217 
218     /**
219      * Returns NotSupportedException as read is not implemented for JMS.
220      * <br/>
221      * {@inheritDoc}
222      * @return NotSupportedException as read is not implemented for JMS.
223      */
224     @Override
225     public Promise<ResourceResponse, ResourceException> readEvent(Context context, String topic, String resourceId) {
226         return new NotSupportedException("read operations are not supported").asPromise();
227     }
228 
229     /**
230      * Implementation of the BatchPublisher to handle publishing groups of audit event data to JMS.
231      */
232     private class JmsBatchPublisher extends BatchPublisher<JsonValue> {
233 
234         /**
235          * Constructor that passes the configuration to {@link BatchPublisher}.
236          *
237          * @param configuration config of the publisher.
238          */
239         public JmsBatchPublisher(BatchPublisherConfiguration configuration) {
240             super("JmsBatchPublisher", configuration);
241         }
242 
243         @Override
244         public void startupPublisher() throws ResourceException {
245             openJmsConnection();
246         }
247 
248         @Override
249         public void shutdownPublisher() throws ResourceException {
250             closeJmsConnection();
251         }
252 
253         @Override
254         protected void publishMessages(List<JsonValue> messages) {
255             try {
256                 publishJmsMessagesWithRetry(messages);
257             } catch (InternalServerErrorException e) {
258                 // do nothing
259             }
260         }
261     }
262 
263     /**
264      * Implementation of the Publisher to handle publishing singleton audit event data to JMS.
265      */
266     private class JmsPublisher implements Publisher<JsonValue> {
267 
268         @Override
269         public void startup() throws ResourceException {
270             openJmsConnection();
271         }
272 
273         @Override
274         public void shutdown() throws ResourceException {
275             closeJmsConnection();
276         }
277 
278         @Override
279         public void publish(JsonValue message) throws ResourceException {
280             publishJmsMessagesWithRetry(Collections.singletonList(message));
281         }
282     }
283 
284     private void openJmsConnection() throws InternalServerErrorException {
285         try {
286             jmsResourceManager.openConnection();
287         } catch (JMSException e) {
288             throw new InternalServerErrorException("trouble opening connection", e);
289         }
290     }
291 
292     private void closeJmsConnection() throws InternalServerErrorException {
293         try {
294             jmsResourceManager.closeConnection();
295         } catch (JMSException e) {
296             throw new InternalServerErrorException("trouble closing connection", e);
297         }
298     }
299 
300     private void resetConnection() throws InternalServerErrorException {
301         closeJmsConnection();
302         openJmsConnection();
303     }
304 }