Skip to content
Open
4 changes: 4 additions & 0 deletions core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,10 @@
<artifactId>auto-service</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>com.google.code.findbugs</groupId>
<artifactId>jsr305</artifactId>
</dependency>
<dependency>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,7 @@ public ClientSideIteratorScanner(final Scanner scanner) {
this.range = scanner.getRange();
this.size = scanner.getBatchSize();
this.retryTimeout = scanner.getTimeout(MILLISECONDS);
// TODO Is this intended to be getBatchTimeout()?
Comment thread
kevinrr888 marked this conversation as resolved.
Outdated
this.batchTimeout = scanner.getTimeout(MILLISECONDS);
this.readaheadThreshold = scanner.getReadaheadThreshold();
SamplerConfiguration samplerConfig = scanner.getSamplerConfiguration();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@
import java.util.Set;
import java.util.SortedSet;

import javax.annotation.concurrent.NotThreadSafe;

import org.apache.accumulo.core.client.IteratorSetting;
import org.apache.accumulo.core.client.Scanner;
import org.apache.accumulo.core.client.TableNotFoundException;
Expand Down Expand Up @@ -76,6 +78,7 @@

import com.google.common.base.Preconditions;

@NotThreadSafe
class RFileScanner extends ScannerOptions implements Scanner {
Comment on lines +82 to 83
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Scanners are not thread safe objects


private static class RFileScannerEnvironmentImpl extends ClientServiceEnvironmentImpl {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@
import java.util.TreeSet;
import java.util.concurrent.TimeUnit;

import javax.annotation.concurrent.NotThreadSafe;

import org.apache.accumulo.core.client.IteratorSetting;
import org.apache.accumulo.core.client.ScannerBase;
import org.apache.accumulo.core.client.sample.SamplerConfiguration;
Expand All @@ -46,6 +48,7 @@
import org.apache.accumulo.core.util.TextUtil;
import org.apache.hadoop.io.Text;

@NotThreadSafe
public class ScannerOptions implements ScannerBase {
Comment on lines +51 to 52
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Scanners are not thread safe


protected List<IterInfo> serverSideIteratorList = Collections.emptyList();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@
import java.util.function.Supplier;
import java.util.stream.Collectors;

import javax.annotation.concurrent.NotThreadSafe;

import org.apache.accumulo.core.client.AccumuloException;
import org.apache.accumulo.core.client.AccumuloSecurityException;
import org.apache.accumulo.core.client.SampleNotPresentException;
Expand Down Expand Up @@ -355,6 +357,7 @@ private String getTableInfo() {
return context.getPrintableTableInfoFromId(tableId);
}

@NotThreadSafe
private class QueryTask implements Runnable {
Comment on lines +360 to 361
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

QueryTask is not used as/expected to be a thread-safe object


private final String tsLocation;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@
import java.util.function.LongSupplier;
import java.util.function.Supplier;

import javax.annotation.concurrent.NotThreadSafe;

import org.apache.accumulo.core.rpc.ThriftUtil;
import org.apache.accumulo.core.rpc.clients.ThriftClientTypes;
import org.apache.accumulo.core.util.HostAndPort;
Expand Down Expand Up @@ -577,6 +579,7 @@ public TransportPoolShutdownException(String msg) {
private static final long serialVersionUID = 1L;
}

@NotThreadSafe
private static class CachedTTransport extends TTransport {
Comment on lines +582 to 583
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

CachedTTransport is only modified by a single thread at a time (must be reserved()/unreserved())


private final ThriftTransportKey cacheKey;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,13 @@
import java.io.IOException;
import java.io.InputStream;

import javax.annotation.concurrent.NotThreadSafe;

/**
* Reader corresponding to BlockedOutputStream. Expects all data to be in the form of size (int)
* data (size bytes) junk (however many bytes it takes to complete a block)
*/
@NotThreadSafe
public class BlockedInputStream extends InputStream {
Comment on lines +32 to 33
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

BlockedInputStream is a wrapper for DataInputStream, which is not thread safe

byte[] array;
// ReadPos is where to start reading
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import java.util.Map.Entry;
import java.util.SortedMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;

Expand Down Expand Up @@ -111,7 +112,7 @@ static class ReadLock implements Lock {

QueueLock qlock;
byte[] userData;
long entry = -1;
AtomicLong entry;
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This class is expected to be thread safe

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Changes may not be needed to this class. I think it is only used by a single thread, but not 100% sure. Did some sampling and found this usually created as a local var in a function to try to obtain a lock in zookeeper, but the local var is not shared among threads.


ReadLock(QueueLock qlock, byte[] userData) {
this.qlock = qlock;
Expand All @@ -122,7 +123,7 @@ static class ReadLock implements Lock {
ReadLock(QueueLock qlock, byte[] userData, long entry) {
this.qlock = qlock;
this.userData = userData;
this.entry = entry;
this.entry = new AtomicLong(entry);
}

protected LockType lockType() {
Expand Down Expand Up @@ -154,22 +155,27 @@ public void lockInterruptibly() throws InterruptedException {

@Override
public boolean tryLock() {
if (entry == -1) {
entry = qlock.addEntry(new ParsedLock(this.lockType(), this.userData).getLockData());
log.info("Added lock entry {} userData {} lockType {}", entry,
new String(this.userData, UTF_8), lockType());
}
SortedMap<Long,byte[]> entries = qlock.getEarlierEntries(entry);
var entryVal = entry.updateAndGet(val -> {
if (val == -1) {
var newVal = qlock.addEntry(new ParsedLock(this.lockType(), this.userData).getLockData());
Comment thread
kevinrr888 marked this conversation as resolved.
Outdated
log.info("Added lock entry {} userData {} lockType {}", newVal,
new String(this.userData, UTF_8), lockType());
return newVal;
} else {
return val;
}
});
SortedMap<Long,byte[]> entries = qlock.getEarlierEntries(entryVal);
for (Entry<Long,byte[]> entry : entries.entrySet()) {
ParsedLock parsed = new ParsedLock(entry.getValue());
if (entry.getKey().equals(this.entry)) {
if (entry.getKey().equals(entryVal)) {
return true;
}
if (parsed.type == LockType.WRITE) {
return false;
}
}
throw new IllegalStateException("Did not find our own lock in the queue: " + this.entry
throw new IllegalStateException("Did not find our own lock in the queue: " + entryVal
+ " userData " + new String(this.userData, UTF_8) + " lockType " + lockType());
}

Expand All @@ -190,13 +196,14 @@ public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {

@Override
public void unlock() {
if (entry == -1) {
return;
}
log.debug("Removing lock entry {} userData {} lockType {}", entry,
new String(this.userData, UTF_8), lockType());
qlock.removeEntry(entry);
entry = -1;
entry.updateAndGet(val -> {
if (val != -1) {
log.debug("Removing lock entry {} userData {} lockType {}", val,
new String(this.userData, UTF_8), lockType());
qlock.removeEntry(val);
}
return -1;
});
}

@Override
Expand All @@ -222,18 +229,23 @@ protected LockType lockType() {

@Override
public boolean tryLock() {
if (entry == -1) {
entry = qlock.addEntry(new ParsedLock(this.lockType(), this.userData).getLockData());
log.info("Added lock entry {} userData {} lockType {}", entry,
new String(this.userData, UTF_8), lockType());
}
SortedMap<Long,byte[]> entries = qlock.getEarlierEntries(entry);
var entryVal = entry.updateAndGet(val -> {
if (val == -1) {
var newVal = qlock.addEntry(new ParsedLock(this.lockType(), this.userData).getLockData());
log.info("Added lock entry {} userData {} lockType {}", newVal,
new String(this.userData, UTF_8), lockType());
return newVal;
} else {
return val;
}
});
SortedMap<Long,byte[]> entries = qlock.getEarlierEntries(entryVal);
Iterator<Entry<Long,byte[]>> iterator = entries.entrySet().iterator();
if (!iterator.hasNext()) {
throw new IllegalStateException("Did not find our own lock in the queue: " + this.entry
throw new IllegalStateException("Did not find our own lock in the queue: " + entryVal
+ " userData " + new String(this.userData, UTF_8) + " lockType " + lockType());
}
return iterator.next().getKey().equals(entry);
return iterator.next().getKey().equals(entryVal);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;

import javax.annotation.concurrent.NotThreadSafe;

import org.apache.accumulo.core.bloomfilter.DynamicBloomFilter;
import org.apache.accumulo.core.classloader.ClassLoaderUtil;
import org.apache.accumulo.core.conf.AccumuloConfiguration;
Expand Down Expand Up @@ -346,6 +348,7 @@ public void close() {
}
}

@NotThreadSafe
public static class Reader implements FileSKVIterator {
Comment on lines +351 to 352
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

accumulo iterators are not expected to be thread safe


private final BloomFilterLoader bfl;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,15 @@
import java.io.IOException;
import java.io.InputStream;

import javax.annotation.concurrent.NotThreadSafe;

import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;

/**
* This class is like byte array input stream with two differences. It supports seeking and avoids
* synchronization.
*/
@NotThreadSafe
public class SeekableByteArrayInputStream extends InputStream {
Comment on lines 30 to 35
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This class is clearly not meant to be thread safe: "It ... avoids synchronization."


// making this volatile for the following case
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;

import javax.annotation.concurrent.NotThreadSafe;

import org.apache.accumulo.core.data.ByteSequence;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Range;
Expand All @@ -43,6 +45,7 @@
public class MapFileOperations extends FileOperations {
private static final String MSG = "Map files are not supported";

@NotThreadSafe
public static class RangeIterator implements FileSKVIterator {
Comment on lines +48 to 49
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

accumulo iterators are not expected to be thread safe


SortedKeyValueIterator<Key,Value> reader;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@
import java.util.TreeMap;
import java.util.concurrent.atomic.AtomicBoolean;

import javax.annotation.concurrent.NotThreadSafe;

import org.apache.accumulo.core.client.SampleNotPresentException;
import org.apache.accumulo.core.client.sample.Sampler;
import org.apache.accumulo.core.client.sample.SamplerConfiguration;
Expand Down Expand Up @@ -557,6 +559,7 @@ public void close() throws IOException {
}
}

@NotThreadSafe
public static class Writer implements FileSKVWriter {
Comment on lines +562 to 563
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

impl clearly expects single threaded access (no volatiles or sync (other than close))


public static final int MAX_CF_IN_DLG = 1000;
Expand Down Expand Up @@ -754,6 +757,7 @@ public long getLength() {
}
}

@NotThreadSafe
private static class LocalityGroupReader extends LocalityGroup implements FileSKVIterator {
Comment on lines +760 to 761
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

accumulo iterators are not expected to be thread safe


private final CachableBlockFile.Reader reader;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,13 @@
import java.io.IOException;
import java.io.OutputStream;

import javax.annotation.concurrent.NotThreadSafe;

/**
* A simplified BufferedOutputStream with borrowed buffer, and allow users to see how much data have
* been buffered.
*/
@NotThreadSafe
class SimpleBufferedOutputStream extends FilterOutputStream {
Comment on lines +31 to 32
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The fact that this is a stream and the the impl itself show this is not a thread safe class

protected byte[] buf; // the borrowed buffer
protected int count = 0; // bytes used in buffer.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,16 @@
import java.io.IOException;
import java.io.InputStream;

import javax.annotation.concurrent.NotThreadSafe;

import org.apache.hadoop.fs.Seekable;

/**
* BoundedRangeFIleInputStream abstracts a contiguous region of a Hadoop FSDataInputStream as a
* regular input stream. One can create multiple BoundedRangeFileInputStream on top of the same
* FSDataInputStream and they would not interfere with each other.
*/
@NotThreadSafe
public class BoundedRangeFileInputStream extends InputStream {
Comment on lines 28 to 34
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Desc and impl show multiple threads can have their own BoundedRangeFileInputStream for same underlying data, but not that they can share same BoundedRangeFileInputStream.


private volatile boolean closed = false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
import java.util.TreeSet;
import java.util.concurrent.atomic.AtomicBoolean;

import javax.annotation.concurrent.NotThreadSafe;

import org.apache.accumulo.core.data.ByteSequence;
import org.apache.accumulo.core.data.Column;
import org.apache.accumulo.core.data.Key;
Expand All @@ -36,6 +38,7 @@
import org.apache.accumulo.core.iterators.ServerSkippingIterator;
import org.apache.accumulo.core.iterators.SortedKeyValueIterator;

@NotThreadSafe
public class ColumnFamilySkippingIterator extends ServerSkippingIterator
implements InterruptibleIterator {
Comment on lines +41 to 43
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

accumulo iterators are not expected to be thread safe


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.LongAdder;

import javax.annotation.concurrent.NotThreadSafe;

import org.apache.accumulo.core.data.ByteSequence;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Range;
Expand All @@ -34,6 +36,7 @@
import org.apache.accumulo.core.iterators.ServerWrappingIterator;
import org.apache.accumulo.core.iterators.SortedKeyValueIterator;

@NotThreadSafe
public class StatsIterator extends ServerWrappingIterator {
Comment on lines +39 to 40
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

accumulo iterators are not expected to be thread safe. The counters here are shared, but those are safely atomic. Internal state "numRead" is not and is not expected to be. deepCopy() shows intended use.


private int numRead = 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

import javax.annotation.concurrent.NotThreadSafe;

import org.apache.accumulo.core.client.AccumuloClient;
import org.apache.accumulo.core.client.BatchScanner;
import org.apache.accumulo.core.client.IsolatedScanner;
Expand Down Expand Up @@ -89,6 +91,7 @@
*/
public class TabletsMetadata implements Iterable<TabletMetadata>, AutoCloseable {

@NotThreadSafe
public static class Builder implements TableRangeOptions, TableOptions, RangeOptions, Options {
Comment on lines +94 to 95
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

impl clearly shows that this is not intended to be thread safe (no volatile, no synchronization)


private final List<Text> families = new ArrayList<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;

import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;

/**
* This class automates management of static singletons that maintain state for Accumulo clients.
* Historically, Accumulo client code that used Connector had no control over these singletons. The
Expand Down Expand Up @@ -81,6 +83,8 @@ public enum Mode {
private static List<SingletonService> services;

@VisibleForTesting
@SuppressFBWarnings(value = "AT_NONATOMIC_64BIT_PRIMITIVE",
justification = "only called in static init block and testing, no sync needed")
static void reset() {
reservations = 0;
mode = Mode.CLIENT;
Expand Down
Loading
Loading