/*
 * The contents of this file are subject to the terms of the Common Development and
 * Distribution License (the License). You may not use this file except in compliance with the
 * License.
 *
 * You can obtain a copy of the License at legal/CDDLv1.0.txt. See the License for the
 * specific language governing permission and limitations under the License.
 *
 * When distributing Covered Software, include this CDDL Header Notice in each file and include
 * the License file at legal/CDDLv1.0.txt. If applicable, add the following below the CDDL
 * Header, with the fields enclosed by brackets [] replaced by your own identifying
 * information: "Portions copyright [year] [name of copyright owner]".
 *
 * Copyright 2015-2017 ForgeRock AS.
 */
package org.forgerock.audit.handlers.csv;

import static java.lang.String.format;
import static org.forgerock.audit.events.AuditEventHelper.ARRAY_TYPE;
import static org.forgerock.audit.events.AuditEventHelper.OBJECT_TYPE;
import static org.forgerock.audit.events.AuditEventHelper.dotNotationToJsonPointer;
import static org.forgerock.audit.events.AuditEventHelper.getAuditEventProperties;
import static org.forgerock.audit.events.AuditEventHelper.getAuditEventSchema;
import static org.forgerock.audit.events.AuditEventHelper.getPropertyType;
import static org.forgerock.audit.events.AuditEventHelper.jsonPointerToDotNotation;
import static org.forgerock.audit.util.JsonSchemaUtils.generateJsonPointers;
import static org.forgerock.audit.util.JsonValueUtils.JSONVALUE_FILTER_VISITOR;
import static org.forgerock.audit.util.JsonValueUtils.expand;
import static org.forgerock.json.JsonValue.field;
import static org.forgerock.json.JsonValue.json;
import static org.forgerock.json.JsonValue.object;
import static org.forgerock.json.resource.ResourceResponse.FIELD_CONTENT_ID;
import static org.forgerock.json.resource.Responses.newQueryResponse;
import static org.forgerock.json.resource.Responses.newResourceResponse;
import static org.forgerock.util.Utils.isNullOrEmpty;

import java.io.File;
import java.io.FileReader;
import java.io.IOException;
import java.security.NoSuchAlgorithmException;
import java.security.SecureRandom;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

import javax.inject.Inject;

import org.forgerock.audit.Audit;
import org.forgerock.audit.events.EventTopicsMetaData;
import org.forgerock.audit.events.handlers.AuditEventHandlerBase;
import org.forgerock.audit.handlers.csv.CsvAuditEventHandlerConfiguration.CsvSecurity;
import org.forgerock.audit.handlers.csv.CsvAuditEventHandlerConfiguration.EventBufferingConfiguration;
import org.forgerock.audit.providers.KeyStoreHandlerProvider;
import org.forgerock.audit.retention.TimeStampFileNamingPolicy;
import org.forgerock.audit.secure.JcaKeyStoreHandler;
import org.forgerock.audit.secure.KeyStoreHandler;
import org.forgerock.audit.util.JsonValueUtils;
import org.forgerock.json.JsonPointer;
import org.forgerock.json.JsonValue;
import org.forgerock.json.resource.ActionRequest;
import org.forgerock.json.resource.ActionResponse;
import org.forgerock.json.resource.BadRequestException;
import org.forgerock.json.resource.InternalServerErrorException;
import org.forgerock.json.resource.NotFoundException;
import org.forgerock.json.resource.QueryFilters;
import org.forgerock.json.resource.QueryRequest;
import org.forgerock.json.resource.QueryResourceHandler;
import org.forgerock.json.resource.QueryResponse;
import org.forgerock.json.resource.ResourceException;
import org.forgerock.json.resource.ResourceResponse;
import org.forgerock.json.resource.Responses;
import org.forgerock.services.context.Context;
import org.forgerock.util.Reject;
import org.forgerock.util.promise.Promise;
import org.forgerock.util.query.QueryFilter;
import org.forgerock.util.time.Duration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.supercsv.cellprocessor.Optional;
import org.supercsv.cellprocessor.ift.CellProcessor;
import org.supercsv.io.CsvMapReader;
import org.supercsv.io.ICsvMapReader;
import org.supercsv.prefs.CsvPreference;
import org.supercsv.quote.AlwaysQuoteMode;
import org.supercsv.util.CsvContext;

import com.fasterxml.jackson.databind.ObjectMapper;

/**
 * Handles audit events by writing them to a CSV file, one file per topic.
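 * <p>
 * A minimal usage sketch; the configuration, topic meta-data, key-store provider, context and event
 * value are assumed to be constructed elsewhere:
 * <pre>{@code
 * CsvAuditEventHandler handler =
 *         new CsvAuditEventHandler(configuration, eventTopicsMetaData, keyStoreHandlerProvider);
 * handler.startup();
 * // "access" must be one of the topics declared in the handler configuration
 * handler.publishEvent(context, "access", event);
 * handler.shutdown();
 * }</pre>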
 */
public class CsvAuditEventHandler extends AuditEventHandlerBase {

    private static final Logger LOGGER = LoggerFactory.getLogger(CsvAuditEventHandler.class);

    /** Name of action to force file rotation. */
    public static final String ROTATE_FILE_ACTION_NAME = "rotate";

    static final String SECURE_CSV_FILENAME_PREFIX = "tamper-evident-";

    private static final ObjectMapper MAPPER = new ObjectMapper();
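    /** Shared randomness source passed to {@link SecureCsvWriter} when tamper-evident logging is enabled. */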
    private static final Random RANDOM;

    static {
        try {
            RANDOM = SecureRandom.getInstance("SHA1PRNG");
        } catch (NoSuchAlgorithmException ex) {
            throw new RuntimeException(ex);
        }
    }

    private final CsvAuditEventHandlerConfiguration configuration;
    private final CsvPreference csvPreference;
    private final ConcurrentMap<String, CsvWriter> writers = new ConcurrentHashMap<>();
    private final Map<String, Set<String>> fieldOrderByTopic;
    /** Caches a JSON pointer for each field. */
    private final Map<String, JsonPointer> jsonPointerByField;
    /** Caches the dot notation for each field. */
    private final Map<String, String> fieldDotNotationByField;
    private KeyStoreHandler keyStoreHandler;

    /**
     * Create a new CsvAuditEventHandler instance.
     *
     * @param configuration
     *          Configuration parameters that can be adjusted by system administrators.
     * @param eventTopicsMetaData
     *          Meta-data for all audit event topics.
     * @param keyStoreHandlerProvider
     *          The secure storage to use for keys.
     */
    @Inject
    public CsvAuditEventHandler(
            final CsvAuditEventHandlerConfiguration configuration,
            final EventTopicsMetaData eventTopicsMetaData,
            @Audit KeyStoreHandlerProvider keyStoreHandlerProvider) {

        super(configuration.getName(), eventTopicsMetaData, configuration.getTopics(), configuration.isEnabled());
        this.configuration = configuration;
        this.csvPreference = createCsvPreference(this.configuration);
        CsvSecurity security = configuration.getSecurity();
        if (security.isEnabled()) {
            Duration duration = security.getSignatureIntervalDuration();
            Reject.ifTrue(duration.isZero() || duration.isUnlimited(),
                    "The signature interval can't be zero or unlimited");
            Reject.ifFalse(
                    !isNullOrEmpty(security.getKeyStoreHandlerName())
                            ^ (!isNullOrEmpty(security.getFilename()) && !isNullOrEmpty(security.getPassword())),
                    "Either keyStoreHandlerName or filename/password security settings must be specified, "
                            + "but not both");

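            // Use the named KeyStoreHandler when one is configured; otherwise open the keystore file
            // with the supplied password.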
            if (security.getKeyStoreHandlerName() != null) {
                this.keyStoreHandler = keyStoreHandlerProvider.getKeystoreHandler(security.getKeyStoreHandlerName());
                Reject.ifTrue(keyStoreHandler == null,
                        "No keystore configured for keyStoreHandlerName: "
                                + security.getKeyStoreHandlerName());
            } else {
                try {
                    keyStoreHandler = new JcaKeyStoreHandler(CsvSecureConstants.KEYSTORE_TYPE, security.getFilename(),
                            security.getPassword());
                } catch (Exception e) {
                    throw new IllegalArgumentException(
                            "Unable to create secure storage from file: " + security.getFilename(), e);
                }
            }
        }

        Map<String, Set<String>> fieldOrderByTopic = new HashMap<>();
        Map<String, JsonPointer> jsonPointerByField = new HashMap<>();
        Map<String, String> fieldDotNotationByField = new HashMap<>();
        for (String topic : this.eventTopicsMetaData.getTopics()) {
            try {
                Set<String> fieldOrder = getFieldOrder(topic, this.eventTopicsMetaData);
                for (String field : fieldOrder) {
                    if (!jsonPointerByField.containsKey(field)) {
                        jsonPointerByField.put(field, new JsonPointer(field));
                        fieldDotNotationByField.put(field, jsonPointerToDotNotation(field));
                    }
                }
                fieldOrderByTopic.put(topic, Collections.unmodifiableSet(fieldOrder));
            } catch (ResourceException e) {
                LOGGER.error("{} topic schema meta-data misconfigured", topic, e);
            }
        }
        this.fieldOrderByTopic = Collections.unmodifiableMap(fieldOrderByTopic);
        this.jsonPointerByField = Collections.unmodifiableMap(jsonPointerByField);
        this.fieldDotNotationByField = Collections.unmodifiableMap(fieldDotNotationByField);
    }

    private CsvPreference createCsvPreference(final CsvAuditEventHandlerConfiguration config) {
        return new CsvPreference.Builder(
                config.getFormatting().getQuoteChar(),
                config.getFormatting().getDelimiterChar(),
                config.getFormatting().getEndOfLineSymbols())
                .useQuoteMode(new AlwaysQuoteMode())
                .build();
    }

    /**
     * {@inheritDoc}
     */
    @Override
    public void startup() throws ResourceException {
        LOGGER.trace("Audit logging to: {}", configuration.getLogDirectory());
        File file = new File(configuration.getLogDirectory());
        if (!file.isDirectory()) {
            if (file.exists()) {
                LOGGER.warn("Specified path is a file but should be a directory: {}", configuration.getLogDirectory());
            } else {
                if (!file.mkdirs()) {
                    LOGGER.warn("Unable to create audit directory in the path: {}", configuration.getLogDirectory());
                }
            }
        }
        for (String topic : eventTopicsMetaData.getTopics()) {
            File auditLogFile = getAuditLogFile(topic);
            try {
                openWriter(topic, auditLogFile);
            } catch (IOException e) {
                LOGGER.error("Error when creating audit file: {}", auditLogFile, e);
            }
        }
    }

    /** {@inheritDoc} */
    @Override
    public void shutdown() throws ResourceException {
        cleanup();
    }

    /**
     * Create a CSV audit log entry.
     * {@inheritDoc}
     */
    @Override
    public Promise<ResourceResponse, ResourceException> publishEvent(Context context, String topic, JsonValue event) {
        try {
            checkTopic(topic);
            publishEventWithRetry(topic, event);
            return newResourceResponse(
                    event.get(ResourceResponse.FIELD_CONTENT_ID).asString(), null, event).asPromise();
        } catch (ResourceException e) {
            return e.asPromise();
        }
    }

    private void checkTopic(String topic) throws ResourceException {
        final JsonValue auditEventProperties = getAuditEventProperties(eventTopicsMetaData.getSchema(topic));
        if (auditEventProperties == null || auditEventProperties.isNull()) {
            throw new InternalServerErrorException("No audit event properties defined for audit event: " + topic);
        }
    }

    /**
     * Publishes the provided event, retrying once if the cached writer was closed or reset by another thread.
     */
    private void publishEventWithRetry(final String topic, final JsonValue event)
                    throws ResourceException {
        final CsvWriter csvWriter = getWriter(topic);
        try {
            writeEvent(topic, csvWriter, event);
        } catch (IOException ex) {
            // Re-try once in case the writer stream became closed for some reason
            LOGGER.debug("IOException while writing ({})", ex.getMessage());
            CsvWriter newCsvWriter;
            // An IOException may be thrown if the csvWriter reference we have above was reset by another thread.
            // Synchronize to ensure that we wait for any reset to complete before proceeding - Otherwise, we may
            // lose multiple events or have multiple threads attempting to reset the writer.
            synchronized (this) {
                // Lookup the current writer directly from the map so we can check if another thread has reset it.
                newCsvWriter = writers.get(topic);
                if (newCsvWriter == csvWriter) {
                    // If both references are the same, the writer hasn't been reset.
                    newCsvWriter = resetAndReopenWriter(topic, false);
                    LOGGER.debug("Resetting writer");
                } else {
                    LOGGER.debug("Writer reset by another thread");
                }
            }
            try {
                writeEvent(topic, newCsvWriter, event);
            } catch (IOException e) {
                throw new BadRequestException(e);
            }
        }
    }

    /**
     * Lookup CsvWriter for specified topic.
     * <br/>
     * Uses double-checked locking in case another thread is resetting the writer. If the writer is still null
     * after synchronizing, it is reset here.
     * <br/>
     * This method is only intended for use by {@link #publishEventWithRetry(String, JsonValue)}.
     */
    private CsvWriter getWriter(String topic) throws BadRequestException {
        CsvWriter csvWriter = writers.get(topic);
        if (csvWriter == null) {
            LOGGER.debug("CSV file writer for {} topic is null; checking for reset by another thread", topic);
            synchronized (this) {
                csvWriter = writers.get(topic);
                if (csvWriter == null) {
                    LOGGER.debug("CSV file writer for {} topic not reset by another thread; resetting", topic);
                    csvWriter = resetAndReopenWriter(topic, false);
                }
            }
        }
        return csvWriter;
    }

    private CsvWriter writeEvent(final String topic, CsvWriter csvWriter, final JsonValue event)
                    throws IOException {
        writeEntry(topic, csvWriter, event);
        EventBufferingConfiguration bufferConfig = configuration.getBuffering();
        if (!bufferConfig.isEnabled() || !bufferConfig.isAutoFlush()) {
            csvWriter.flush();
        }
        return csvWriter;
    }

    private Set<String> getFieldOrder(final String topic, final EventTopicsMetaData eventTopicsMetaData)
            throws ResourceException {
        final Set<String> fieldOrder = new LinkedHashSet<>();
        fieldOrder.addAll(generateJsonPointers(getAuditEventSchema(eventTopicsMetaData.getSchema(topic))));
        return fieldOrder;
    }

    private synchronized CsvWriter openWriter(final String topic, final File auditFile) throws IOException {
        final CsvWriter writer = createCsvWriter(auditFile, topic);
        writers.put(topic, writer);
        return writer;
    }

    private synchronized CsvWriter createCsvWriter(final File auditFile, String topic) throws IOException {
        String[] headers = buildHeaders(fieldOrderByTopic.get(topic));
        if (configuration.getSecurity().isEnabled()) {
            return new SecureCsvWriter(auditFile, headers, csvPreference, configuration, keyStoreHandler, RANDOM);
        } else {
            return new StandardCsvWriter(auditFile, headers, csvPreference, configuration);
        }
    }

    private ICsvMapReader createCsvMapReader(final File auditFile) throws IOException {
        CsvMapReader csvReader = new CsvMapReader(new FileReader(auditFile), csvPreference);

        if (configuration.getSecurity().isEnabled()) {
            return new CsvSecureMapReader(csvReader);
        } else {
            return csvReader;
        }
    }

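    /** Builds the CSV header row by converting each field's JSON pointer to its dot-notation column name. */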
    private String[] buildHeaders(final Collection<String> fieldOrder) {
        final String[] headers = new String[fieldOrder.size()];
        fieldOrder.toArray(headers);
        for (int i = 0; i < headers.length; i++) {
            headers[i] = jsonPointerToDotNotation(headers[i]);
        }
        return headers;
    }

    /**
     * Perform a query on the CSV audit log.
     * {@inheritDoc}
     */
    @Override
    public Promise<QueryResponse, ResourceException> queryEvents(
            Context context,
            String topic,
            QueryRequest query,
            QueryResourceHandler handler) {
        try {
            for (final JsonValue value : getEntries(topic, query.getQueryFilter())) {
                handler.handleResource(newResourceResponse(value.get(FIELD_CONTENT_ID).asString(), null, value));
            }
            return newQueryResponse().asPromise();
        } catch (Exception e) {
            return new BadRequestException(e).asPromise();
        }
    }

    /**
     * Read an event from the CSV audit log.
     * {@inheritDoc}
     */
    @Override
    public Promise<ResourceResponse, ResourceException> readEvent(Context context, String topic, String resourceId) {
        try {
            final Set<JsonValue> entry = getEntries(topic, QueryFilters.parse("/_id eq \"" + resourceId + "\""));
            if (entry.isEmpty()) {
                throw new NotFoundException(topic + " audit log not found");
            }
            final JsonValue resource = entry.iterator().next();
            return newResourceResponse(resource.get(FIELD_CONTENT_ID).asString(), null, resource).asPromise();
        } catch (ResourceException e) {
            return e.asPromise();
        } catch (IOException e) {
            return new BadRequestException(e).asPromise();
        }
    }

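    /**
     * Handles the {@value #ROTATE_FILE_ACTION_NAME} action by forcing rotation of the CSV file for the given topic;
     * any other action is rejected with a {@link BadRequestException}.
     */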
    @Override
    public Promise<ActionResponse, ResourceException> handleAction(
            Context context, String topic, ActionRequest request) {
        try {
            String action = request.getAction();
            if (topic == null) {
                return new BadRequestException(format("Topic is required for action %s", action)).asPromise();
            }
            if (action.equals(ROTATE_FILE_ACTION_NAME)) {
                return handleRotateAction(topic).asPromise();
            }
            final String error = format("This action is unknown for the CSV handler: %s", action);
            return new BadRequestException(error).asPromise();
        } catch (BadRequestException e) {
            return e.asPromise();
        }
    }

    private ActionResponse handleRotateAction(String topic)
            throws BadRequestException {
        CsvWriter csvWriter = writers.get(topic);
        if (csvWriter == null) {
            LOGGER.debug("Unable to rotate file for topic: {}", topic);
            throw new BadRequestException("Unable to rotate file for topic: " + topic);
        }
        if (configuration.getFileRotation().isRotationEnabled()) {
            try {
                if (!csvWriter.forceRotation()) {
                    throw new BadRequestException("Unable to rotate file for topic: " + topic);
                }
            } catch (IOException e) {
                throw new BadRequestException("Error when rotating file for topic: " + topic, e);
            }
        } else {
            // rotation is not enabled in the configuration; perform a timestamp-based rename instead
            resetAndReopenWriter(topic, true);
        }
        return Responses.newActionResponse(json(object(field("rotated", "true"))));
    }

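    /** Resolves the log file for a topic: {@code <logDirectory>/[tamper-evident-]<topic>.csv}. */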
    private File getAuditLogFile(final String type) {
        final String prefix = configuration.getSecurity().isEnabled() ? SECURE_CSV_FILENAME_PREFIX : "";
        return new File(configuration.getLogDirectory(), prefix + type + ".csv");
    }

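    /** Flattens the audit event into a map of dot-notation column name to string value before delegating to the CSV writer. */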
    private void writeEntry(final String topic, final CsvWriter csvWriter, final JsonValue obj) throws IOException {
        Set<String> fieldOrder = fieldOrderByTopic.get(topic);
        Map<String, String> cells = new HashMap<>(fieldOrder.size());
        for (Map.Entry<String, JsonPointer> columnKey : jsonPointerByField.entrySet()) {
            cells.put(fieldDotNotationByField.get(columnKey.getKey()),
                    JsonValueUtils.extractValueAsString(obj, columnKey.getValue()));
        }
        csvWriter.writeEvent(cells);
    }

    private synchronized CsvWriter resetAndReopenWriter(final String topic, boolean forceRotation)
            throws BadRequestException {
        closeWriter(topic);
        try {
            File auditLogFile = getAuditLogFile(topic);
            if (forceRotation) {
                TimeStampFileNamingPolicy namingPolicy = new TimeStampFileNamingPolicy(auditLogFile, null, null);
                File rotatedFile = namingPolicy.getNextName();
                if (!auditLogFile.renameTo(rotatedFile)) {
                    throw new BadRequestException(
                            format("Unable to rename file %s to %s when rotating", auditLogFile, rotatedFile));
                }
            }
            return openWriter(topic, auditLogFile);
        } catch (IOException e) {
            throw new BadRequestException(e);
        }
    }

    private synchronized void closeWriter(final String topic) {
        CsvWriter writerToClose = writers.remove(topic);
        if (writerToClose != null) {
            // attempt clean-up close
            try {
                writerToClose.close();
            } catch (Exception ex) {
                // Debug level as the writer is expected to potentially be invalid
                LOGGER.debug("File writer close in closeWriter reported failure", ex);
            }
        }
    }

    /**
     * Parses the CSV file corresponding to the specified audit entry type and returns the set of matching
     * audit entries.
     *
     * @param auditEntryType the audit log type
     * @param queryFilter the query filter to apply to the entries
     * @return The set of matching audit log entries; empty if the log file does not exist or nothing matches.
     * @throws IOException If unable to read an entry from the CSV file.
     */
    private Set<JsonValue> getEntries(final String auditEntryType, QueryFilter<JsonPointer> queryFilter)
            throws IOException {
        final File auditFile = getAuditLogFile(auditEntryType);
        final Set<JsonValue> results = new HashSet<>();
        if (queryFilter == null) {
            queryFilter = QueryFilter.alwaysTrue();
        }
        if (auditFile.exists()) {
            try (ICsvMapReader reader = createCsvMapReader(auditFile)) {
                // the header elements are used as keys when mapping each row's values (names must match the schema)
                final String[] header = convertDotNotationToSlashes(reader.getHeader(true));
                final CellProcessor[] processors = createCellProcessors(auditEntryType, header);
                Map<String, Object> entry;
                while ((entry = reader.read(header, processors)) != null) {
                    entry = convertDotNotationToSlashes(entry);
                    final JsonValue jsonEntry = expand(entry);
                    if (queryFilter.accept(JSONVALUE_FILTER_VISITOR, jsonEntry)) {
                        results.add(jsonEntry);
                    }
                }

            }
        }
        return results;
    }

    private CellProcessor[] createCellProcessors(final String auditEntryType, final String[] headers)
            throws ResourceException {
        final List<CellProcessor> cellProcessors = new ArrayList<>();
        final JsonValue auditEvent = eventTopicsMetaData.getSchema(auditEntryType);

        for (String header: headers) {
            final String propertyType = getPropertyType(auditEvent, new JsonPointer(header));
            if ((propertyType.equals(OBJECT_TYPE) || propertyType.equals(ARRAY_TYPE))) {
                cellProcessors.add(new Optional(new ParseJsonValue()));
            } else {
                cellProcessors.add(new Optional());
            }
        }

        return cellProcessors.toArray(new CellProcessor[cellProcessors.size()]);
    }

    /**
     * CellProcessor for parsing JSON object and array values from CSV cells.
     */
    public class ParseJsonValue implements CellProcessor {

        @Override
        public Object execute(final Object value, final CsvContext context) {
            JsonValue jv = null;
            // Check if value is JSON object
            if (((String) value).startsWith("{") && ((String) value).endsWith("}")) {
                try {
                    jv = new JsonValue(MAPPER.readValue((String) value, Map.class));
                } catch (Exception e) {
                    LOGGER.debug("Error parsing JSON string: {}", e.getMessage());
                }
            } else if (((String) value).startsWith("[") && ((String) value).endsWith("]")) {
                try {
                    jv = new JsonValue(MAPPER.readValue((String) value, List.class));
                } catch (Exception e) {
                    LOGGER.debug("Error parsing JSON string: {}", e.getMessage());
                }
            }
            if (jv == null) {
                return value;
            }
            return jv.getObject();
        }

    }

    private synchronized void cleanup() throws ResourceException {
        try {
            for (CsvWriter csvWriter : writers.values()) {
                if (csvWriter != null) {
                    csvWriter.flush();
                    csvWriter.close();
                }
            }
        } catch (IOException e) {
            LOGGER.error("Unable to close file writers during {} cleanup", this.getClass().getName(), e);
            throw new InternalServerErrorException(
                    "Unable to close file writers during " + this.getClass().getName() + " cleanup", e);
        }
    }

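    /** Converts map keys from dot notation back to JSON pointer form, for example {@code "response.status"} to {@code "/response/status"}. */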
    private Map<String, Object> convertDotNotationToSlashes(final Map<String, Object> entries) {
        final Map<String, Object> newEntry = new LinkedHashMap<>();
        for (Map.Entry<String, Object> entry : entries.entrySet()) {
            final String key = dotNotationToJsonPointer(entry.getKey());
            newEntry.put(key, entry.getValue());
        }
        return newEntry;
    }

    private String[] convertDotNotationToSlashes(final String[] entries) {
        String[] result = new String[entries.length];
        for (int i = 0; i < entries.length; i++) {
            result[i] = dotNotationToJsonPointer(entries[i]);
        }
        return result;
    }

}