1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 package org.forgerock.audit.handlers.jms;
18
19 import static org.forgerock.audit.util.ResourceExceptionsUtil.adapt;
20 import static org.forgerock.audit.util.ResourceExceptionsUtil.notSupported;
21 import static org.forgerock.json.JsonValue.field;
22 import static org.forgerock.json.JsonValue.json;
23 import static org.forgerock.json.JsonValue.object;
24 import static org.forgerock.json.resource.Responses.newResourceResponse;
25
26 import com.fasterxml.jackson.core.JsonProcessingException;
27 import com.fasterxml.jackson.databind.ObjectMapper;
28 import jakarta.inject.Inject;
29 import jakarta.jms.JMSException;
30 import jakarta.jms.MessageProducer;
31 import jakarta.jms.Session;
32 import java.util.Collections;
33 import java.util.List;
34 import org.forgerock.audit.Audit;
35 import org.forgerock.audit.events.EventTopicsMetaData;
36 import org.forgerock.audit.events.handlers.AuditEventHandlerBase;
37 import org.forgerock.json.JsonValue;
38 import org.forgerock.json.resource.InternalServerErrorException;
39 import org.forgerock.json.resource.NotSupportedException;
40 import org.forgerock.json.resource.QueryRequest;
41 import org.forgerock.json.resource.QueryResourceHandler;
42 import org.forgerock.json.resource.QueryResponse;
43 import org.forgerock.json.resource.ResourceException;
44 import org.forgerock.json.resource.ResourceResponse;
45 import org.forgerock.services.context.Context;
46 import org.forgerock.util.promise.Promise;
47 import org.slf4j.Logger;
48 import org.slf4j.LoggerFactory;
49
50
51
52
53 public class JmsAuditEventHandler extends AuditEventHandlerBase {
54 private static final Logger LOGGER = LoggerFactory.getLogger(JmsAuditEventHandler.class);
55 private static final ObjectMapper MAPPER = new ObjectMapper();
56
57 private final JmsResourceManager jmsResourceManager;
58 private final Publisher<JsonValue> publisher;
59
60
61
62
63
64
65
66
67
68 @Inject
69 public JmsAuditEventHandler(
70 @Audit final JmsContextManager jmsContextManager,
71 final JmsAuditEventHandlerConfiguration configuration,
72 final EventTopicsMetaData eventTopicsMetaData) throws ResourceException {
73
74 super(configuration.getName(), eventTopicsMetaData, configuration.getTopics(), configuration.isEnabled());
75
76 publisher = buildPublisher(configuration);
77 this.jmsResourceManager =
78 jmsContextManager == null
79 ? new JmsResourceManager(
80 configuration, new JndiJmsContextManager(configuration.getJndi()))
81 : new JmsResourceManager(configuration, jmsContextManager);
82 LOGGER.debug("Successfully configured JMS audit event handler.");
83 }
84
85
86
87
88
89
90
91 Publisher<JsonValue> buildPublisher(JmsAuditEventHandlerConfiguration configuration) {
92 return configuration.getBatch().isBatchEnabled()
93 ? new JmsBatchPublisher(configuration.getBatch())
94 : new JmsPublisher();
95 }
96
97
98
99
100 @Override
101 public void startup() throws ResourceException {
102 publisher.startup();
103 LOGGER.debug("JMS audit event handler is started.");
104 }
105
106
107
108
109 @Override
110 public void shutdown() throws ResourceException {
111 publisher.shutdown();
112 LOGGER.debug("JMS audit event handler is shutdown.");
113 }
114
115
116
117
118
119
120
121
122
123 @Override
124 public Promise<ResourceResponse, ResourceException> publishEvent(Context context, String auditTopic,
125 JsonValue auditEvent) {
126 try {
127 publisher.publish(json(object(
128 field("auditTopic", auditTopic),
129 field("event", auditEvent.getObject())
130 )));
131
132
133 return newResourceResponse(
134 auditEvent.get(ResourceResponse.FIELD_CONTENT_ID).asString(),
135 null,
136 auditEvent).asPromise();
137
138 } catch (Exception ex) {
139 return adapt(ex).asPromise();
140 }
141 }
142
143
144
145
146
147
148
149
150 private void publishJmsMessagesWithRetry(List<JsonValue> messages) throws InternalServerErrorException {
151 try {
152 publishJmsMessages(messages);
153 } catch (JMSException e) {
154 LOGGER.debug("Retrying publish", e);
155 try {
156 resetConnection();
157 publishJmsMessages(messages);
158 } catch (JMSException | ResourceException ex) {
159 final String message = "Unable to publish JMS messages, messages are likely lost";
160 LOGGER.error(message, e);
161 throw new InternalServerErrorException(message, e);
162 }
163 }
164 }
165
166
167
168
169
170
171
172
173 private void publishJmsMessages(List<JsonValue> messages) throws JMSException, InternalServerErrorException {
174 Session session = null;
175 try {
176 session = jmsResourceManager.createSession();
177 MessageProducer producer = null;
178 try {
179 producer = jmsResourceManager.createProducer(session);
180 for (JsonValue message : messages) {
181 String text = MAPPER.writeValueAsString(message.getObject());
182 producer.send(session.createTextMessage(text));
183 }
184 } finally {
185 if (null != producer) {
186 producer.close();
187 }
188 }
189 } catch (JMSException e) {
190 LOGGER.debug("Failed to publish messages", e);
191 throw e;
192 } catch (JsonProcessingException e) {
193 final String message = "Unable to publish JMS messages, messages are likely lost";
194 LOGGER.error(message, e);
195 throw new InternalServerErrorException(message, e);
196 } finally {
197 if (null != session) {
198 session.close();
199 }
200 }
201 }
202
203
204
205
206
207
208
209 @Override
210 public Promise<QueryResponse, ResourceException> queryEvents(
211 Context context,
212 String topic,
213 QueryRequest queryRequest,
214 QueryResourceHandler queryResourceHandler) {
215 return notSupported(queryRequest).asPromise();
216 }
217
218
219
220
221
222
223
224 @Override
225 public Promise<ResourceResponse, ResourceException> readEvent(Context context, String topic, String resourceId) {
226 return new NotSupportedException("read operations are not supported").asPromise();
227 }
228
229
230
231
232 private class JmsBatchPublisher extends BatchPublisher<JsonValue> {
233
234
235
236
237
238
239 public JmsBatchPublisher(BatchPublisherConfiguration configuration) {
240 super("JmsBatchPublisher", configuration);
241 }
242
243 @Override
244 public void startupPublisher() throws ResourceException {
245 openJmsConnection();
246 }
247
248 @Override
249 public void shutdownPublisher() throws ResourceException {
250 closeJmsConnection();
251 }
252
253 @Override
254 protected void publishMessages(List<JsonValue> messages) {
255 try {
256 publishJmsMessagesWithRetry(messages);
257 } catch (InternalServerErrorException e) {
258
259 }
260 }
261 }
262
263
264
265
266 private class JmsPublisher implements Publisher<JsonValue> {
267
268 @Override
269 public void startup() throws ResourceException {
270 openJmsConnection();
271 }
272
273 @Override
274 public void shutdown() throws ResourceException {
275 closeJmsConnection();
276 }
277
278 @Override
279 public void publish(JsonValue message) throws ResourceException {
280 publishJmsMessagesWithRetry(Collections.singletonList(message));
281 }
282 }
283
284 private void openJmsConnection() throws InternalServerErrorException {
285 try {
286 jmsResourceManager.openConnection();
287 } catch (JMSException e) {
288 throw new InternalServerErrorException("trouble opening connection", e);
289 }
290 }
291
292 private void closeJmsConnection() throws InternalServerErrorException {
293 try {
294 jmsResourceManager.closeConnection();
295 } catch (JMSException e) {
296 throw new InternalServerErrorException("trouble closing connection", e);
297 }
298 }
299
300 private void resetConnection() throws InternalServerErrorException {
301 closeJmsConnection();
302 openJmsConnection();
303 }
304 }