Skip to content
Open
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.avro;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
Expand Down Expand Up @@ -58,6 +59,20 @@
private static int maxCollectionLength = MAX_ARRAY_VM_LIMIT;
private static int maxStringLength = MAX_ARRAY_VM_LIMIT;

/**
* System property declaring max size of any decompression stream: {@value}.
*/
public static final String MAX_DECOMPRESS_LENGTH_PROPERTY = "org.apache.avro.limits.decompress.maxLength";

/**
* Default limit: {@value}.
*/
private static final long DEFAULT_MAX_DECOMPRESS_LENGTH = 200L * 1024 * 1024;
private static final Logger LOG = LoggerFactory.getLogger(SystemLimitException.class);

public static final long MAX_DECOMPRESS_LENGTH = getLongLimitFromProperty(MAX_DECOMPRESS_LENGTH_PROPERTY,
DEFAULT_MAX_DECOMPRESS_LENGTH);

static {
resetLimits();
}
Expand Down Expand Up @@ -89,6 +104,35 @@
return i;
}

/**
* Get a long value stored in a system property, used to configure the system
* behaviour of output.
*
* @param property The system property to fetch
* @param defaultValue The value to use if the system property is not present or
* parsable as an int
* @return The value from the system property
*/
private static long getLongLimitFromProperty(String property, long defaultValue) {

Check notice

Code scanning / CodeQL

Useless parameter Note

The parameter 'property' is never used.

Check notice

Code scanning / CodeQL

Useless parameter Note

The parameter 'defaultValue' is never used.
Comment thread
github-advanced-security[bot] marked this conversation as resolved.
Fixed
Comment thread
github-advanced-security[bot] marked this conversation as resolved.
Fixed
String prop = System.getProperty(MAX_DECOMPRESS_LENGTH_PROPERTY);
long limit = DEFAULT_MAX_DECOMPRESS_LENGTH;
if (prop != null) {
try {
long parsed = Long.parseLong(prop);
if (parsed <= 0) {
LOG.warn("Invalid value '{}' for property '{}': must be positive. Using default: {}", prop,
MAX_DECOMPRESS_LENGTH_PROPERTY, DEFAULT_MAX_DECOMPRESS_LENGTH);
} else {
limit = parsed;
}
} catch (NumberFormatException e) {
LOG.warn("Could not parse property '{}' value '{}'. Using default: {}", MAX_DECOMPRESS_LENGTH_PROPERTY, prop,
DEFAULT_MAX_DECOMPRESS_LENGTH);
}
}
return limit;
}

/**
* Check to ensure that reading the bytes is within the specified limits.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,21 +21,47 @@
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;

import org.apache.avro.SystemLimitException;

/**
* Utility to make data written to an {@link ByteArrayOutputStream} directly
* available as a {@link ByteBuffer}.
* available as a {@link ByteBuffer}. Optional limit to amount of data which may
* be written.
* <p>
* The amount of data which can be written to any output stream is limited by
* the system property
* {@link SystemLimitException#MAX_DECOMPRESS_LENGTH_PROPERTY}
*
*/
public class NonCopyingByteArrayOutputStream extends ByteArrayOutputStream {

/**
* Size limit, -1 for no limits.
*/
private final long limit;

/**
* Creates a new byte array output stream, with a buffer capacity of the
* specified size, in bytes.
* specified size, in bytes, size limited by
* {@link SystemLimitException#MAX_DECOMPRESS_LENGTH}
*
* @param size the initial size
* @throws IllegalArgumentException if size is negative
*/
public NonCopyingByteArrayOutputStream(int size) {
this(size, SystemLimitException.MAX_DECOMPRESS_LENGTH);
}

/**
* Creates a new byte array output stream, with a buffer capacity of the
* specified size, in bytes, capacity limit as specified in {@code limit}.
*
* @param size buffer capacity
* @param limit size limit or -1 for no limit.
*/
NonCopyingByteArrayOutputStream(final int size, final long limit) {
super(size);
this.limit = limit;
}

/**
Expand All @@ -48,4 +74,36 @@ public NonCopyingByteArrayOutputStream(int size) {
public ByteBuffer asByteBuffer() {
return ByteBuffer.wrap(super.buf, 0, super.count);
}

/**
* Check there is capacity to write data.
*
* @param bytes bytes to add
* @throws SystemLimitException if the limit is exceeded.
*/
private void checkCapacity(int bytes) {
if (limit >= 0 && (size() + bytes) > limit) {
throw new SystemLimitException(
String.format("Buffer size %,d (bytes) exceeds maximum allowed size %,d.", (size() + bytes), limit));
}
}

@Override
public synchronized void write(final int b) {
checkCapacity(1);
super.write(b);
}

@Override
public synchronized void write(final byte[] b, final int off, final int len) {
checkCapacity(len);
super.write(b, off, len);
}

@Override
public void writeBytes(final byte[] b) {
checkCapacity(b.length);
super.writeBytes(b);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.avro.util;

import java.io.IOException;
import java.nio.ByteBuffer;

import org.junit.jupiter.api.Test;

import org.apache.avro.SystemLimitException;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;

public class NonCopyingByteArrayOutputStreamTest {

/**
* Basic test: write then read.
*/
@Test
public void testDefaultWriteWorks() throws IOException {
NonCopyingByteArrayOutputStream out = new NonCopyingByteArrayOutputStream(1);
out.write('a');
final byte[] b = "string".getBytes();
out.write(b, 0, b.length);
out.close();
final ByteBuffer buffer = out.asByteBuffer();
assertEquals('a', buffer.get());
for (byte value : b) {
assertEquals(value, buffer.get());
}
}

/**
* Test write limiting.
*/
@Test
public void testLimitedWrite() throws IOException {
NonCopyingByteArrayOutputStream out = new NonCopyingByteArrayOutputStream(1, 4);
out.write('a');
// it's impossible to go over the limit in a write(bytes) call.
final byte[] b = "longstring".getBytes();
assertThrows(SystemLimitException.class, () -> out.write(b),
"Buffer size 11 (bytes) exceeds maximum allowed size 4.");
// we can still write up to the limit...the buffer has not been written to yet.
out.write(b, 0, 2);
out.write('z');
// now at end of file, so another write shall fail.
assertThrows(SystemLimitException.class, () -> out.write('x'));
out.close();
// validate everything successfully written is there
final ByteBuffer buffer = out.asByteBuffer();
for (byte value : "aloz".getBytes()) {
assertEquals(value, buffer.get());
}
}
}
Loading