AuditServiceImpl.java
/*
* The contents of this file are subject to the terms of the Common Development and
* Distribution License (the License). You may not use this file except in compliance with the
* License.
*
* You can obtain a copy of the License at legal/CDDLv1.0.txt. See the License for the
* specific language governing permission and limitations under the License.
*
* When distributing Covered Software, include this CDDL Header Notice in each file and include
* the License file at legal/CDDLv1.0.txt. If applicable, add the following below the CDDL
* Header, with the fields enclosed by brackets [] replaced by your own identifying
* information: "Portions copyright [year] [name of copyright owner]".
*
* Copyright 2015-2017 ForgeRock AS.
*/
package org.forgerock.audit;
import static java.lang.String.format;
import static org.forgerock.audit.AuditServiceProxy.ACTION_PARAM_TARGET_HANDLER;
import static org.forgerock.audit.events.AuditEventBuilder.TIMESTAMP;
import static org.forgerock.audit.events.AuditEventBuilder.TRANSACTION_ID;
import static org.forgerock.audit.util.ResourceExceptionsUtil.adapt;
import static org.forgerock.audit.util.ResourceExceptionsUtil.notSupported;
import static org.forgerock.json.JsonValue.json;
import static org.forgerock.json.JsonValue.object;
import static org.forgerock.json.resource.Responses.newResourceResponse;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;
import org.forgerock.audit.events.EventTopicsMetaData;
import org.forgerock.audit.events.handlers.AuditEventHandler;
import org.forgerock.audit.filter.Filter;
import org.forgerock.audit.filter.FilterChainBuilder;
import org.forgerock.json.JsonPointer;
import org.forgerock.json.JsonValue;
import org.forgerock.json.resource.ActionRequest;
import org.forgerock.json.resource.ActionResponse;
import org.forgerock.json.resource.BadRequestException;
import org.forgerock.json.resource.CreateRequest;
import org.forgerock.json.resource.DeleteRequest;
import org.forgerock.json.resource.NotSupportedException;
import org.forgerock.json.resource.PatchRequest;
import org.forgerock.json.resource.QueryRequest;
import org.forgerock.json.resource.QueryResourceHandler;
import org.forgerock.json.resource.QueryResponse;
import org.forgerock.json.resource.ReadRequest;
import org.forgerock.json.resource.ResourceException;
import org.forgerock.json.resource.ResourcePath;
import org.forgerock.json.resource.ResourceResponse;
import org.forgerock.json.resource.ServiceUnavailableException;
import org.forgerock.json.resource.UpdateRequest;
import org.forgerock.services.context.Context;
import org.forgerock.util.generator.IdGenerator;
import org.forgerock.util.promise.ExceptionHandler;
import org.forgerock.util.promise.Promise;
import org.forgerock.util.promise.RuntimeExceptionHandler;
import org.forgerock.util.query.QueryFilter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* The default implementation of {@link AuditService}.
* <p/>
* Instances receive their configuration when constructed and cannot be reconfigured. Where "hot-swappable"
* reconfiguration is required, an instance of {@link AuditServiceProxy} should be used as a proxy. The old
* AuditService should fully shutdown before the new instance is started. Care must be taken to ensure that
* no other threads can interact with this object while {@link #startup()} and {@link #shutdown()} methods
* are running.
* <p/>
* After construction, the AuditService will be in the 'STARTING' state until {@link #startup()} is called.
* When in the 'STARTING' state, a call to any method other than {@link #startup()} will lead to
* {@link ServiceUnavailableException}.
* <p/>
* After {@link #startup()} is called, assuming startup succeeds, the AuditService will then be in the
* 'RUNNING' state and further calls to {@link #startup()} will be ignored.
* <p/>
* Calling {@link #shutdown()} will put the AuditService into the 'SHUTDOWN' state; once shutdown, the
* AuditService will remain in this state and cannot be restarted. Further calls to {@link #shutdown()}
* will be ignored. When in the 'SHUTDOWN' state, a call to any method other than {@link #shutdown()} will
* lead to {@link ServiceUnavailableException}.
* <p/>
* When instances are no longer needed, {@link #shutdown()} should be called to ensure that any buffered
* audit events are flushed and that all open file handles or connections are closed.
*/
final class AuditServiceImpl implements AuditService {
private static final Logger logger = LoggerFactory.getLogger(AuditServiceImpl.class);
private static final String PUBLISH_EXCEPTION_TEXT = "Failure in publishing audit event to {} : {}";
/**
* User-facing configuration.
*/
private final AuditServiceConfiguration config;
/**
* Map of all configured AuditEventHandlers indexed by their instance name.
*/
private final Map<String, AuditEventHandler> auditEventHandlersByName;
/**
* Lists of AuditEventHandlers that should receive events for each topic.
*/
private final Map<String, Set<AuditEventHandler>> auditEventHandlersByTopic;
/**
* All the audit event types configured.
*/
private final EventTopicsMetaData eventTopicsMetaData;
/**
* The AuditEventHandler to use for queries.
*/
private final AuditEventHandler queryHandler;
/**
* Indicates the current lifecycle state of this AuditService.
*/
private volatile LifecycleState lifecycleState = LifecycleState.STARTING;
/**
* The filters to apply to the audit event.
*/
private final Filter filters;
/**
* Constructs a new instance.
*
* @param configuration
* User-facing configuration.
* @param eventTopicsMetaData
* Meta-data describing the types of events this AuditService can receive.
* Passing the map to this constructor effectively transfers ownership to this object and neither
* it nor its contents should not be updated further by code outside of this class thereafter.
* @param auditEventHandlers
* List of all configured AuditEventHandlers.
*/
public AuditServiceImpl(
final AuditServiceConfiguration configuration,
final EventTopicsMetaData eventTopicsMetaData,
final Set<AuditEventHandler> auditEventHandlers) {
this.config = new AuditServiceConfiguration(configuration);
this.eventTopicsMetaData = eventTopicsMetaData;
this.auditEventHandlersByName = getAuditEventHandlersByName(auditEventHandlers);
this.auditEventHandlersByTopic = getAuditEventHandlersByTopic(auditEventHandlers, eventTopicsMetaData);
String queryHandlerName = configuration.getHandlerForQueries();
if (queryHandlerName != null && this.auditEventHandlersByName.containsKey(queryHandlerName)
&& this.auditEventHandlersByName.get(queryHandlerName).isEnabled()) {
queryHandler = this.auditEventHandlersByName.get(queryHandlerName);
} else {
queryHandler = new NullQueryHandler(config.getHandlerForQueries());
}
this.filters = new FilterChainBuilder()
.withAuditTopics(eventTopicsMetaData.getTopics())
.withPolicies(configuration.getFilterPolicies())
.build();
}
private Map<String, AuditEventHandler> getAuditEventHandlersByName(Set<AuditEventHandler> handlers) {
Map<String, AuditEventHandler> handlersByName = new HashMap<>(handlers.size());
for (AuditEventHandler handler : handlers) {
handlersByName.put(handler.getName(), handler);
}
return handlersByName;
}
private Map<String, Set<AuditEventHandler>> getAuditEventHandlersByTopic(
final Set<AuditEventHandler> handlers,
final EventTopicsMetaData eventTopicsMetaData) {
Map<String, Set<AuditEventHandler>> handlersByTopic = new HashMap<>();
for (String topic : eventTopicsMetaData.getTopics()) {
// Use a LinkedHashSet so that iteration order follows order in which handlers were defined
handlersByTopic.put(topic, new LinkedHashSet<AuditEventHandler>());
}
for (AuditEventHandler handler : handlers) {
if (!handler.isEnabled()) {
continue;
}
for (String topic : handler.getHandledTopics()) {
handlersByTopic.get(topic).add(handler);
}
}
return handlersByTopic;
}
@Override
public Promise<ResourceResponse, ResourceException> handleRead(final Context context, final ReadRequest request) {
try {
logger.debug("Audit read called for {}", request.getResourcePath());
checkLifecycleStateIsRunning();
if (request.getResourcePathObject().size() != 2) {
return new BadRequestException("Invalid resource path object specified.").asPromise();
}
final String id = request.getResourcePathObject().tail(1).toString();
final String topic = establishTopic(request.getResourcePathObject(), true);
return queryHandler.readEvent(context, topic, id);
} catch (Exception e) {
return adapt(e).asPromise();
}
}
@Override
public Promise<ResourceResponse, ResourceException> handleCreate(
final Context context, final CreateRequest request) {
try {
logger.trace("Audit create called for {}", request.getResourcePath());
checkLifecycleStateIsRunning();
if (context.containsContext(AuditingContext.class)) {
// Don't audit the audit log
return newUnhandledEventResponse().asPromise();
}
final String topic = establishTopic(request.getResourcePathObject(), true);
rejectIfMissingTransactionIdOrTimestamp(request);
establishAuditEventId(request);
filters.doFilter(topic, request.getContent());
Collection<AuditEventHandler> auditEventHandlersForEvent = getAuditEventHandlersForEvent(topic);
return publishEventToHandlers(context, request.getContent(), topic, auditEventHandlersForEvent);
} catch (Exception e) {
logger.warn(e.getMessage());
return adapt(e).asPromise();
}
}
private ResourceResponse newUnhandledEventResponse() {
return newResourceResponse(null, null, json(object()));
}
private void rejectIfMissingTransactionIdOrTimestamp(CreateRequest request) throws BadRequestException {
if (!request.getContent().isDefined(TRANSACTION_ID) || !request.getContent().isDefined(TIMESTAMP)) {
throw new BadRequestException("The request requires a transactionId and a timestamp");
}
}
private void establishAuditEventId(CreateRequest request) {
String newResourceId = request.getNewResourceId();
String auditEventId = newResourceId == null || newResourceId.isEmpty()
? IdGenerator.DEFAULT.generate()
: newResourceId;
request.getContent().put(ResourceResponse.FIELD_CONTENT_ID, auditEventId);
logger.debug("Audit create id {}", auditEventId);
}
private String establishTopic(final ResourcePath path, boolean isTopicRequired) throws ResourceException {
String topic = path.head(1).toString();
if (isTopicRequired && topic == null) {
throw new BadRequestException("Audit service called without specifying event topic in the identifier");
}
if (topic != null && !eventTopicsMetaData.containsTopic(topic)) {
throw new NotSupportedException("Audit service called with unknown event topic " + topic);
}
return topic;
}
/**
* Propagates audit event to all handlers registered to receive events for the given topic.
*
* @return The result generated by the queryHandler so that the result of handleCreate is inline with the
* result that would be received for a call to handleRead or handleQuery for the provided event.
* If no queryHandler is registered to receive events of this type, then return a success result
* with an empty body.
*/
private Promise<ResourceResponse, ResourceException> publishEventToHandlers(Context context, JsonValue event,
final String topic, Collection<AuditEventHandler> auditEventHandlersForEvent) {
Promise<ResourceResponse, ResourceException> promise = newUnhandledEventResponse().asPromise();
if (auditEventHandlersForEvent.isEmpty()) {
// if the event is known but not registered with a handler, it's ok to ignore it
logger.debug("No handler found for the event of topic {}", topic);
return promise;
}
// Otherwise, return the result generated by the handler used for queries or a generic response if
// that handler isn't bound to the event's topic
logger.debug("Cascading the event of topic {} to the handlers : {}", topic, auditEventHandlersForEvent);
for (AuditEventHandler auditEventHandler : auditEventHandlersForEvent) {
Promise<ResourceResponse, ResourceException> handlerResult;
try {
handlerResult = auditEventHandler.publishEvent(context, topic, event)
.thenOnException(new ExceptionHandler<ResourceException>() {
@Override
public void handleException(ResourceException exception) {
logger.warn(PUBLISH_EXCEPTION_TEXT, topic, exception.getMessage());
}
})
.thenOnRuntimeException(new RuntimeExceptionHandler() {
@Override
public void handleRuntimeException(RuntimeException exception) {
logger.warn(PUBLISH_EXCEPTION_TEXT, topic, exception.getMessage());
}
});
} catch (Exception ex) {
logger.warn("Unable to publish event to {} : {}", topic, ex.getMessage());
handlerResult = adapt(ex).asPromise();
}
if (auditEventHandler == queryHandler) {
promise = handlerResult;
}
}
return promise;
}
@Override
public Promise<ResourceResponse, ResourceException> handleUpdate(
final Context context, final UpdateRequest request) {
return notSupported(request).asPromise();
}
@Override
public Promise<ResourceResponse, ResourceException> handleDelete(final Context context, DeleteRequest request) {
return notSupported(request).asPromise();
}
@Override
public Promise<ResourceResponse, ResourceException> handlePatch(final Context context, final PatchRequest request) {
return notSupported(request).asPromise();
}
@Override
public Promise<QueryResponse, ResourceException> handleQuery(
final Context context, final QueryRequest request, final QueryResourceHandler handler) {
try {
logger.debug("Audit query called for {}", request.getResourcePath());
checkLifecycleStateIsRunning();
if (request.getQueryId() != null || request.getQueryExpression() != null) {
return new BadRequestException("QueryId and QueryExpression are not supported for audit").asPromise();
}
if (request.getQueryFilter() == null) {
request.setQueryFilter(QueryFilter.<JsonPointer>alwaysTrue());
}
final String topic = establishTopic(request.getResourcePathObject(), true);
return queryHandler.queryEvents(context, topic, request, handler);
} catch (Exception e) {
return adapt(e).asPromise();
}
}
@Override
public Promise<ActionResponse, ResourceException> handleAction(final Context context, final ActionRequest request) {
try {
String handlerName = request.getAdditionalParameter(ACTION_PARAM_TARGET_HANDLER);
String topic = establishTopic(request.getResourcePathObject(), false);
if (handlerName == null) {
// no action is currently managed at the audit service level, so throw an exception
return new BadRequestException(format("Unable to handle action: %s", request.getAction())).asPromise();
}
// Propagate the action to the given handler
AuditEventHandler handler = auditEventHandlersByName.get(handlerName);
if (handler == null) {
return new BadRequestException(format("Action references an unknown handler name: %s", handlerName))
.asPromise();
}
return handler.handleAction(context, topic, request);
} catch (Exception e) {
return adapt(e).asPromise();
}
}
private Collection<AuditEventHandler> getAuditEventHandlersForEvent(final String auditEvent) {
if (auditEventHandlersByTopic.containsKey(auditEvent)) {
return auditEventHandlersByTopic.get(auditEvent);
} else {
return Collections.emptyList();
}
}
@Override
public AuditServiceConfiguration getConfig() throws ServiceUnavailableException {
checkLifecycleStateIsRunning();
return new AuditServiceConfiguration(config);
}
@Override
public AuditEventHandler getRegisteredHandler(String handlerName) throws ServiceUnavailableException {
checkLifecycleStateIsRunning();
return auditEventHandlersByName.get(handlerName);
}
@Override
public Collection<AuditEventHandler> getRegisteredHandlers() throws ServiceUnavailableException {
return auditEventHandlersByName.values();
}
@Override
public boolean isAuditing(String topic) throws ServiceUnavailableException {
checkLifecycleStateIsRunning();
return !getAuditEventHandlersForEvent(topic).isEmpty();
}
@Override
public Set<String> getKnownTopics() throws ServiceUnavailableException {
checkLifecycleStateIsRunning();
return eventTopicsMetaData.getTopics();
}
@Override
public void startup() throws ServiceUnavailableException {
switch (lifecycleState) {
case STARTING:
for (Map.Entry<String, AuditEventHandler> entry : auditEventHandlersByName.entrySet()) {
String handlerName = entry.getKey();
AuditEventHandler handler = entry.getValue();
try {
handler.startup();
} catch (ResourceException e) {
logger.warn("Unable to startup handler " + handlerName, e);
}
}
lifecycleState = LifecycleState.RUNNING;
break;
case RUNNING:
// nothing to do
break;
case SHUTDOWN:
throw new ServiceUnavailableException("AuditService cannot be restarted after shutdown");
default:
throw new IllegalStateException("AuditService is in an unknown state");
}
}
/**
* {@inheritDoc}
* <p/>
* NB. This method is not synchronized with respect to {@link #handleCreate} so there is the possibility for
* an event to be passed to a handler after that handler has been shutdown if external synchronization is not
* applied. {@link AuditServiceProxy} is expected to be used and applies exactly this kind of synchronization.
*/
@Override
public void shutdown() {
switch (lifecycleState) {
case STARTING:
lifecycleState = LifecycleState.SHUTDOWN;
break;
case RUNNING:
for (Map.Entry<String, AuditEventHandler> entry : auditEventHandlersByName.entrySet()) {
String handlerName = entry.getKey();
AuditEventHandler handler = entry.getValue();
try {
handler.shutdown();
} catch (ResourceException e) {
logger.warn("Unable to shutdown handler " + handlerName, e);
}
}
lifecycleState = LifecycleState.SHUTDOWN;
break;
case SHUTDOWN:
// nothing to do
break;
default:
throw new IllegalStateException("AuditService is in an unknown state");
}
}
@Override
public boolean isRunning() {
return lifecycleState == LifecycleState.RUNNING;
}
private void checkLifecycleStateIsRunning() throws ServiceUnavailableException {
if (lifecycleState != LifecycleState.RUNNING) {
throw new ServiceUnavailableException("AuditService not running");
}
}
/**
* Indicates the current lifecycle state of this AuditService.
*/
private enum LifecycleState {
STARTING, RUNNING, SHUTDOWN
}
/**
* Substitute {@link AuditEventHandler} to use when no query handler is available.
*/
private final class NullQueryHandler implements AuditEventHandler {
private final String errorMessage;
private NullQueryHandler(String handlerForQueries) {
if (handlerForQueries == null || handlerForQueries.trim().isEmpty()) {
this.errorMessage = "No handler defined for queries.";
} else {
this.errorMessage = "The handler defined for queries, '" + handlerForQueries
+ "', has not been registered to the audit service, or it is disabled.";
}
}
@Override
public void startup() throws ResourceException {
throw new UnsupportedOperationException("Unsupported.");
}
@Override
public void shutdown() throws ResourceException {
throw new UnsupportedOperationException("Unsupported.");
}
@Override
public String getName() {
throw new UnsupportedOperationException("Unsupported.");
}
@Override
public Set<String> getHandledTopics() {
throw new UnsupportedOperationException("Unsupported.");
}
@Override
public Promise<ResourceResponse, ResourceException> publishEvent(
Context context, String topic, JsonValue event) {
throw new UnsupportedOperationException("Unsupported.");
}
@Override
public Promise<ResourceResponse, ResourceException> readEvent(
Context context, String topic, String resourceId) {
return adapt(new AuditException(errorMessage)).asPromise();
}
@Override
public Promise<QueryResponse, ResourceException> queryEvents(
Context context, String topic, QueryRequest query, QueryResourceHandler handler) {
return adapt(new AuditException(errorMessage)).asPromise();
}
@Override
public boolean isEnabled() {
return true;
}
@Override
public Promise<ActionResponse, ResourceException> handleAction(Context context, String topic,
ActionRequest request) {
throw new UnsupportedOperationException("Unsupported.");
}
}
}