PipeBufferedStream.java

/*
 * The contents of this file are subject to the terms of the Common Development and
 * Distribution License (the License). You may not use this file except in compliance with the
 * License.
 *
 * You can obtain a copy of the License at legal/CDDLv1.0.txt. See the License for the
 * specific language governing permission and limitations under the License.
 *
 * When distributing Covered Software, include this CDDL Header Notice in each file and include
 * the License file at legal/CDDLv1.0.txt. If applicable, add the following below the CDDL
 * Header, with the fields enclosed by brackets [] replaced by your own identifying
 * information: "Portions Copyright [year] [name of copyright owner]".
 *
 * Portions Copyright 2014-2016 ForgeRock AS.
 */

package org.forgerock.http.io;

import static org.forgerock.http.io.IO.newBranchingInputStream;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.concurrent.atomic.AtomicInteger;

import org.forgerock.util.Factory;

/**
 * Represents a pipe for transferring bytes from an {@link java.io.OutputStream} to a
 * {@link org.forgerock.http.io.BranchingInputStream}.
 * This class is not thread-safe : the buffer has to be fully filled before reading from it : if the consumers reads
 * faster than the producer writes into it, then the consumer will get to the end of the buffer and that will be
 * interpreted an end-of-stream.
 */
public final class PipeBufferedStream {
    private final OutputStream outputStream;
    private final BranchingInputStream inputStream;
    /** The buffer will be closed once both the input and output stream are closed. */
    private final AtomicInteger bufferRefCount = new AtomicInteger(2);
    private final Buffer buffer;
    private int position = 0;

    /**
     * Constructs a new {@link PipeBufferedStream} with a default {@link Factory}.
     */
    public PipeBufferedStream() {
        this(IO.newTemporaryStorage());
    }

    /**
     * Constructs a new {@link PipeBufferedStream} with the given {@link Factory}.
     *
     * @param bufferFactory The buffer factory to use to create the {@link BranchingInputStream}
     */
    public PipeBufferedStream(final Factory<Buffer> bufferFactory) {
        outputStream = new PipeOutputStream();
        inputStream = newBranchingInputStream(new PipeInputStream(), bufferFactory);
        this.buffer = bufferFactory.newInstance();
    }

    /**
     * Returns the output stream which writes to the pipe.
     *
     * @return The output stream.
     */
    public OutputStream getIn() {
        return outputStream;
    }

    /**
     * Returns the input stream which reads from the pipe.
     *
     * @return The input stream.
     */
    public BranchingInputStream getOut() {
        return inputStream;
    }

    private void closeBufferIfNeeded() throws IOException {
        if (bufferRefCount.decrementAndGet() == 0) {
            buffer.close();
        }
    }

    private class PipeOutputStream extends OutputStream {
        @Override
        public void write(final int i) throws IOException {
            buffer.append((byte) i);
        }

        @Override
        public void write(final byte[] b, final int off, final int len) throws IOException {
            buffer.append(b, off, len);
        }

        @Override
        public void close() throws IOException {
            closeBufferIfNeeded();
        }
    }

    private class PipeInputStream extends InputStream {
        @Override
        public int read() throws IOException {
            return position < buffer.length() ? buffer.read(position++) : -1;
        }

        @Override
        public int read(final byte[] b, final int off, final int len) throws IOException {
            if (position < buffer.length()) {
                final int readLength = buffer.read(position, b, off, len);
                position += readLength;
                return readLength;
            }
            return -1;
        }

        @Override
        public void close() throws IOException {
            closeBufferIfNeeded();
        }
    }
}