1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package org.forgerock.audit.handlers.jdbc;
17
18 import static org.forgerock.audit.batch.CommonAuditBatchConfiguration.POLLING_INTERVAL;
19 import static org.forgerock.json.JsonValue.object;
20 import static org.forgerock.json.resource.Responses.newQueryResponse;
21 import static org.forgerock.json.resource.Responses.newResourceResponse;
22
23 import com.zaxxer.hikari.HikariConfig;
24 import com.zaxxer.hikari.HikariDataSource;
25 import jakarta.inject.Inject;
26 import java.io.IOException;
27 import java.util.List;
28 import java.util.Map;
29 import javax.sql.DataSource;
30 import org.forgerock.audit.Audit;
31 import org.forgerock.audit.AuditException;
32 import org.forgerock.audit.events.AuditEvent;
33 import org.forgerock.audit.events.AuditEventHelper;
34 import org.forgerock.audit.events.EventTopicsMetaData;
35 import org.forgerock.audit.events.handlers.AuditEventHandler;
36 import org.forgerock.audit.events.handlers.AuditEventHandlerBase;
37 import org.forgerock.audit.handlers.jdbc.JdbcAuditEventHandlerConfiguration.ConnectionPool;
38 import org.forgerock.audit.handlers.jdbc.JdbcAuditEventHandlerConfiguration.EventBufferingConfiguration;
39 import org.forgerock.http.util.Json;
40 import org.forgerock.json.JsonPointer;
41 import org.forgerock.json.JsonValue;
42 import org.forgerock.json.resource.CountPolicy;
43 import org.forgerock.json.resource.InternalServerErrorException;
44 import org.forgerock.json.resource.NotFoundException;
45 import org.forgerock.json.resource.QueryRequest;
46 import org.forgerock.json.resource.QueryResourceHandler;
47 import org.forgerock.json.resource.QueryResponse;
48 import org.forgerock.json.resource.ResourceException;
49 import org.forgerock.json.resource.ResourceResponse;
50 import org.forgerock.services.context.Context;
51 import org.forgerock.util.promise.Promise;
52 import org.slf4j.Logger;
53 import org.slf4j.LoggerFactory;
54
55
56
57
58 public class JdbcAuditEventHandler extends AuditEventHandlerBase {
59
60 private static final Logger logger = LoggerFactory.getLogger(JdbcAuditEventHandler.class);
61
62 public static final String MYSQL = "mysql";
63
64 public static final String H2 = "h2";
65
66 public static final String ORACLE = "oracle";
67
68 private final JdbcAuditEventHandlerConfiguration configuration;
69 private DataSource dataSource;
70 private DatabaseStatementProvider databaseStatementProvider;
71 private boolean sharedDataSource;
72 private JdbcAuditEventExecutor jdbcAuditEventExecutor;
73
74
75
76
77
78
79
80
81
82
83
84 @Inject
85 public JdbcAuditEventHandler(
86 final JdbcAuditEventHandlerConfiguration configuration,
87 final EventTopicsMetaData eventTopicsMetaData,
88 @Audit final DataSource dataSource) {
89 super(configuration.getName(), eventTopicsMetaData, configuration.getTopics(), configuration.isEnabled());
90 this.configuration = configuration;
91 this.dataSource = dataSource;
92 }
93
94
95
96
97 @Override
98 public void startup() throws ResourceException {
99 if (dataSource != null) {
100 sharedDataSource = true;
101 } else {
102 logger.info("No connection pool (DataSource) provided for JDBC Audit Event Handler; defaulting to Hikari");
103 sharedDataSource = false;
104 dataSource = new HikariDataSource(createHikariConfig(configuration.getConnectionPool()));
105 }
106 databaseStatementProvider = getDatabaseStatementProvider(configuration.getDatabaseType());
107 final JdbcAuditEventExecutor jdbcAuditEventExecutor = new JdbcAuditEventExecutorImpl(this.dataSource);
108 final EventBufferingConfiguration bufferConfig = configuration.getBuffering();
109 if (bufferConfig.isEnabled()) {
110 this.jdbcAuditEventExecutor = new BufferedJdbcAuditEventExecutor(
111 bufferConfig.getMaxSize(),
112 bufferConfig.isAutoFlush(),
113 jdbcAuditEventExecutor,
114 POLLING_INTERVAL,
115 bufferConfig.getWriterThreads(),
116 bufferConfig.getMaxBatchedEvents(),
117 dataSource);
118 } else {
119 this.jdbcAuditEventExecutor = jdbcAuditEventExecutor;
120 }
121 }
122
123
124
125
126 @Override
127 public void shutdown() throws ResourceException {
128 if (!sharedDataSource && dataSource instanceof HikariDataSource) {
129 ((HikariDataSource) dataSource).close();
130 }
131 jdbcAuditEventExecutor.close();
132 }
133
134
135
136
137 @Override
138 public Promise<ResourceResponse, ResourceException> publishEvent(Context context, String topic, JsonValue event) {
139 try {
140 final TableMapping mapping = getTableMapping(topic);
141 final JdbcAuditEvent jdbcAuditEvent = databaseStatementProvider.buildCreateEvent(
142 event, mapping, eventTopicsMetaData.getSchema(topic));
143 jdbcAuditEventExecutor.createAuditEvent(jdbcAuditEvent);
144 } catch (AuditException e) {
145 final String error = String.format("Unable to create audit entry for %s", topic);
146 logger.error(error, e);
147 return new InternalServerErrorException(error, e).asPromise();
148 }
149 return newResourceResponse(event.get(ResourceResponse.FIELD_CONTENT_ID).asString(), null, event).asPromise();
150 }
151
152
153
154
155 @Override
156 public Promise<QueryResponse, ResourceException> queryEvents(final Context context, final String topic,
157 final QueryRequest queryRequest, final QueryResourceHandler queryResourceHandler) {
158 final String auditEventTopic = queryRequest.getResourcePathObject().get(0);
159 try {
160 logger.debug("Query called for audit event: {} with queryFilter: {}", topic,
161 queryRequest.getQueryFilter());
162
163 final TableMapping mapping = getTableMapping(topic);
164 final List<Map<String, Object>> results =
165 jdbcAuditEventExecutor.queryAuditEvent(
166 databaseStatementProvider.buildQueryEvent(
167 mapping, queryRequest, eventTopicsMetaData.getSchema(topic)));
168
169 for (Map<String, Object> entry : results) {
170 final JsonValue result = processEntry(entry, mapping, topic);
171 queryResourceHandler.handleResource(
172 newResourceResponse(result.get(ResourceResponse.FIELD_CONTENT_ID).asString(), null, result));
173 }
174 return newQueryResponse(String.valueOf(queryRequest.getPagedResultsOffset() + results.size()),
175 CountPolicy.EXACT, results.size()).asPromise();
176 } catch (AuditException e) {
177 final String error = String.format("Unable to query audit entry for %s", auditEventTopic);
178 logger.error(error, e);
179 return new InternalServerErrorException(error, e).asPromise();
180 }
181 }
182
183 @Override
184 public Promise<ResourceResponse, ResourceException> readEvent(Context context, String topic, String resourceId) {
185 JsonValue result;
186 try {
187 logger.debug("Read called for audit event {} with id {}", topic, resourceId);
188
189 final TableMapping mapping = getTableMapping(topic);
190 final List<Map<String, Object>> results =
191 jdbcAuditEventExecutor.readAuditEvent(
192 databaseStatementProvider.buildReadEvent(
193 mapping, resourceId, eventTopicsMetaData.getSchema(topic)));
194
195 if (results.isEmpty()) {
196 return new NotFoundException(String.format("Entry not found for id: %s", resourceId)).asPromise();
197 }
198 result = processEntry(results.get(0), mapping, topic);
199 } catch (AuditException e) {
200 final String error = String.format("Unable to read audit entry for %s", topic);
201 logger.error(error, e);
202 return new InternalServerErrorException(error, e).asPromise();
203 }
204 return newResourceResponse(resourceId, null, result).asPromise();
205 }
206
207 private TableMapping getTableMapping(final String auditEventTopic) throws AuditException {
208 for (TableMapping tableMapping : configuration.getTableMappings()) {
209 if (tableMapping.getEvent().equalsIgnoreCase(auditEventTopic)) {
210 return tableMapping;
211 }
212 }
213 throw new AuditException(String.format("No table mapping found for audit event type: %s", auditEventTopic));
214 }
215
216 private JsonValue processEntry(final Map<String, Object> sqlResult, final TableMapping tableMapping,
217 final String auditEventTopic) throws AuditException {
218 final JsonValue result = JsonValue.json(object());
219 try {
220 for (Map.Entry<String, String> entry : tableMapping.getFieldToColumn().entrySet()) {
221 final Object value = sqlResult.get(entry.getValue().toLowerCase());
222 if (value != null) {
223 final JsonPointer field = new JsonPointer(entry.getKey());
224 final String fieldType =
225 AuditEventHelper.getPropertyType(eventTopicsMetaData.getSchema(auditEventTopic), field);
226 if (AuditEventHelper.ARRAY_TYPE.equalsIgnoreCase(fieldType)
227 || AuditEventHelper.OBJECT_TYPE.equalsIgnoreCase(fieldType)) {
228
229 result.putPermissive(field, Json.readJson((String) value));
230 } else {
231
232 result.putPermissive(field, value);
233 }
234 }
235 }
236 } catch (IOException e) {
237 logger.error("Unable to process retrieved entry", e);
238 throw new AuditException("Unable to process retrieved entry", e);
239 }
240 return result;
241 }
242
243 private HikariConfig createHikariConfig(ConnectionPool connectionPool) {
244 final HikariConfig hikariConfig = new HikariConfig();
245 hikariConfig.setAutoCommit(connectionPool.getAutoCommit());
246 hikariConfig.setConnectionTimeout(connectionPool.getConnectionTimeout());
247 hikariConfig.setIdleTimeout(connectionPool.getIdleTimeout());
248 hikariConfig.setMaximumPoolSize(connectionPool.getMaxPoolSize());
249 hikariConfig.setMaxLifetime(connectionPool.getMaxLifetime());
250 hikariConfig.setMinimumIdle(connectionPool.getMinIdle());
251 if (!isBlank(connectionPool.getJdbcUrl())) {
252 hikariConfig.setJdbcUrl(connectionPool.getJdbcUrl());
253 }
254 if (!isBlank(connectionPool.getDataSourceClassName())) {
255 hikariConfig.setDataSourceClassName(connectionPool.getDataSourceClassName());
256 }
257 if (!isBlank(connectionPool.getUsername())) {
258 hikariConfig.setUsername(connectionPool.getUsername());
259 }
260 if (!isBlank(connectionPool.getPassword())) {
261 hikariConfig.setPassword(connectionPool.getPassword());
262 }
263 if (!isBlank(connectionPool.getPoolName())) {
264 hikariConfig.setPoolName(connectionPool.getPoolName());
265 }
266 if (!isBlank(connectionPool.getDriverClassName())) {
267 hikariConfig.setDriverClassName(connectionPool.getDriverClassName());
268 }
269 return hikariConfig;
270 }
271
272 private DatabaseStatementProvider getDatabaseStatementProvider(final String databaseName) {
273 switch (databaseName) {
274 case MYSQL:
275 case H2:
276 return new GenericDatabaseStatementProvider();
277 case ORACLE:
278 return new OracleDatabaseStatementProvider();
279 default:
280 logger.warn("Unknown databaseName provided. Using the generic statement provider: {}", databaseName);
281 return new GenericDatabaseStatementProvider();
282 }
283 }
284
285 private static boolean isBlank(CharSequence charSeq) {
286 if (charSeq == null) {
287 return true;
288 }
289 final int length = charSeq.length();
290 if (length == 0) {
291 return true;
292 }
293 for (int i = 0; i < length; i++) {
294 if (!Character.isWhitespace(charSeq.charAt(i))) {
295 return false;
296 }
297 }
298 return true;
299 }
300 }