/*
 * The contents of this file are subject to the terms of the Common Development and
 * Distribution License (the License). You may not use this file except in compliance with the
 * License.
 *
 * You can obtain a copy of the License at legal/CDDLv1.0.txt. See the License for the
 * specific language governing permission and limitations under the License.
 *
 * When distributing Covered Software, include this CDDL Header Notice in each file and include
 * the License file at legal/CDDLv1.0.txt. If applicable, add the following below the CDDL
 * Header, with the fields enclosed by brackets [] replaced by your own identifying
 * information: "Portions copyright [year] [name of copyright owner]".
 *
 * Copyright 2015-2016 ForgeRock AS.
 */
16  package org.forgerock.audit.handlers.jdbc;
17  
18  import static org.forgerock.json.JsonValue.object;
19  import static org.forgerock.json.resource.Responses.newQueryResponse;
20  import static org.forgerock.json.resource.Responses.newResourceResponse;
21  
22  import java.io.IOException;
23  import java.util.List;
24  import java.util.Map;
25  import javax.inject.Inject;
26  import javax.sql.DataSource;
27  
28  import org.forgerock.audit.Audit;
29  import org.forgerock.audit.AuditException;
30  import org.forgerock.audit.events.AuditEvent;
31  import org.forgerock.audit.events.AuditEventHelper;
32  import org.forgerock.audit.events.EventTopicsMetaData;
33  import org.forgerock.audit.events.handlers.AuditEventHandler;
34  import org.forgerock.audit.events.handlers.AuditEventHandlerBase;
35  import org.forgerock.audit.handlers.jdbc.JdbcAuditEventHandlerConfiguration.ConnectionPool;
36  import org.forgerock.audit.handlers.jdbc.JdbcAuditEventHandlerConfiguration.EventBufferingConfiguration;
37  import org.forgerock.http.util.Json;
38  import org.forgerock.json.JsonPointer;
39  import org.forgerock.json.JsonValue;
40  import org.forgerock.json.resource.CountPolicy;
41  import org.forgerock.json.resource.InternalServerErrorException;
42  import org.forgerock.json.resource.NotFoundException;
43  import org.forgerock.json.resource.QueryRequest;
44  import org.forgerock.json.resource.QueryResourceHandler;
45  import org.forgerock.json.resource.QueryResponse;
46  import org.forgerock.json.resource.ResourceException;
47  import org.forgerock.json.resource.ResourceResponse;
48  import org.forgerock.services.context.Context;
49  import org.forgerock.util.promise.Promise;
50  import org.slf4j.Logger;
51  import org.slf4j.LoggerFactory;
52  
53  import com.zaxxer.hikari.HikariConfig;
54  import com.zaxxer.hikari.HikariDataSource;
55  
56  import static org.forgerock.audit.batch.CommonAuditBatchConfiguration.POLLING_INTERVAL;
57  
58  /**
59   * Implements a {@link AuditEventHandler} to write {@link AuditEvent}s to a JDBC repository.
60   **/
61  public class JdbcAuditEventHandler extends AuditEventHandlerBase {
62  
63      private static final Logger logger = LoggerFactory.getLogger(JdbcAuditEventHandler.class);
64      /** The name used for a MySQL database. */
65      public static final String MYSQL = "mysql";
66      /** The name used for an H2 database. */
67      public static final String H2 = "h2";
68      /** The name used for an Oracle database. */
69      public static final String ORACLE = "oracle";
70  
71      private final JdbcAuditEventHandlerConfiguration configuration;
72      private DataSource dataSource;
73      private DatabaseStatementProvider databaseStatementProvider;
74      private boolean sharedDataSource;
75      private JdbcAuditEventExecutor jdbcAuditEventExecutor;
76  
77      /**
78       * Create a new JdbcAuditEventHandler instance.
79       *
80       * @param configuration
81       *          Configuration parameters that can be adjusted by system administrators.
82       * @param eventTopicsMetaData
83       *          Meta-data for all audit event topics.
84       * @param dataSource
85       *          Connection pool. If this parameter is null, then a Hikari data source will be created.
86       */
87      @Inject
88      public JdbcAuditEventHandler(
89              final JdbcAuditEventHandlerConfiguration configuration,
90              final EventTopicsMetaData eventTopicsMetaData,
91              @Audit final DataSource dataSource) {
92          super(configuration.getName(), eventTopicsMetaData, configuration.getTopics(), configuration.isEnabled());
93          this.configuration = configuration;
94          this.dataSource = dataSource;
95      }
96  
97      /**
98       * {@inheritDoc}
99       */
100     @Override
101     public void startup() throws ResourceException {
102         if (dataSource != null) {
103             sharedDataSource = true;
104         } else {
105             logger.info("No connection pool (DataSource) provided for JDBC Audit Event Handler; defaulting to Hikari");
106             sharedDataSource = false;
107             dataSource = new HikariDataSource(createHikariConfig(configuration.getConnectionPool()));
108         }
109         databaseStatementProvider = getDatabaseStatementProvider(configuration.getDatabaseType());
110         final JdbcAuditEventExecutor jdbcAuditEventExecutor = new JdbcAuditEventExecutorImpl(this.dataSource);
111         final EventBufferingConfiguration bufferConfig = configuration.getBuffering();
112         if (bufferConfig.isEnabled()) {
113             this.jdbcAuditEventExecutor = new BufferedJdbcAuditEventExecutor(
114                     bufferConfig.getMaxSize(),
115                     bufferConfig.isAutoFlush(),
116                     jdbcAuditEventExecutor,
117                     POLLING_INTERVAL,
118                     bufferConfig.getWriterThreads(),
119                     bufferConfig.getMaxBatchedEvents(),
120                     dataSource);
121         } else {
122             this.jdbcAuditEventExecutor = jdbcAuditEventExecutor;
123         }
124     }
125 
126     /**
127      * {@inheritDoc}
128      */
129     @Override
130     public void shutdown() throws ResourceException {
131         if (!sharedDataSource && dataSource instanceof HikariDataSource) {
132             ((HikariDataSource) dataSource).close();
133         }
134         jdbcAuditEventExecutor.close();
135     }
136 
137     /**
138      * {@inheritDoc}
139      */
140     @Override
141     public Promise<ResourceResponse, ResourceException> publishEvent(Context context, String topic, JsonValue event) {
142         try {
143             final TableMapping mapping = getTableMapping(topic);
144             final JdbcAuditEvent jdbcAuditEvent = databaseStatementProvider.buildCreateEvent(
145                     event, mapping, eventTopicsMetaData.getSchema(topic));
146             jdbcAuditEventExecutor.createAuditEvent(jdbcAuditEvent);
147         } catch (AuditException e) {
148             final String error = String.format("Unable to create audit entry for %s", topic);
149             logger.error(error, e);
150             return new InternalServerErrorException(error, e).asPromise();
151         }
152         return newResourceResponse(event.get(ResourceResponse.FIELD_CONTENT_ID).asString(), null, event).asPromise();
153     }
154 
155     /**
156      * {@inheritDoc}
157      */
158     @Override
159     public Promise<QueryResponse, ResourceException> queryEvents(final Context context, final String topic,
160             final QueryRequest queryRequest, final QueryResourceHandler queryResourceHandler) {
161         final String auditEventTopic = queryRequest.getResourcePathObject().get(0);
162         try {
163             logger.debug("Query called for audit event: {} with queryFilter: {}", topic,
164                     queryRequest.getQueryFilter());
165 
166             final TableMapping mapping = getTableMapping(topic);
167             final List<Map<String, Object>> results =
168                     jdbcAuditEventExecutor.queryAuditEvent(
169                             databaseStatementProvider.buildQueryEvent(
170                                     mapping, queryRequest, eventTopicsMetaData.getSchema(topic)));
171 
172             for (Map<String, Object> entry : results) {
173                 final JsonValue result = processEntry(entry, mapping, topic);
174                 queryResourceHandler.handleResource(
175                         newResourceResponse(result.get(ResourceResponse.FIELD_CONTENT_ID).asString(), null, result));
176             }
177             return newQueryResponse(String.valueOf(queryRequest.getPagedResultsOffset() + results.size()),
178                             CountPolicy.EXACT, results.size()).asPromise();
179         } catch (AuditException e) {
180             final String error = String.format("Unable to query audit entry for %s", auditEventTopic);
181             logger.error(error, e);
182             return new InternalServerErrorException(error, e).asPromise();
183         }
184     }
185 
186     @Override
187     public Promise<ResourceResponse, ResourceException> readEvent(Context context, String topic, String resourceId) {
188         JsonValue result;
189         try {
190             logger.debug("Read called for audit event {} with id {}", topic, resourceId);
191 
192             final TableMapping mapping = getTableMapping(topic);
193             final List<Map<String, Object>> results =
194                     jdbcAuditEventExecutor.readAuditEvent(
195                             databaseStatementProvider.buildReadEvent(
196                                     mapping, resourceId, eventTopicsMetaData.getSchema(topic)));
197 
198             if (results.isEmpty()) {
199                 return new NotFoundException(String.format("Entry not found for id: %s", resourceId)).asPromise();
200             }
201             result = processEntry(results.get(0), mapping, topic);
202         } catch (AuditException e) {
203             final String error = String.format("Unable to read audit entry for %s", topic);
204             logger.error(error, e);
205             return new InternalServerErrorException(error, e).asPromise();
206         }
207         return newResourceResponse(resourceId, null, result).asPromise();
208     }
209 
210     private TableMapping getTableMapping(final String auditEventTopic) throws AuditException {
211         for (TableMapping tableMapping : configuration.getTableMappings()) {
212             if (tableMapping.getEvent().equalsIgnoreCase(auditEventTopic)) {
213                 return tableMapping;
214             }
215         }
216         throw new AuditException(String.format("No table mapping found for audit event type: %s", auditEventTopic));
217     }
218 
219     private JsonValue processEntry(final Map<String, Object> sqlResult, final TableMapping tableMapping,
220             final String auditEventTopic) throws AuditException {
221         final JsonValue result = JsonValue.json(object());
222         try {
223             for (Map.Entry<String, String> entry : tableMapping.getFieldToColumn().entrySet()) {
224                 final Object value = sqlResult.get(entry.getValue().toLowerCase());
225                 if (value != null) {
226                     final JsonPointer field = new JsonPointer(entry.getKey());
227                     final String fieldType =
228                             AuditEventHelper.getPropertyType(eventTopicsMetaData.getSchema(auditEventTopic), field);
229                     if (AuditEventHelper.ARRAY_TYPE.equalsIgnoreCase(fieldType)
230                             || AuditEventHelper.OBJECT_TYPE.equalsIgnoreCase(fieldType)) {
231                         // parse stringified json
232                         result.putPermissive(field, Json.readJson((String) value));
233                     } else {
234                         // value doesn't need parsing
235                         result.putPermissive(field, value);
236                     }
237                 }
238             }
239         } catch (IOException e) {
240             logger.error("Unable to process retrieved entry", e);
241             throw new AuditException("Unable to process retrieved entry", e);
242         }
243         return result;
244     }
245 
246     private HikariConfig createHikariConfig(ConnectionPool connectionPool) {
247         final HikariConfig hikariConfig = new HikariConfig();
248         hikariConfig.setAutoCommit(connectionPool.getAutoCommit());
249         hikariConfig.setConnectionTimeout(connectionPool.getConnectionTimeout());
250         hikariConfig.setIdleTimeout(connectionPool.getIdleTimeout());
251         hikariConfig.setMaximumPoolSize(connectionPool.getMaxPoolSize());
252         hikariConfig.setMaxLifetime(connectionPool.getMaxLifetime());
253         hikariConfig.setMinimumIdle(connectionPool.getMinIdle());
254         if (!isBlank(connectionPool.getJdbcUrl())) {
255             hikariConfig.setJdbcUrl(connectionPool.getJdbcUrl());
256         }
257         if (!isBlank(connectionPool.getDataSourceClassName())) {
258             hikariConfig.setDataSourceClassName(connectionPool.getDataSourceClassName());
259         }
260         if (!isBlank(connectionPool.getUsername())) {
261             hikariConfig.setUsername(connectionPool.getUsername());
262         }
263         if (!isBlank(connectionPool.getPassword())) {
264             hikariConfig.setPassword(connectionPool.getPassword());
265         }
266         if (!isBlank(connectionPool.getPoolName())) {
267             hikariConfig.setPoolName(connectionPool.getPoolName());
268         }
269         if (!isBlank(connectionPool.getDriverClassName())) {
270             hikariConfig.setDriverClassName(connectionPool.getDriverClassName());
271         }
272         return hikariConfig;
273     }
274 
275     private DatabaseStatementProvider getDatabaseStatementProvider(final String databaseName) {
276         switch (databaseName) {
277         case MYSQL:
278         case H2:
279             return new GenericDatabaseStatementProvider();
280         case ORACLE:
281             return new OracleDatabaseStatementProvider();
282         default:
283             logger.warn("Unknown databaseName provided. Using the generic statement provider: {}", databaseName);
284             return new GenericDatabaseStatementProvider();
285         }
286     }
287 
288     private static boolean isBlank(CharSequence charSeq) {
289         if (charSeq == null) {
290             return true;
291         }
292         final int length = charSeq.length();
293         if (length == 0) {
294             return true;
295         }
296         for (int i = 0; i < length; i++) {
297             if (!Character.isWhitespace(charSeq.charAt(i))) {
298                 return false;
299             }
300         }
301         return true;
302     }
303 }