JdbcAuditEventHandler.java
/*
* The contents of this file are subject to the terms of the Common Development and
* Distribution License (the License). You may not use this file except in compliance with the
* License.
*
* You can obtain a copy of the License at legal/CDDLv1.0.txt. See the License for the
* specific language governing permission and limitations under the License.
*
* When distributing Covered Software, include this CDDL Header Notice in each file and include
* the License file at legal/CDDLv1.0.txt. If applicable, add the following below the CDDL
* Header, with the fields enclosed by brackets [] replaced by your own identifying
* information: "Portions copyright [year] [name of copyright owner]".
*
* Copyright 2015-2016 ForgeRock AS.
*/
package org.forgerock.audit.handlers.jdbc;
import static org.forgerock.json.JsonValue.object;
import static org.forgerock.json.resource.Responses.newQueryResponse;
import static org.forgerock.json.resource.Responses.newResourceResponse;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import javax.inject.Inject;
import javax.sql.DataSource;
import org.forgerock.audit.Audit;
import org.forgerock.audit.AuditException;
import org.forgerock.audit.events.AuditEvent;
import org.forgerock.audit.events.AuditEventHelper;
import org.forgerock.audit.events.EventTopicsMetaData;
import org.forgerock.audit.events.handlers.AuditEventHandler;
import org.forgerock.audit.events.handlers.AuditEventHandlerBase;
import org.forgerock.audit.handlers.jdbc.JdbcAuditEventHandlerConfiguration.ConnectionPool;
import org.forgerock.audit.handlers.jdbc.JdbcAuditEventHandlerConfiguration.EventBufferingConfiguration;
import org.forgerock.http.util.Json;
import org.forgerock.json.JsonPointer;
import org.forgerock.json.JsonValue;
import org.forgerock.json.resource.CountPolicy;
import org.forgerock.json.resource.InternalServerErrorException;
import org.forgerock.json.resource.NotFoundException;
import org.forgerock.json.resource.QueryRequest;
import org.forgerock.json.resource.QueryResourceHandler;
import org.forgerock.json.resource.QueryResponse;
import org.forgerock.json.resource.ResourceException;
import org.forgerock.json.resource.ResourceResponse;
import org.forgerock.services.context.Context;
import org.forgerock.util.promise.Promise;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.zaxxer.hikari.HikariConfig;
import com.zaxxer.hikari.HikariDataSource;
import static org.forgerock.audit.batch.CommonAuditBatchConfiguration.POLLING_INTERVAL;
/**
* Implements a {@link AuditEventHandler} to write {@link AuditEvent}s to a JDBC repository.
**/
public class JdbcAuditEventHandler extends AuditEventHandlerBase {
private static final Logger logger = LoggerFactory.getLogger(JdbcAuditEventHandler.class);
/** The name used for a MySQL database. */
public static final String MYSQL = "mysql";
/** The name used for an H2 database. */
public static final String H2 = "h2";
/** The name used for an Oracle database. */
public static final String ORACLE = "oracle";
private final JdbcAuditEventHandlerConfiguration configuration;
private DataSource dataSource;
private DatabaseStatementProvider databaseStatementProvider;
private boolean sharedDataSource;
private JdbcAuditEventExecutor jdbcAuditEventExecutor;
/**
* Create a new JdbcAuditEventHandler instance.
*
* @param configuration
* Configuration parameters that can be adjusted by system administrators.
* @param eventTopicsMetaData
* Meta-data for all audit event topics.
* @param dataSource
* Connection pool. If this parameter is null, then a Hikari data source will be created.
*/
@Inject
public JdbcAuditEventHandler(
final JdbcAuditEventHandlerConfiguration configuration,
final EventTopicsMetaData eventTopicsMetaData,
@Audit final DataSource dataSource) {
super(configuration.getName(), eventTopicsMetaData, configuration.getTopics(), configuration.isEnabled());
this.configuration = configuration;
this.dataSource = dataSource;
}
/**
* {@inheritDoc}
*/
@Override
public void startup() throws ResourceException {
if (dataSource != null) {
sharedDataSource = true;
} else {
logger.info("No connection pool (DataSource) provided for JDBC Audit Event Handler; defaulting to Hikari");
sharedDataSource = false;
dataSource = new HikariDataSource(createHikariConfig(configuration.getConnectionPool()));
}
databaseStatementProvider = getDatabaseStatementProvider(configuration.getDatabaseType());
final JdbcAuditEventExecutor jdbcAuditEventExecutor = new JdbcAuditEventExecutorImpl(this.dataSource);
final EventBufferingConfiguration bufferConfig = configuration.getBuffering();
if (bufferConfig.isEnabled()) {
this.jdbcAuditEventExecutor = new BufferedJdbcAuditEventExecutor(
bufferConfig.getMaxSize(),
bufferConfig.isAutoFlush(),
jdbcAuditEventExecutor,
POLLING_INTERVAL,
bufferConfig.getWriterThreads(),
bufferConfig.getMaxBatchedEvents(),
dataSource);
} else {
this.jdbcAuditEventExecutor = jdbcAuditEventExecutor;
}
}
/**
* {@inheritDoc}
*/
@Override
public void shutdown() throws ResourceException {
if (!sharedDataSource && dataSource instanceof HikariDataSource) {
((HikariDataSource) dataSource).close();
}
jdbcAuditEventExecutor.close();
}
/**
* {@inheritDoc}
*/
@Override
public Promise<ResourceResponse, ResourceException> publishEvent(Context context, String topic, JsonValue event) {
try {
final TableMapping mapping = getTableMapping(topic);
final JdbcAuditEvent jdbcAuditEvent = databaseStatementProvider.buildCreateEvent(
event, mapping, eventTopicsMetaData.getSchema(topic));
jdbcAuditEventExecutor.createAuditEvent(jdbcAuditEvent);
} catch (AuditException e) {
final String error = String.format("Unable to create audit entry for %s", topic);
logger.error(error, e);
return new InternalServerErrorException(error, e).asPromise();
}
return newResourceResponse(event.get(ResourceResponse.FIELD_CONTENT_ID).asString(), null, event).asPromise();
}
/**
* {@inheritDoc}
*/
@Override
public Promise<QueryResponse, ResourceException> queryEvents(final Context context, final String topic,
final QueryRequest queryRequest, final QueryResourceHandler queryResourceHandler) {
final String auditEventTopic = queryRequest.getResourcePathObject().get(0);
try {
logger.debug("Query called for audit event: {} with queryFilter: {}", topic,
queryRequest.getQueryFilter());
final TableMapping mapping = getTableMapping(topic);
final List<Map<String, Object>> results =
jdbcAuditEventExecutor.queryAuditEvent(
databaseStatementProvider.buildQueryEvent(
mapping, queryRequest, eventTopicsMetaData.getSchema(topic)));
for (Map<String, Object> entry : results) {
final JsonValue result = processEntry(entry, mapping, topic);
queryResourceHandler.handleResource(
newResourceResponse(result.get(ResourceResponse.FIELD_CONTENT_ID).asString(), null, result));
}
return newQueryResponse(String.valueOf(queryRequest.getPagedResultsOffset() + results.size()),
CountPolicy.EXACT, results.size()).asPromise();
} catch (AuditException e) {
final String error = String.format("Unable to query audit entry for %s", auditEventTopic);
logger.error(error, e);
return new InternalServerErrorException(error, e).asPromise();
}
}
@Override
public Promise<ResourceResponse, ResourceException> readEvent(Context context, String topic, String resourceId) {
JsonValue result;
try {
logger.debug("Read called for audit event {} with id {}", topic, resourceId);
final TableMapping mapping = getTableMapping(topic);
final List<Map<String, Object>> results =
jdbcAuditEventExecutor.readAuditEvent(
databaseStatementProvider.buildReadEvent(
mapping, resourceId, eventTopicsMetaData.getSchema(topic)));
if (results.isEmpty()) {
return new NotFoundException(String.format("Entry not found for id: %s", resourceId)).asPromise();
}
result = processEntry(results.get(0), mapping, topic);
} catch (AuditException e) {
final String error = String.format("Unable to read audit entry for %s", topic);
logger.error(error, e);
return new InternalServerErrorException(error, e).asPromise();
}
return newResourceResponse(resourceId, null, result).asPromise();
}
private TableMapping getTableMapping(final String auditEventTopic) throws AuditException {
for (TableMapping tableMapping : configuration.getTableMappings()) {
if (tableMapping.getEvent().equalsIgnoreCase(auditEventTopic)) {
return tableMapping;
}
}
throw new AuditException(String.format("No table mapping found for audit event type: %s", auditEventTopic));
}
private JsonValue processEntry(final Map<String, Object> sqlResult, final TableMapping tableMapping,
final String auditEventTopic) throws AuditException {
final JsonValue result = JsonValue.json(object());
try {
for (Map.Entry<String, String> entry : tableMapping.getFieldToColumn().entrySet()) {
final Object value = sqlResult.get(entry.getValue().toLowerCase());
if (value != null) {
final JsonPointer field = new JsonPointer(entry.getKey());
final String fieldType =
AuditEventHelper.getPropertyType(eventTopicsMetaData.getSchema(auditEventTopic), field);
if (AuditEventHelper.ARRAY_TYPE.equalsIgnoreCase(fieldType)
|| AuditEventHelper.OBJECT_TYPE.equalsIgnoreCase(fieldType)) {
// parse stringified json
result.putPermissive(field, Json.readJson((String) value));
} else {
// value doesn't need parsing
result.putPermissive(field, value);
}
}
}
} catch (IOException e) {
logger.error("Unable to process retrieved entry", e);
throw new AuditException("Unable to process retrieved entry", e);
}
return result;
}
private HikariConfig createHikariConfig(ConnectionPool connectionPool) {
final HikariConfig hikariConfig = new HikariConfig();
hikariConfig.setAutoCommit(connectionPool.getAutoCommit());
hikariConfig.setConnectionTimeout(connectionPool.getConnectionTimeout());
hikariConfig.setIdleTimeout(connectionPool.getIdleTimeout());
hikariConfig.setMaximumPoolSize(connectionPool.getMaxPoolSize());
hikariConfig.setMaxLifetime(connectionPool.getMaxLifetime());
hikariConfig.setMinimumIdle(connectionPool.getMinIdle());
if (!isBlank(connectionPool.getJdbcUrl())) {
hikariConfig.setJdbcUrl(connectionPool.getJdbcUrl());
}
if (!isBlank(connectionPool.getDataSourceClassName())) {
hikariConfig.setDataSourceClassName(connectionPool.getDataSourceClassName());
}
if (!isBlank(connectionPool.getUsername())) {
hikariConfig.setUsername(connectionPool.getUsername());
}
if (!isBlank(connectionPool.getPassword())) {
hikariConfig.setPassword(connectionPool.getPassword());
}
if (!isBlank(connectionPool.getPoolName())) {
hikariConfig.setPoolName(connectionPool.getPoolName());
}
if (!isBlank(connectionPool.getDriverClassName())) {
hikariConfig.setDriverClassName(connectionPool.getDriverClassName());
}
return hikariConfig;
}
private DatabaseStatementProvider getDatabaseStatementProvider(final String databaseName) {
switch (databaseName) {
case MYSQL:
case H2:
return new GenericDatabaseStatementProvider();
case ORACLE:
return new OracleDatabaseStatementProvider();
default:
logger.warn("Unknown databaseName provided. Using the generic statement provider: {}", databaseName);
return new GenericDatabaseStatementProvider();
}
}
private static boolean isBlank(CharSequence charSeq) {
if (charSeq == null) {
return true;
}
final int length = charSeq.length();
if (length == 0) {
return true;
}
for (int i = 0; i < length; i++) {
if (!Character.isWhitespace(charSeq.charAt(i))) {
return false;
}
}
return true;
}
}