1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package org.forgerock.audit.handlers.jdbc;
17
18 import static org.forgerock.json.JsonValue.object;
19 import static org.forgerock.json.resource.Responses.newQueryResponse;
20 import static org.forgerock.json.resource.Responses.newResourceResponse;
21
22 import java.io.IOException;
23 import java.util.List;
24 import java.util.Map;
25 import javax.inject.Inject;
26 import javax.sql.DataSource;
27
28 import org.forgerock.audit.Audit;
29 import org.forgerock.audit.AuditException;
30 import org.forgerock.audit.events.AuditEvent;
31 import org.forgerock.audit.events.AuditEventHelper;
32 import org.forgerock.audit.events.EventTopicsMetaData;
33 import org.forgerock.audit.events.handlers.AuditEventHandler;
34 import org.forgerock.audit.events.handlers.AuditEventHandlerBase;
35 import org.forgerock.audit.handlers.jdbc.JdbcAuditEventHandlerConfiguration.ConnectionPool;
36 import org.forgerock.audit.handlers.jdbc.JdbcAuditEventHandlerConfiguration.EventBufferingConfiguration;
37 import org.forgerock.http.util.Json;
38 import org.forgerock.json.JsonPointer;
39 import org.forgerock.json.JsonValue;
40 import org.forgerock.json.resource.CountPolicy;
41 import org.forgerock.json.resource.InternalServerErrorException;
42 import org.forgerock.json.resource.NotFoundException;
43 import org.forgerock.json.resource.QueryRequest;
44 import org.forgerock.json.resource.QueryResourceHandler;
45 import org.forgerock.json.resource.QueryResponse;
46 import org.forgerock.json.resource.ResourceException;
47 import org.forgerock.json.resource.ResourceResponse;
48 import org.forgerock.services.context.Context;
49 import org.forgerock.util.promise.Promise;
50 import org.slf4j.Logger;
51 import org.slf4j.LoggerFactory;
52
53 import com.zaxxer.hikari.HikariConfig;
54 import com.zaxxer.hikari.HikariDataSource;
55
56 import static org.forgerock.audit.batch.CommonAuditBatchConfiguration.POLLING_INTERVAL;
57
58
59
60
61 public class JdbcAuditEventHandler extends AuditEventHandlerBase {
62
63 private static final Logger logger = LoggerFactory.getLogger(JdbcAuditEventHandler.class);
64
65 public static final String MYSQL = "mysql";
66
67 public static final String H2 = "h2";
68
69 public static final String ORACLE = "oracle";
70
71 private final JdbcAuditEventHandlerConfiguration configuration;
72 private DataSource dataSource;
73 private DatabaseStatementProvider databaseStatementProvider;
74 private boolean sharedDataSource;
75 private JdbcAuditEventExecutor jdbcAuditEventExecutor;
76
77
78
79
80
81
82
83
84
85
86
87 @Inject
88 public JdbcAuditEventHandler(
89 final JdbcAuditEventHandlerConfiguration configuration,
90 final EventTopicsMetaData eventTopicsMetaData,
91 @Audit final DataSource dataSource) {
92 super(configuration.getName(), eventTopicsMetaData, configuration.getTopics(), configuration.isEnabled());
93 this.configuration = configuration;
94 this.dataSource = dataSource;
95 }
96
97
98
99
100 @Override
101 public void startup() throws ResourceException {
102 if (dataSource != null) {
103 sharedDataSource = true;
104 } else {
105 logger.info("No connection pool (DataSource) provided for JDBC Audit Event Handler; defaulting to Hikari");
106 sharedDataSource = false;
107 dataSource = new HikariDataSource(createHikariConfig(configuration.getConnectionPool()));
108 }
109 databaseStatementProvider = getDatabaseStatementProvider(configuration.getDatabaseType());
110 final JdbcAuditEventExecutor jdbcAuditEventExecutor = new JdbcAuditEventExecutorImpl(this.dataSource);
111 final EventBufferingConfiguration bufferConfig = configuration.getBuffering();
112 if (bufferConfig.isEnabled()) {
113 this.jdbcAuditEventExecutor = new BufferedJdbcAuditEventExecutor(
114 bufferConfig.getMaxSize(),
115 bufferConfig.isAutoFlush(),
116 jdbcAuditEventExecutor,
117 POLLING_INTERVAL,
118 bufferConfig.getWriterThreads(),
119 bufferConfig.getMaxBatchedEvents(),
120 dataSource);
121 } else {
122 this.jdbcAuditEventExecutor = jdbcAuditEventExecutor;
123 }
124 }
125
126
127
128
129 @Override
130 public void shutdown() throws ResourceException {
131 if (!sharedDataSource && dataSource instanceof HikariDataSource) {
132 ((HikariDataSource) dataSource).close();
133 }
134 jdbcAuditEventExecutor.close();
135 }
136
137
138
139
140 @Override
141 public Promise<ResourceResponse, ResourceException> publishEvent(Context context, String topic, JsonValue event) {
142 try {
143 final TableMapping mapping = getTableMapping(topic);
144 final JdbcAuditEvent jdbcAuditEvent = databaseStatementProvider.buildCreateEvent(
145 event, mapping, eventTopicsMetaData.getSchema(topic));
146 jdbcAuditEventExecutor.createAuditEvent(jdbcAuditEvent);
147 } catch (AuditException e) {
148 final String error = String.format("Unable to create audit entry for %s", topic);
149 logger.error(error, e);
150 return new InternalServerErrorException(error, e).asPromise();
151 }
152 return newResourceResponse(event.get(ResourceResponse.FIELD_CONTENT_ID).asString(), null, event).asPromise();
153 }
154
155
156
157
158 @Override
159 public Promise<QueryResponse, ResourceException> queryEvents(final Context context, final String topic,
160 final QueryRequest queryRequest, final QueryResourceHandler queryResourceHandler) {
161 final String auditEventTopic = queryRequest.getResourcePathObject().get(0);
162 try {
163 logger.debug("Query called for audit event: {} with queryFilter: {}", topic,
164 queryRequest.getQueryFilter());
165
166 final TableMapping mapping = getTableMapping(topic);
167 final List<Map<String, Object>> results =
168 jdbcAuditEventExecutor.queryAuditEvent(
169 databaseStatementProvider.buildQueryEvent(
170 mapping, queryRequest, eventTopicsMetaData.getSchema(topic)));
171
172 for (Map<String, Object> entry : results) {
173 final JsonValue result = processEntry(entry, mapping, topic);
174 queryResourceHandler.handleResource(
175 newResourceResponse(result.get(ResourceResponse.FIELD_CONTENT_ID).asString(), null, result));
176 }
177 return newQueryResponse(String.valueOf(queryRequest.getPagedResultsOffset() + results.size()),
178 CountPolicy.EXACT, results.size()).asPromise();
179 } catch (AuditException e) {
180 final String error = String.format("Unable to query audit entry for %s", auditEventTopic);
181 logger.error(error, e);
182 return new InternalServerErrorException(error, e).asPromise();
183 }
184 }
185
186 @Override
187 public Promise<ResourceResponse, ResourceException> readEvent(Context context, String topic, String resourceId) {
188 JsonValue result;
189 try {
190 logger.debug("Read called for audit event {} with id {}", topic, resourceId);
191
192 final TableMapping mapping = getTableMapping(topic);
193 final List<Map<String, Object>> results =
194 jdbcAuditEventExecutor.readAuditEvent(
195 databaseStatementProvider.buildReadEvent(
196 mapping, resourceId, eventTopicsMetaData.getSchema(topic)));
197
198 if (results.isEmpty()) {
199 return new NotFoundException(String.format("Entry not found for id: %s", resourceId)).asPromise();
200 }
201 result = processEntry(results.get(0), mapping, topic);
202 } catch (AuditException e) {
203 final String error = String.format("Unable to read audit entry for %s", topic);
204 logger.error(error, e);
205 return new InternalServerErrorException(error, e).asPromise();
206 }
207 return newResourceResponse(resourceId, null, result).asPromise();
208 }
209
210 private TableMapping getTableMapping(final String auditEventTopic) throws AuditException {
211 for (TableMapping tableMapping : configuration.getTableMappings()) {
212 if (tableMapping.getEvent().equalsIgnoreCase(auditEventTopic)) {
213 return tableMapping;
214 }
215 }
216 throw new AuditException(String.format("No table mapping found for audit event type: %s", auditEventTopic));
217 }
218
219 private JsonValue processEntry(final Map<String, Object> sqlResult, final TableMapping tableMapping,
220 final String auditEventTopic) throws AuditException {
221 final JsonValue result = JsonValue.json(object());
222 try {
223 for (Map.Entry<String, String> entry : tableMapping.getFieldToColumn().entrySet()) {
224 final Object value = sqlResult.get(entry.getValue().toLowerCase());
225 if (value != null) {
226 final JsonPointer field = new JsonPointer(entry.getKey());
227 final String fieldType =
228 AuditEventHelper.getPropertyType(eventTopicsMetaData.getSchema(auditEventTopic), field);
229 if (AuditEventHelper.ARRAY_TYPE.equalsIgnoreCase(fieldType)
230 || AuditEventHelper.OBJECT_TYPE.equalsIgnoreCase(fieldType)) {
231
232 result.putPermissive(field, Json.readJson((String) value));
233 } else {
234
235 result.putPermissive(field, value);
236 }
237 }
238 }
239 } catch (IOException e) {
240 logger.error("Unable to process retrieved entry", e);
241 throw new AuditException("Unable to process retrieved entry", e);
242 }
243 return result;
244 }
245
246 private HikariConfig createHikariConfig(ConnectionPool connectionPool) {
247 final HikariConfig hikariConfig = new HikariConfig();
248 hikariConfig.setAutoCommit(connectionPool.getAutoCommit());
249 hikariConfig.setConnectionTimeout(connectionPool.getConnectionTimeout());
250 hikariConfig.setIdleTimeout(connectionPool.getIdleTimeout());
251 hikariConfig.setMaximumPoolSize(connectionPool.getMaxPoolSize());
252 hikariConfig.setMaxLifetime(connectionPool.getMaxLifetime());
253 hikariConfig.setMinimumIdle(connectionPool.getMinIdle());
254 if (!isBlank(connectionPool.getJdbcUrl())) {
255 hikariConfig.setJdbcUrl(connectionPool.getJdbcUrl());
256 }
257 if (!isBlank(connectionPool.getDataSourceClassName())) {
258 hikariConfig.setDataSourceClassName(connectionPool.getDataSourceClassName());
259 }
260 if (!isBlank(connectionPool.getUsername())) {
261 hikariConfig.setUsername(connectionPool.getUsername());
262 }
263 if (!isBlank(connectionPool.getPassword())) {
264 hikariConfig.setPassword(connectionPool.getPassword());
265 }
266 if (!isBlank(connectionPool.getPoolName())) {
267 hikariConfig.setPoolName(connectionPool.getPoolName());
268 }
269 if (!isBlank(connectionPool.getDriverClassName())) {
270 hikariConfig.setDriverClassName(connectionPool.getDriverClassName());
271 }
272 return hikariConfig;
273 }
274
275 private DatabaseStatementProvider getDatabaseStatementProvider(final String databaseName) {
276 switch (databaseName) {
277 case MYSQL:
278 case H2:
279 return new GenericDatabaseStatementProvider();
280 case ORACLE:
281 return new OracleDatabaseStatementProvider();
282 default:
283 logger.warn("Unknown databaseName provided. Using the generic statement provider: {}", databaseName);
284 return new GenericDatabaseStatementProvider();
285 }
286 }
287
288 private static boolean isBlank(CharSequence charSeq) {
289 if (charSeq == null) {
290 return true;
291 }
292 final int length = charSeq.length();
293 if (length == 0) {
294 return true;
295 }
296 for (int i = 0; i < length; i++) {
297 if (!Character.isWhitespace(charSeq.charAt(i))) {
298 return false;
299 }
300 }
301 return true;
302 }
303 }