1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 package org.forgerock.audit.handlers.jms;
18
19 import static org.forgerock.audit.util.ResourceExceptionsUtil.*;
20 import static org.forgerock.json.JsonValue.*;
21 import static org.forgerock.json.resource.Responses.newResourceResponse;
22
23 import javax.inject.Inject;
24 import javax.jms.JMSException;
25 import javax.jms.MessageProducer;
26 import javax.jms.Session;
27 import java.util.Collections;
28 import java.util.List;
29
30 import com.fasterxml.jackson.core.JsonProcessingException;
31 import com.fasterxml.jackson.databind.ObjectMapper;
32 import org.forgerock.audit.Audit;
33 import org.forgerock.audit.events.EventTopicsMetaData;
34 import org.forgerock.audit.events.handlers.AuditEventHandlerBase;
35 import org.forgerock.json.JsonValue;
36 import org.forgerock.json.resource.InternalServerErrorException;
37 import org.forgerock.json.resource.NotSupportedException;
38 import org.forgerock.json.resource.QueryRequest;
39 import org.forgerock.json.resource.QueryResourceHandler;
40 import org.forgerock.json.resource.QueryResponse;
41 import org.forgerock.json.resource.ResourceException;
42 import org.forgerock.json.resource.ResourceResponse;
43 import org.forgerock.services.context.Context;
44 import org.forgerock.util.promise.Promise;
45 import org.slf4j.Logger;
46 import org.slf4j.LoggerFactory;
47
48
49
50
51 public class JmsAuditEventHandler extends AuditEventHandlerBase {
52 private static final Logger LOGGER = LoggerFactory.getLogger(JmsAuditEventHandler.class);
53 private static final ObjectMapper MAPPER = new ObjectMapper();
54
55 private final JmsResourceManager jmsResourceManager;
56 private final Publisher<JsonValue> publisher;
57
58
59
60
61
62
63
64
65
66 @Inject
67 public JmsAuditEventHandler(
68 @Audit final JmsContextManager jmsContextManager,
69 final JmsAuditEventHandlerConfiguration configuration,
70 final EventTopicsMetaData eventTopicsMetaData) throws ResourceException {
71
72 super(configuration.getName(), eventTopicsMetaData, configuration.getTopics(), configuration.isEnabled());
73
74 publisher = buildPublisher(configuration);
75 this.jmsResourceManager =
76 jmsContextManager == null
77 ? new JmsResourceManager(
78 configuration, new JndiJmsContextManager(configuration.getJndi()))
79 : new JmsResourceManager(configuration, jmsContextManager);
80 LOGGER.debug("Successfully configured JMS audit event handler.");
81 }
82
83
84
85
86
87
88
89 Publisher<JsonValue> buildPublisher(JmsAuditEventHandlerConfiguration configuration) {
90 return configuration.getBatch().isBatchEnabled()
91 ? new JmsBatchPublisher(configuration.getBatch())
92 : new JmsPublisher();
93 }
94
95
96
97
98 @Override
99 public void startup() throws ResourceException {
100 publisher.startup();
101 LOGGER.debug("JMS audit event handler is started.");
102 }
103
104
105
106
107 @Override
108 public void shutdown() throws ResourceException {
109 publisher.shutdown();
110 LOGGER.debug("JMS audit event handler is shutdown.");
111 }
112
113
114
115
116
117
118
119
120
121 public Promise<ResourceResponse, ResourceException> publishEvent(Context context, String auditTopic,
122 JsonValue auditEvent) {
123 try {
124 publisher.publish(json(object(
125 field("auditTopic", auditTopic),
126 field("event", auditEvent.getObject())
127 )));
128
129
130 return newResourceResponse(
131 auditEvent.get(ResourceResponse.FIELD_CONTENT_ID).asString(),
132 null,
133 auditEvent).asPromise();
134
135 } catch (Exception ex) {
136 return adapt(ex).asPromise();
137 }
138 }
139
140
141
142
143
144
145
146
147 private void publishJmsMessagesWithRetry(List<JsonValue> messages) throws InternalServerErrorException {
148 try {
149 publishJmsMessages(messages);
150 } catch (JMSException e) {
151 LOGGER.debug("Retrying publish", e);
152 try {
153 resetConnection();
154 publishJmsMessages(messages);
155 } catch (JMSException | ResourceException ex) {
156 final String message = "Unable to publish JMS messages, messages are likely lost";
157 LOGGER.error(message, e);
158 throw new InternalServerErrorException(message, e);
159 }
160 }
161 }
162
163
164
165
166
167
168
169
170 private void publishJmsMessages(List<JsonValue> messages) throws JMSException, InternalServerErrorException {
171 Session session = null;
172 try {
173 session = jmsResourceManager.createSession();
174 MessageProducer producer = null;
175 try {
176 producer = jmsResourceManager.createProducer(session);
177 for (JsonValue message : messages) {
178 String text = MAPPER.writeValueAsString(message.getObject());
179 producer.send(session.createTextMessage(text));
180 }
181 } finally {
182 if (null != producer) {
183 producer.close();
184 }
185 }
186 } catch (JMSException e) {
187 LOGGER.debug("Failed to publish messages", e);
188 throw e;
189 } catch (JsonProcessingException e) {
190 final String message = "Unable to publish JMS messages, messages are likely lost";
191 LOGGER.error(message, e);
192 throw new InternalServerErrorException(message, e);
193 } finally {
194 if (null != session) {
195 session.close();
196 }
197 }
198 }
199
200
201
202
203
204
205
206 @Override
207 public Promise<QueryResponse, ResourceException> queryEvents(
208 Context context,
209 String topic,
210 QueryRequest queryRequest,
211 QueryResourceHandler queryResourceHandler) {
212 return notSupported(queryRequest).asPromise();
213 }
214
215
216
217
218
219
220
221 @Override
222 public Promise<ResourceResponse, ResourceException> readEvent(Context context, String topic, String resourceId) {
223 return new NotSupportedException("read operations are not supported").asPromise();
224 }
225
226
227
228
229 private class JmsBatchPublisher extends BatchPublisher<JsonValue> {
230
231
232
233
234
235
236 public JmsBatchPublisher(BatchPublisherConfiguration configuration) {
237 super("JmsBatchPublisher", configuration);
238 }
239
240 @Override
241 public void startupPublisher() throws ResourceException {
242 openJmsConnection();
243 }
244
245 @Override
246 public void shutdownPublisher() throws ResourceException {
247 closeJmsConnection();
248 }
249
250 @Override
251 protected void publishMessages(List<JsonValue> messages) {
252 try {
253 publishJmsMessagesWithRetry(messages);
254 } catch (InternalServerErrorException e) {
255
256 }
257 }
258 }
259
260
261
262
263 private class JmsPublisher implements Publisher<JsonValue> {
264
265 @Override
266 public void startup() throws ResourceException {
267 openJmsConnection();
268 }
269
270 @Override
271 public void shutdown() throws ResourceException {
272 closeJmsConnection();
273 }
274
275 @Override
276 public void publish(JsonValue message) throws ResourceException {
277 publishJmsMessagesWithRetry(Collections.singletonList(message));
278 }
279 }
280
281 private void openJmsConnection() throws InternalServerErrorException {
282 try {
283 jmsResourceManager.openConnection();
284 } catch (JMSException e) {
285 throw new InternalServerErrorException("trouble opening connection", e);
286 }
287 }
288
289 private void closeJmsConnection() throws InternalServerErrorException {
290 try {
291 jmsResourceManager.closeConnection();
292 } catch (JMSException e) {
293 throw new InternalServerErrorException("trouble closing connection", e);
294 }
295 }
296
297 private void resetConnection() throws InternalServerErrorException {
298 closeJmsConnection();
299 openJmsConnection();
300 }
301 }