IO.java
/*
* The contents of this file are subject to the terms of the Common Development and
* Distribution License (the License). You may not use this file except in compliance with the
* License.
*
* You can obtain a copy of the License at legal/CDDLv1.0.txt. See the License for the
* specific language governing permission and limitations under the License.
*
* When distributing Covered Software, include this CDDL Header Notice in each file and include
* the License file at legal/CDDLv1.0.txt. If applicable, add the following below the CDDL
* Header, with the fields enclosed by brackets [] replaced by your own identifying
* information: "Portions Copyright [year] [name of copyright owner]".
*
* Copyright 2009 Sun Microsystems Inc.
* Portions Copyright 2010–2011 ApexIdentity Inc.
* Portions Copyright 2011-2016 ForgeRock AS.
*/
package org.forgerock.http.io;
import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.Reader;
import java.io.Writer;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import org.forgerock.util.Factory;
/**
 * Utility class that can stream to and from streams.
 * <p>
 * It offers factory methods for branching input streams and for memory, file and temporary
 * buffers, shared "null" streams, and {@code stream(...)} helpers that copy data between
 * streams (or between a reader and a writer) using buffers borrowed from internal pools.
 */
public final class IO {

    /** Default initial length of the memory part of a temporary buffer: 8 KiB. */
    public static final int DEFAULT_TMP_INIT_LENGTH = 8 * 1_024;

    /** Default length limit of the memory part of a temporary buffer: 64 KiB. */
    public static final int DEFAULT_TMP_MEMORY_LIMIT = 64 * 1_024;

    /** Default length limit of the file part of a temporary buffer: 1 GiB. */
    public static final int DEFAULT_TMP_FILE_LIMIT = 1 * 1_024 * 1_024 * 1_024;

    /** Size of buffer to use during streaming (number of elements in each pooled array). */
    private static final int BUF_SIZE = 8 * 1_024;

    /** Shared input stream backed by an empty byte array; it never yields any data. */
    private static final InputStream NULL_INPUT_STREAM = new ByteArrayInputStream(new byte[0]);

    /** Shared output stream that discards everything written to it. */
    private static final OutputStream NULL_OUTPUT_STREAM = new OutputStream() {
        @Override
        public void write(final int b) throws IOException {
            // goes nowhere, does nothing
        }

        @Override
        public void write(final byte[] b, final int off, final int len) throws IOException {
            // Discard the whole range in one call instead of inheriting the per-byte loop of
            // OutputStream, while keeping the argument validation of the inherited method.
            if (b == null) {
                throw new NullPointerException();
            }
            if (off < 0 || len < 0 || len > b.length - off) {
                throw new IndexOutOfBoundsException();
            }
        }
    };

    /** Initial size of pre-allocated buffer pools. */
    private static final int BUF_POOL_INITIAL_SIZE = 32;

    /**
     * Pool of pre-allocated {@code byte[]} buffers, which will grow in size up to the maximum concurrent threads
     * that call this class, with {@link #BUF_SIZE} amount of memory allocated for each.
     */
    private static final Queue<byte[]> BYTE_BUF_POOL;

    /**
     * Pool of pre-allocated {@code char[]} buffers, which will grow in size up to the maximum concurrent threads
     * that call this class, with {@link #BUF_SIZE} amount of memory allocated for each.
     */
    private static final Queue<char[]> CHAR_BUF_POOL;

    static {
        // Pre-fill both pools so that the first callers do not have to allocate.
        BYTE_BUF_POOL = new ConcurrentLinkedQueue<>();
        CHAR_BUF_POOL = new ConcurrentLinkedQueue<>();
        for (int i = 0; i < BUF_POOL_INITIAL_SIZE; ++i) {
            BYTE_BUF_POOL.add(new byte[BUF_SIZE]);
            CHAR_BUF_POOL.add(new char[BUF_SIZE]);
        }
    }

    /**
     * Creates a new branching input stream that wraps a byte array.
     *
     * @param bytes
     *            byte array to wrap with the branching input stream.
     * @return The branching input stream.
     */
    public static BranchingInputStream newBranchingInputStream(final byte[] bytes) {
        return new ByteArrayBranchingStream(bytes);
    }

    /**
     * Creates a new branching input stream to wrap another input stream. All
     * divergence between branches is maintained in a temporary buffer.
     * <p>
     * If the stream being wrapped is a branching input stream, this constructor
     * will simply branch off of that existing stream rather than wrapping it
     * with another branching input stream.
     * <p>
     * <strong>Note:</strong> This stream and any branches it creates are not
     * safe for use by multiple concurrent threads.
     *
     * @param in
     *            the stream to be wrapped.
     * @param bufferFactory
     *            an object that can create new temporary buffers (e.g.
     *            {@link #newTemporaryStorage()}).
     * @return The branching input stream.
     */
    public static BranchingInputStream newBranchingInputStream(final InputStream in,
            final Factory<Buffer> bufferFactory) {
        return new BranchingStreamWrapper(in, bufferFactory);
    }

    /**
     * Creates a new file buffer that uses a local file for data storage.
     * <p>
     * <strong>Note:</strong> The returned buffer is not synchronized. If
     * multiple threads access a buffer concurrently, threads that append to the
     * buffer should synchronize on the instance of this object.
     *
     * @param file
     *            the file to use as storage for the buffer.
     * @param limit
     *            the buffer length limit, after which an
     *            {@link OverflowException} will be thrown.
     * @return The file buffer.
     * @throws FileNotFoundException
     *             if the file cannot be created or opened for writing.
     * @throws SecurityException
     *             if a security manager denies access to the specified file.
     */
    public static Buffer newFileBuffer(final File file, final int limit)
            throws FileNotFoundException {
        return new FileBuffer(file, limit);
    }

    /**
     * Creates a new buffer that uses a byte array for data storage. The byte
     * array starts at a prescribed initial length, and grows exponentially up
     * to the prescribed limit.
     * <p>
     * <strong>Note:</strong> The returned buffer is not synchronized. If
     * multiple threads access a buffer concurrently, threads that append to the
     * buffer should synchronize on the instance of this object.
     *
     * @param initial
     *            the initial size of the byte array to create.
     * @param limit
     *            the buffer length limit, after which an
     *            {@link OverflowException} will be thrown.
     * @return The memory buffer.
     */
    public static Buffer newMemoryBuffer(final int initial, final int limit) {
        return new MemoryBuffer(initial, limit);
    }

    /**
     * Creates a new temporary buffer that first uses memory, then a temporary
     * file for data storage. Initially, a {@link #newMemoryBuffer(int, int)
     * memory} buffer is used; when the memory buffer limit is exceeded it
     * promotes to the use of a {@link #newFileBuffer(File, int) file} buffer.
     *
     * @param initialLength
     *            the initial length of memory buffer byte array.
     * @param memoryLimit
     *            the length limit of the memory buffer.
     * @param fileLimit
     *            the length limit of the file buffer.
     * @param directory
     *            the directory where temporary files are created, or
     *            {@code null} to use the system-dependent default temporary
     *            directory.
     * @return The temporary buffer.
     */
    public static Buffer newTemporaryBuffer(final int initialLength, final int memoryLimit,
            final int fileLimit, final File directory) {
        return new TemporaryBuffer(initialLength, memoryLimit, fileLimit, directory);
    }

    /**
     * Creates a new storage using the system dependent default temporary
     * directory and default sizes. Equivalent to calling
     * {@code newTemporaryStorage(null)}.
     *
     * @return The temporary storage.
     */
    public static Factory<Buffer> newTemporaryStorage() {
        return newTemporaryStorage(null);
    }

    /**
     * Builds a storage using the given directory (may be {@literal null}) and
     * default sizes. Equivalent to calling
     * {@code newTemporaryStorage(directory, DEFAULT_TMP_INIT_LENGTH,
     * DEFAULT_TMP_MEMORY_LIMIT, DEFAULT_TMP_FILE_LIMIT)}.
     *
     * @param directory
     *            The directory where temporary files are created. If
     *            {@code null}, then the system-dependent default temporary
     *            directory will be used.
     * @return The temporary storage.
     * @see #DEFAULT_TMP_INIT_LENGTH
     * @see #DEFAULT_TMP_MEMORY_LIMIT
     * @see #DEFAULT_TMP_FILE_LIMIT
     */
    public static Factory<Buffer> newTemporaryStorage(final File directory) {
        return newTemporaryStorage(directory, DEFAULT_TMP_INIT_LENGTH, DEFAULT_TMP_MEMORY_LIMIT,
                DEFAULT_TMP_FILE_LIMIT);
    }

    /**
     * Builds a storage using the given directory (may be {@literal null}) and
     * provided sizes. Each call to {@link Factory#newInstance()} on the returned
     * factory creates a fresh {@link #newTemporaryBuffer(int, int, int, File)
     * temporary buffer} with these settings.
     *
     * @param directory
     *            The directory where temporary files are created. If
     *            {@code null}, then the system-dependent default temporary
     *            directory will be used.
     * @param initialLength
     *            The initial length of memory buffer byte array.
     * @param memoryLimit
     *            The length limit of the memory buffer. Attempts to exceed this
     *            limit will result in promoting the buffer from a memory to a
     *            file buffer.
     * @param fileLimit
     *            The length limit of the file buffer. Attempts to exceed this
     *            limit will result in an {@link OverflowException} being
     *            thrown.
     * @return The temporary storage.
     */
    public static Factory<Buffer> newTemporaryStorage(final File directory,
            final int initialLength, final int memoryLimit, final int fileLimit) {
        return new Factory<Buffer>() {
            @Override
            public Buffer newInstance() {
                return newTemporaryBuffer(initialLength, memoryLimit, fileLimit, directory);
            }
        };
    }

    /**
     * Returns an input stream that holds no data.
     *
     * @return An input stream that holds no data.
     */
    public static InputStream nullInputStream() {
        return NULL_INPUT_STREAM;
    }

    /**
     * Returns an output stream that discards all data written to it.
     *
     * @return An output stream that discards all data written to it.
     */
    public static OutputStream nullOutputStream() {
        return NULL_OUTPUT_STREAM;
    }

    /**
     * Streams all data from an input stream to an output stream. Neither
     * stream is closed or flushed by this method.
     *
     * @param in
     *            the input stream to stream the data from.
     * @param out
     *            the output stream to stream the data to.
     * @throws IOException
     *             if an I/O exception occurs.
     */
    public static void stream(final InputStream in, final OutputStream out) throws IOException {
        final byte[] buf = acquireByteBuffer();
        try {
            int n;
            while ((n = in.read(buf, 0, BUF_SIZE)) != -1) {
                out.write(buf, 0, n);
            }
        } finally {
            // Always hand the buffer back, even when the copy fails.
            BYTE_BUF_POOL.add(buf);
        }
    }

    /**
     * Streams data from an input stream to an output stream, up to a specified
     * length. Streaming stops early if the end of the input stream is reached.
     * Neither stream is closed or flushed by this method.
     *
     * @param in
     *            the input stream to stream the data from.
     * @param out
     *            the output stream to stream the data to.
     * @param len
     *            the number of bytes to stream; if zero or negative, nothing is
     *            streamed.
     * @return the actual number of bytes streamed.
     * @throws IOException
     *             if an I/O exception occurs.
     */
    public static int stream(final InputStream in, final OutputStream out, final int len)
            throws IOException {
        final byte[] buf = acquireByteBuffer();
        try {
            int remaining = len;
            int n;
            // Never ask for more than what is left to copy, nor more than the buffer can hold.
            while (remaining > 0 && (n = in.read(buf, 0, Math.min(remaining, BUF_SIZE))) >= 0) {
                out.write(buf, 0, n);
                remaining -= n;
            }
            return len - remaining;
        } finally {
            // Always hand the buffer back, even when the copy fails.
            BYTE_BUF_POOL.add(buf);
        }
    }

    /**
     * Streams all characters from a reader to a writer. Neither the reader nor
     * the writer is closed or flushed by this method.
     *
     * @param in
     *            reader to stream the characters from.
     * @param out
     *            the writer to stream the characters to.
     * @throws IOException
     *             if an I/O exception occurs.
     */
    public static void stream(final Reader in, final Writer out) throws IOException {
        final char[] buf = acquireCharBuffer();
        try {
            int n;
            while ((n = in.read(buf, 0, BUF_SIZE)) != -1) {
                out.write(buf, 0, n);
            }
        } finally {
            // Always hand the buffer back, even when the copy fails.
            CHAR_BUF_POOL.add(buf);
        }
    }

    /** Takes a {@code byte[]} buffer from the pool, allocating a new one if the pool is empty. */
    private static byte[] acquireByteBuffer() {
        final byte[] pooled = BYTE_BUF_POOL.poll();
        return pooled != null ? pooled : new byte[BUF_SIZE];
    }

    /** Takes a {@code char[]} buffer from the pool, allocating a new one if the pool is empty. */
    private static char[] acquireCharBuffer() {
        final char[] pooled = CHAR_BUF_POOL.poll();
        return pooled != null ? pooled : new char[BUF_SIZE];
    }

    /** Static methods only. */
    private IO() {
    }
}