Skip to content
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions dev-support/checkstyle.xml
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,10 @@
</module>

<module name="SuppressWarningsFilter"/>
<!-- Suppress the FileLength check for RaftServerImpl.java only. -->
<!-- The "files" value is a regular expression matched against the file path, -->
<!-- so the dot is escaped and the name is anchored to a path separator and the end of the path. -->
<module name="SuppressionSingleFilter">
  <property name="checks" value="FileLength"/>
  <property name="files" value="[\\/]RaftServerImpl\.java$"/>
</module>
Comment thread
peterxcli marked this conversation as resolved.
Outdated

<!-- Checks that a package-info.java file exists for each package. -->
<!-- See http://checkstyle.sf.net/config_javadoc.html#JavadocPackage -->
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.apache.ratis.proto.RaftProtos.CommitInfoProto;
import org.apache.ratis.proto.RaftProtos.DataStreamPacketHeaderProto.Type;
import org.apache.ratis.proto.RaftProtos.RaftClientRequestProto;
import org.apache.ratis.proto.RaftProtos.RaftClientRequestProto.TypeCase;
import org.apache.ratis.protocol.ClientId;
import org.apache.ratis.protocol.ClientInvocationId;
import org.apache.ratis.protocol.DataStreamReply;
Expand All @@ -42,6 +43,7 @@
import org.apache.ratis.protocol.RaftPeer;
import org.apache.ratis.protocol.RaftPeerId;
import org.apache.ratis.protocol.RoutingTable;
import org.apache.ratis.protocol.exceptions.AlreadyClosedException;
import org.apache.ratis.protocol.exceptions.AlreadyExistsException;
import org.apache.ratis.protocol.exceptions.DataStreamException;
import org.apache.ratis.server.RaftConfiguration;
Expand All @@ -53,6 +55,7 @@
import org.apache.ratis.statemachine.StateMachine.DataChannel;
import org.apache.ratis.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.ratis.thirdparty.io.netty.buffer.ByteBuf;
import org.apache.ratis.thirdparty.io.netty.channel.ChannelFuture;
import org.apache.ratis.thirdparty.io.netty.channel.ChannelHandlerContext;
import org.apache.ratis.thirdparty.io.netty.channel.ChannelId;
import org.apache.ratis.util.ConcurrentUtils;
Expand All @@ -68,6 +71,7 @@
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.io.InterruptedIOException;
import java.nio.ByteBuffer;
import java.util.Collection;
import java.util.Collections;
Expand Down Expand Up @@ -352,9 +356,36 @@ static DataStreamReplyByteBuffer newDataStreamReplyByteBuffer(DataStreamRequestB
.setDataStreamPacket(request)
.setBuffer(buffer)
.setSuccess(reply.isSuccess())
.setCommitInfos(reply.getCommitInfos())
.build();
}

/**
 * Build a successful {@code STREAM_DATA} reply that carries one chunk of read-only response data.
 *
 * @param request the originating request; its client id and stream id are copied into the reply
 * @param streamOffset the stream offset recorded in the reply
 * @param buffer the chunk to send; the reply holds a read-only view of it
 * @return a reply whose bytes-written value is the number of bytes remaining in the chunk
 */
static DataStreamReplyByteBuffer newDataStreamReadOnlyReplyByteBuffer(DataStreamRequestByteBuf request,
    long streamOffset, ByteBuffer buffer) {
  // Wrap the chunk in a read-only view before handing it to the reply.
  final ByteBuffer chunk = buffer.asReadOnlyBuffer();
  final int chunkLength = chunk.remaining();
  return DataStreamReplyByteBuffer.newBuilder()
      .setClientId(request.getClientId())
      .setType(Type.STREAM_DATA)
      .setStreamId(request.getStreamId())
      .setStreamOffset(streamOffset)
      .setBuffer(chunk)
      .setSuccess(true)
      .setBytesWritten(chunkLength)
      .build();
}

/**
 * Write and flush the given reply, exposing the outcome of the channel write as a {@link CompletableFuture}.
 *
 * @param ctx the channel context used for writing
 * @param reply the reply to write
 * @return a future completed with {@code null} when the write succeeds,
 *         or completed exceptionally with the cause reported by the channel future otherwise
 */
private static CompletableFuture<Void> writeAndFlush(ChannelHandlerContext ctx, DataStreamReply reply) {
  final CompletableFuture<Void> written = new CompletableFuture<>();
  final ChannelFuture pending = ctx.writeAndFlush(reply);
  pending.addListener(done -> {
    if (!done.isSuccess()) {
      written.completeExceptionally(done.cause());
      return;
    }
    written.complete(null);
  });
  return written;
}
Comment thread
peterxcli marked this conversation as resolved.
Outdated

private void sendReply(List<CompletableFuture<DataStreamReply>> remoteWrites,
DataStreamRequestByteBuf request, long bytesWritten, Collection<CommitInfoProto> commitInfos,
ChannelHandlerContext ctx) {
Expand Down Expand Up @@ -450,6 +481,23 @@ private void readImpl(DataStreamRequestByteBuf request, ChannelHandlerContext ct
// add to ChannelMap
channels.add(channelId, key);

if (request.getType() == Type.STREAM_HEADER) {
final RaftClientRequest raftClientRequest = toRaftClientRequest(request);
if (raftClientRequest.is(TypeCase.READ)) {
submitReadOnlyRequest(request, raftClientRequest, ctx).whenComplete((v, exception) -> {
try {
if (exception != null) {
replyDataStreamException(server, exception, raftClientRequest, request, ctx);
}
} finally {
request.release();
channels.remove(channelId, key);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  • request can be released earlier -- we only need the clientId and streamId in the read stream.
  • channelId is not used. So, we don't need a ChannelMap in ReadStreamManagement.

}
});
return;
}
}
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The new read streams have nothing to do with the existing write streams. Let's do the check in NettyServerStreamRpc.

@@ -235,6 +237,9 @@ public class NettyServerStreamRpc implements DataStreamServerRpc {
 
         final DataStreamRequestByteBuf request = (DataStreamRequestByteBuf)msg;
         try(UncheckedAutoCloseable autoReset = requestRef.set(request)) {
+          if (reads.process(request, ctx)) {
+            return;
+          }
           requests.read(request, ctx, proxies.get(request)::getDataStreamOutput);
         }
       }


final StreamInfo info;
if (request.getType() == Type.STREAM_HEADER) {
final MemoizedSupplier<StreamInfo> supplier = JavaUtils.memoize(
Expand Down Expand Up @@ -510,6 +558,72 @@ private void readImpl(DataStreamRequestByteBuf request, ChannelHandlerContext ct
});
}

/**
 * Parse the payload of the given stream request as a {@link RaftClientRequestProto}
 * and convert it to a {@link RaftClientRequest}.
 *
 * @param request the stream request whose sliced content holds the serialized proto
 * @return the converted client request
 * @throws CompletionException wrapping any {@link Throwable} raised while parsing or converting
 */
private static RaftClientRequest toRaftClientRequest(DataStreamRequestByteBuf request) {
  try {
    final RaftClientRequestProto proto = RaftClientRequestProto.parseFrom(request.slice().nioBuffer());
    return ClientProtoUtils.toRaftClientRequest(proto);
  } catch (Throwable t) {
    throw new CompletionException(t);
  }
}

/**
 * Submit a read-only request to the server and stream the response data back on the given channel context.
 *
 * An anonymous {@link StateMachine.DataChannel} is passed to the server; each buffer written to it
 * is sent as a read-only data reply. When the future returned by the server completes,
 * the terminal reply is written and flushed.
 *
 * @param request the stream request; used to build the data replies and the terminal reply
 * @param raftClientRequest the client request submitted to the server
 * @param ctx the channel context used to write the replies
 * @return a future for the write of the terminal reply; an {@link IOException} thrown while
 *         submitting is passed to JavaUtils.completeExceptionally(..)
 *         (presumably yielding an exceptionally completed future -- confirm against JavaUtils)
 */
private CompletableFuture<Void> submitReadOnlyRequest(DataStreamRequestByteBuf request,
RaftClientRequest raftClientRequest, ChannelHandlerContext ctx) {
try {
final StateMachine.DataChannel readOnlyDataStream = new StateMachine.DataChannel() {
// The offset passed to the next data reply; advanced only after a successful write.
private long streamOffset;
// Set by close(); nothing else in this method sets it.
private boolean closed;

@Override
public synchronized boolean isOpen() {
return !closed;
}

@Override
public synchronized void close() {
closed = true;
}

// Flush the channel context; the metadata argument is not used.
@Override
public synchronized void force(boolean metadata) throws IOException {
if (!isOpen()) {
throw new AlreadyClosedException("Channel closed at offset " + streamOffset);
}
ctx.flush();
}

// Send the buffer as one data reply and wait for the channel write to finish.
@Override
public synchronized int write(ByteBuffer buffer) throws IOException {
if (!isOpen()) {
throw new AlreadyClosedException("Channel closed at offset " + streamOffset);
}
final int length = buffer.remaining();
final DataStreamReplyByteBuffer reply = newDataStreamReadOnlyReplyByteBuffer(request, streamOffset, buffer);
final ChannelFuture future = ctx.writeAndFlush(reply);
try {
// NOTE(review): this blocks the calling thread while holding the monitor of this channel;
// confirm that write(..) is never invoked from the event loop thread of ctx.
future.await();
} catch (InterruptedException e) {
// Restore the interrupt status before reporting the interruption as an IOException subtype.
Thread.currentThread().interrupt();
throw new InterruptedIOException(
"Interrupted while writing " + length + " bytes at offset " + streamOffset);
}
if (!future.isSuccess()) {
final Throwable cause = future.cause();
// Rethrow an IOException cause as is; any other cause is wrapped below.
if (cause instanceof IOException) {
throw (IOException) cause;
}
Comment thread
peterxcli marked this conversation as resolved.
Outdated
throw new IOException("Failed to write " + length + " bytes at offset " + streamOffset, cause);
}
streamOffset += length;
return length;
}
};
// Once the server future completes, send the terminal reply built from the server reply.
return server.streamReadOnlyAsync(raftClientRequest, readOnlyDataStream)
.thenCompose(reply -> writeAndFlush(ctx, newDataStreamReplyByteBuffer(request, reply)));
} catch (IOException e) {
return JavaUtils.completeExceptionally(e);
}
}

static void assertReplyCorrespondingToRequest(
final DataStreamRequestByteBuf request, final DataStreamReply reply) {
Preconditions.assertTrue(request.getClientId().equals(reply.getClientId()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import java.util.Collection;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import org.apache.ratis.conf.Parameters;
import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.proto.RaftProtos.CommitInfoProto;
Expand All @@ -32,6 +33,8 @@
import org.apache.ratis.protocol.AdminProtocol;
import org.apache.ratis.protocol.RaftClientAsynchronousProtocol;
import org.apache.ratis.protocol.RaftClientProtocol;
import org.apache.ratis.protocol.RaftClientReply;
import org.apache.ratis.protocol.RaftClientRequest;
import org.apache.ratis.protocol.RaftGroup;
import org.apache.ratis.protocol.RaftGroupId;
import org.apache.ratis.protocol.RaftGroupMemberId;
Expand Down Expand Up @@ -150,6 +153,18 @@ default RaftGroup getGroup() {
/** @return the data stream rpc service. */
DataStreamServerRpc getDataStreamServerRpc();

/**
 * Submit a read-only request whose response may be streamed through the data stream RPC.
 * This default implementation does not support the operation and always throws.
 *
 * @param request the read-only request
 * @param stream the stream for response data chunks
 * @return a future for the terminal reply
 * @throws IOException declared for implementations; not thrown by this default implementation
 * @throws UnsupportedOperationException always, in this default implementation
 */
default CompletableFuture<RaftClientReply> streamReadOnlyAsync(
    RaftClientRequest request, StateMachine.DataChannel stream) throws IOException {
  final String message = "This method is NOT supported.";
  throw new UnsupportedOperationException(message);
}
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This new method does not seem to be needed, since we may:

  • Phase 1: Directly call DataApi.streamReadOnly(..) and ignore all linearizable checks.
  • Phase 2: Reuse RaftClientAsynchronousProtocol.submitClientRequestAsync(..) to submit a dummy read request for linearizable checks and then call DataApi.streamReadOnly(..).

Of course, we should start with Phase 1 for simplicity.


/** @return the {@link RpcType}. */
default RpcType getRpcType() {
return getFactory().getRpcType();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,18 @@ default CompletableFuture<DataStream> stream(RaftClientRequest request) {
return CompletableFuture.completedFuture(null);
}

/**
* Stream a read-only state machine request. Implementations may write zero or more data
* chunks before completing the returned future with the terminal reply message.
* This default implementation does not support streaming and always throws.
*
* @param request the read-only client request
* @param stream the output stream for response data chunks
* @return a future for the terminal reply message
* @throws UnsupportedOperationException always, in this default implementation
*/
default CompletableFuture<Message> streamReadOnly(RaftClientRequest request, DataChannel stream) {
Comment thread
peterxcli marked this conversation as resolved.
Outdated
throw new UnsupportedOperationException("This method is NOT supported.");
}

/**
* Link asynchronously the given stream with the given log entry.
* The given stream can be null if it is unavailable due to errors.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -971,6 +971,31 @@ CompletableFuture<RaftClientReply> executeSubmitClientRequestAsync(RaftClientReq
clientExecutor).join();
}

/**
 * Execute a read-only request on the client executor, letting the state machine stream
 * response data chunks to the given channel.
 *
 * The request must be of the READ type; otherwise, an {@link IOException} is thrown inside the task.
 * If the state machine throws {@link UnsupportedOperationException} from streamReadOnly(..),
 * the request falls back to a plain state machine query.
 *
 * @param request the read-only client request
 * @param stream the channel receiving the response data chunks
 * @return a future for the terminal reply
 */
CompletableFuture<RaftClientReply> executeStreamReadOnlyAsync(
    RaftClientRequest request, StateMachine.DataChannel stream) {
  // Prefer the streaming API; fall back to a plain query when the state machine does not support it.
  final Function<RaftClientRequest, CompletableFuture<RaftClientReply>> streamingQuery = r -> {
    try {
      return processQueryFuture(stateMachine.data().streamReadOnly(r, stream), r);
    } catch (UnsupportedOperationException e) {
      return queryStateMachine(r);
    }
  };

  return CompletableFuture.supplyAsync(() -> JavaUtils.callAsUnchecked(() -> TraceServer.traceAsyncMethod(() -> {
    assertLifeCycleState(LifeCycle.States.RUNNING);
    if (!request.is(TypeCase.READ)) {
      throw new IOException("Expected a read-only request but got " + request);
    }
    LOG.debug("{}: receive read-only stream request({})", getMemberId(), request);

    // Time the request if a timer is available for its type.
    final Timekeeper timer = raftServerMetrics.getClientRequestTimer(request.getType());
    final Optional<Timekeeper.Context> timerContext = Optional.ofNullable(timer).map(Timekeeper::time);

    return readAsync(request, streamingQuery).whenComplete((clientReply, exception) -> {
      timerContext.ifPresent(Timekeeper.Context::stop);
      final boolean failed = exception != null || clientReply.getException() != null;
      if (failed) {
        raftServerMetrics.incFailedRequestCount(request.getType());
      }
    });
  }, request, getMemberId().toString(), SpanNames.SUBMIT_CLIENT_REQUEST_ASYNC), CompletionException::new),
      clientExecutor).thenCompose(Function.identity());
}
@Override
public CompletableFuture<RaftClientReply> submitClientRequestAsync(
RaftClientRequest request) throws IOException {
Expand Down Expand Up @@ -1110,37 +1135,36 @@ private CompletableFuture<Long> getReadIndex(RaftClientRequest request, LeaderSt
return writeIndexCache.getWriteIndexFuture(request).thenCompose(leader::getReadIndex);
}
/**
 * Serve a read request using {@code queryStateMachine} as the query function.
 *
 * @param request the read request
 * @return the future returned by the two-argument overload
 */
private CompletableFuture<RaftClientReply> readAsync(RaftClientRequest request) {
  final Function<RaftClientRequest, CompletableFuture<RaftClientReply>> query = this::queryStateMachine;
  return readAsync(request, query);
}

private CompletableFuture<RaftClientReply> readAsync(
RaftClientRequest request, Function<RaftClientRequest, CompletableFuture<RaftClientReply>> query) {
if (request.getType().getRead().getPreferNonLinearizable()
|| readOption == RaftServerConfigKeys.Read.Option.DEFAULT) {
final CompletableFuture<RaftClientReply> reply = checkLeaderState(request);
if (reply != null) {
return reply;
}
return queryStateMachine(request);
} else if (readOption == RaftServerConfigKeys.Read.Option.LINEARIZABLE){
final LeaderStateImpl leader = role.getLeaderState().orElse(null);
final CompletableFuture<Long> replyFuture;
if (leader != null) {
replyFuture = getReadIndex(request, leader);
} else {
replyFuture = sendReadIndexAsync(request).thenApply(reply -> {
if (reply != null) {
return reply;
}
return query.apply(request);
}
if (readOption != RaftServerConfigKeys.Read.Option.LINEARIZABLE) {
throw new IllegalStateException("Unexpected read option: " + readOption);
}
final LeaderStateImpl leader = role.getLeaderState().orElse(null);
final CompletableFuture<Long> replyFuture = leader != null ? getReadIndex(request, leader)
: sendReadIndexAsync(request).thenApply(reply -> {
if (reply.getServerReply().getSuccess()) {
return reply.getReadIndex();
} else {
throw new CompletionException(new ReadIndexException(getId() +
": Failed to get read index from the leader: " + reply));
}
throw new CompletionException(new ReadIndexException(getId()
+ ": Failed to get read index from the leader: " + reply));
});
}

return replyFuture
.thenCompose(readIndex -> getState().getReadRequests().waitToAdvance(readIndex,
() -> getReadException("add", snapshotInstallationHandler.getInProgressInstallSnapshotIndex(), false)))
.thenCompose(readIndex -> queryStateMachine(request))
.exceptionally(e -> readException2Reply(request, e));
} else {
throw new IllegalStateException("Unexpected read option: " + readOption);
}
return replyFuture
.thenCompose(readIndex -> getState().getReadRequests().waitToAdvance(readIndex,
() -> getReadException("add", snapshotInstallationHandler.getInProgressInstallSnapshotIndex(), false)))
.thenCompose(readIndex -> query.apply(request))
.exceptionally(e -> readException2Reply(request, e));
}
private RaftClientReply readException2Reply(RaftClientRequest request, Throwable e) {
e = JavaUtils.unwrapCompletionException(e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -459,6 +459,13 @@ public CompletableFuture<RaftClientReply> submitClientRequestAsync(RaftClientReq
.thenCompose(impl -> impl.executeSubmitClientRequestAsync(request));
}

/**
 * Look up the server implementation for the group of the request
 * and delegate the read-only stream request to it.
 *
 * @param request the read-only client request
 * @param stream the channel receiving the response data chunks
 * @return a future for the terminal reply
 */
@Override
public CompletableFuture<RaftClientReply> streamReadOnlyAsync(
    RaftClientRequest request, StateMachine.DataChannel stream) {
  return getImplFuture(request.getRaftGroupId()).thenCompose(groupServer -> {
    // Hand the request over to the server of the requested group.
    return groupServer.executeStreamReadOnlyAsync(request, stream);
  });
}

@Override
public RaftClientReply submitClientRequest(RaftClientRequest request)
throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
import org.slf4j.LoggerFactory;

import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.channels.ReadableByteChannel;
Expand Down Expand Up @@ -147,8 +148,14 @@ static RoutingTable getRoutingTableChainTopology(Iterable<RaftPeerId> peers, Raf
}

class MultiDataStreamStateMachine extends BaseStateMachine {
static final int READ_ONLY_STREAM_CHUNKS = 3;

private final ConcurrentMap<ClientInvocationId, SingleDataStream> streams = new ConcurrentHashMap<>();

/**
 * Build the content of a read-only stream chunk for the given query.
 *
 * @param query the query content, decoded as UTF-8
 * @param index the chunk index
 * @return the UTF-8 bytes of the query text followed by "-chunk-" and the index
 */
static ByteString getReadOnlyStreamChunk(ByteString query, int index) {
  final String chunkText = query.toStringUtf8() + "-chunk-" + index;
  return ByteString.copyFromUtf8(chunkText);
}

@Override
public CompletableFuture<DataStream> stream(RaftClientRequest request) {
final SingleDataStream s = new SingleDataStream(request);
Expand Down Expand Up @@ -176,6 +183,24 @@ public CompletableFuture<Message> applyTransaction(TransactionContext trx) {
return CompletableFuture.completedFuture(() -> bytesWritten);
}

/**
 * Answer a query by echoing the request message.
 *
 * @param request the query message
 * @return an already completed future holding the same message
 */
@Override
public CompletableFuture<Message> query(Message request) {
  final CompletableFuture<Message> echo = CompletableFuture.completedFuture(request);
  return echo;
}

/**
 * Write {@code READ_ONLY_STREAM_CHUNKS} chunks derived from the request content to the given stream
 * and then reply with the id of this state machine.
 *
 * @param request the read-only client request; its message content seeds every chunk
 * @param stream the channel receiving the chunks
 * @return a completed future holding the id as a message,
 *         or the result of JavaUtils.completeExceptionally(..) when a write throws an {@link IOException}
 */
@Override
public CompletableFuture<Message> streamReadOnly(RaftClientRequest request, DataChannel stream) {
  final ByteString query = request.getMessage().getContent();
  int index = 0;
  try {
    while (index < READ_ONLY_STREAM_CHUNKS) {
      stream.write(getReadOnlyStreamChunk(query, index).asReadOnlyByteBuffer());
      index++;
    }
  } catch (IOException e) {
    return JavaUtils.completeExceptionally(e);
  }
  return CompletableFuture.completedFuture(Message.valueOf(getId().toByteString()));
}

SingleDataStream getSingleDataStream(RaftClientRequest request) {
return getSingleDataStream(ClientInvocationId.valueOf(request));
}
Expand Down
Loading