Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1092,12 +1092,11 @@ private CompletableFuture<RaftClientReply> staleReadAsync(RaftClientRequest requ
}
return processQueryFuture(stateMachine.queryStale(request.getMessage(), minIndex), request);
}

ReadRequests getReadRequests() {
return getState().getReadRequests();
}

private CompletableFuture<ReadIndexReplyProto> sendReadIndexAsync(RaftClientRequest clientRequest) {
final Throwable snapshotInstallation = snapshotInstallationHandler.getInProgressInstallSnapshotReadException();
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's create the ReadException with a different message. It will be easier to debug later.

  ReadException newReadException(String op, long installSnapshot, boolean started) {
    return new ReadException(getMemberId() + ": Failed to " + op + " readIndex as snapshot (" + installSnapshot
        + ") installation is " + (started ? "started" : "in progress"));
  }

  private CompletableFuture<ReadIndexReplyProto> sendReadIndexAsync(RaftClientRequest clientRequest) {
    final long installSnapshot = snapshotInstallationHandler.getInProgressInstallSnapshotIndex();
    if (installSnapshot != RaftLog.INVALID_LOG_INDEX) {
      return JavaUtils.completeExceptionally(newReadException("get", installSnapshot, false));
    }

if (snapshotInstallation != null) {
return JavaUtils.completeExceptionally(snapshotInstallation);
}
final RaftPeerId leaderId = getInfo().getLeaderId();
if (leaderId == null) {
return JavaUtils.completeExceptionally(new ReadIndexException(getMemberId() + ": Leader is unknown."));
Expand Down Expand Up @@ -1146,7 +1145,8 @@ private CompletableFuture<RaftClientReply> readAsync(RaftClientRequest request)
}

return replyFuture
.thenCompose(readIndex -> getReadRequests().waitToAdvance(readIndex))
.thenCompose(readIndex -> getState().getReadRequests().waitToAdvance(readIndex,
snapshotInstallationHandler::getInProgressInstallSnapshotReadException))
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's add a waitReadIndex method and check there.

          .thenCompose(this::waitReadIndex)
  private CompletableFuture<Long> waitReadIndex(long readIndex) {
    final long installSnapshot = snapshotInstallationHandler.getInProgressInstallSnapshotIndex();
    if (installSnapshot != RaftLog.INVALID_LOG_INDEX) {
      return JavaUtils.completeExceptionally(newReadException("start waiting for", installSnapshot, false));
    }
    return getState().getReadRequests().waitToAdvance(readIndex);
  }

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the review. Updated, but there might be a small chance of interleaving here since the snapshot checking is not protected by ReadRequests object lock.

  1. waitReadIndex() checks snapshot state and sees no snapshot.
  2. Snapshot notification sets inProgressInstallSnapshotIndex.
  3. Snapshot notification calls ReadRequests.fail() and clears the current queue.
  4. Read then enqueues itself in ReadRequests.waitToAdvance() after the failure sweep.

We can probably synchronized on RaftServerImpl lock, but I'm afraid this might increase follower read contention.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@ivandika3 , That is a good point! Then, let's use your previous approach then; see https://issues.apache.org/jira/secure/attachment/13082234/1444_review2.patch

.thenCompose(readIndex -> queryStateMachine(request))
.exceptionally(e -> readException2Reply(request, e));
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,21 +20,30 @@
import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.protocol.exceptions.ReadException;
import org.apache.ratis.server.RaftServerConfigKeys;
import org.apache.ratis.util.JavaUtils;
import org.apache.ratis.util.Preconditions;
import org.apache.ratis.util.TimeDuration;
import org.apache.ratis.util.TimeoutExecutor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.Collection;
import java.util.NavigableMap;
import java.util.TreeMap;
import java.util.concurrent.CompletableFuture;
import java.util.function.LongConsumer;
import java.util.function.Supplier;

/** For supporting linearizable read. */
class ReadRequests {
private static final Logger LOG = LoggerFactory.getLogger(ReadRequests.class);

static ReadException newException(Object server, long installSnapshot) {
return new ReadException(server + ": Failed read as snapshot (" + installSnapshot
+ ") installation is in progress");
}
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's move this method to RaftServerImpl.newReadException as mentioned earlier.


static class ReadIndexQueue {
private final TimeoutExecutor scheduler = TimeoutExecutor.getInstance();
/** The log index known to be applied. */
Expand All @@ -52,10 +61,14 @@ static class ReadIndexQueue {
this.readTimeout = readTimeout;
}

CompletableFuture<Long> add(long readIndex) {
CompletableFuture<Long> add(long readIndex, Supplier<Throwable> failureSupplier) {
final CompletableFuture<Long> returned;
final boolean create;
synchronized (this) {
final Throwable failure = failureSupplier.get();
if (failure != null) {
return JavaUtils.completeExceptionally(failure);
}
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This can be checked in RaftServerImpl.

if (readIndex <= lastAppliedIndex) {
return CompletableFuture.completedFuture(lastAppliedIndex);
}
Expand Down Expand Up @@ -88,6 +101,14 @@ private void handleTimeout(long readIndex) {
removed.completeExceptionally(new ReadException("Read timeout " + readTimeout + " for index " + readIndex));
}

void fail(Throwable cause) {
final Collection<CompletableFuture<Long>> futures;
synchronized (this) {
futures = new ArrayList<>(sorted.values());
sorted.clear();
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  • Let's make sorted non-final in order to avoid the copying.
  • We may return the futures here and fail them in ReadRequests.fail(..)..
//ReadIndexQueue
    synchronized Collection<CompletableFuture<Long>> clear(Throwable cause) {
      final Collection<CompletableFuture<Long>> futures = sorted.values();
      sorted = new TreeMap<>();
      return futures;
    }
//ReadRequests
  void fail(Throwable cause) {
    for(CompletableFuture<Long> f : readIndexQueue.clear(cause)) {
      f.completeExceptionally(cause);
    }
  }

}
futures.forEach(f -> f.completeExceptionally(cause));
}

/** Complete all the entries less than or equal to the given applied index. */
synchronized void complete(long appliedIndex) {
Expand Down Expand Up @@ -119,7 +140,11 @@ LongConsumer getAppliedIndexConsumer() {
return readIndexQueue::complete;
}

CompletableFuture<Long> waitToAdvance(long readIndex) {
return readIndexQueue.add(readIndex);
CompletableFuture<Long> waitToAdvance(long readIndex, Supplier<Throwable> failureSupplier) {
return readIndexQueue.add(readIndex, failureSupplier);
}

void fail(Throwable cause) {
readIndexQueue.fail(cause);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,11 @@ long getInProgressInstallSnapshotIndex() {
return inProgressInstallSnapshotIndex.get();
}

Throwable getInProgressInstallSnapshotReadException() {
final long installSnapshot = getInProgressInstallSnapshotIndex();
return installSnapshot != INVALID_LOG_INDEX ? ReadRequests.newException(getMemberId(), installSnapshot) : null;
}

InstallSnapshotReplyProto installSnapshot(InstallSnapshotRequestProto request) throws IOException {
BatchLogger.print(BatchLogKey.INSTALL_SNAPSHOT_REQUEST, getMemberId(),
suffix -> LOG.info("{}: receive installSnapshot: {} {}",
Expand Down Expand Up @@ -276,6 +281,7 @@ private CompletableFuture<InstallSnapshotReplyProto> notifyStateMachineToInstall
InstallSnapshotResult.ALREADY_INSTALLED, snapshotIndex);
return future.thenApply(dummy -> reply);
}
server.getState().getReadRequests().fail(ReadRequests.newException(getMemberId(), firstAvailableLogIndex));

final RaftPeerProto leaderProto;
if (!request.hasLastRaftConfigurationLogEntryProto()) {
Expand Down Expand Up @@ -401,4 +407,4 @@ private RoleInfoProto getRoleInfoProto(RaftPeerProto leader) {
.setFollowerInfo(followerInfo)
.build();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,11 @@ static <C extends MiniRaftCluster> void runTestFollowerLinearizableRead(C cluste
}
}

@Test
public void testFollowerLinearizableReadFailsWhenInstallingSnapshot() throws Exception {
runWithNewCluster(ReadOnlyRequestTests::runTestFollowerLinearizableReadFailsWhenInstallingSnapshot);
}

@Test
public void testFollowerLinearizableReadParallel() throws Exception {
runWithNewCluster(LinearizableReadTests::runTestFollowerReadOnlyParallel);
Expand Down Expand Up @@ -285,4 +290,4 @@ static <C extends MiniRaftCluster> void runTestReadAfterWrite(C cluster) throws
assertReplyAtLeast(2, asyncReply.join());
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,13 @@
import org.junit.jupiter.api.Test;
import org.slf4j.event.Level;

import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicLong;

public abstract class ReadOnlyRequestTests<CLUSTER extends MiniRaftCluster>
Expand Down Expand Up @@ -139,6 +144,76 @@ static <C extends MiniRaftCluster> void runTestReadOnlyRetryWhenLeaderDown(Retry
}
}

private static void setInProgressInstallSnapshotIndex(RaftServer.Division server, long index) throws Exception {
final Field snapshotInstallationHandler = server.getClass().getDeclaredField("snapshotInstallationHandler");
snapshotInstallationHandler.setAccessible(true);
final Object handler = snapshotInstallationHandler.get(server);
final Field inProgressInstallSnapshotIndex = handler.getClass()
.getDeclaredField("inProgressInstallSnapshotIndex");
inProgressInstallSnapshotIndex.setAccessible(true);
((AtomicLong) inProgressInstallSnapshotIndex.get(handler)).set(index);
}

private static void startSnapshotInstallation(RaftServer.Division server, long index) throws Exception {
setInProgressInstallSnapshotIndex(server, index);
final Method getState = server.getClass().getDeclaredMethod("getState");
getState.setAccessible(true);
final Object state = getState.invoke(server);
final Method getReadRequests = state.getClass().getDeclaredMethod("getReadRequests");
getReadRequests.setAccessible(true);
final Object readRequests = getReadRequests.invoke(state);
final Method fail = readRequests.getClass().getDeclaredMethod("fail", Throwable.class);
fail.setAccessible(true);
fail.invoke(readRequests, new ReadException(server.getMemberId()
+ ": Failed read as snapshot (" + index + ") installation is in progress"));
}

static void assertSnapshotInstallationReadException(Throwable exception) {
final Throwable cause = exception instanceof CompletionException && exception.getCause() != null
? exception.getCause() : exception;
Assertions.assertInstanceOf(ReadException.class, cause);
Assertions.assertTrue(cause.getMessage().contains("snapshot (1) installation is in progress"),
() -> "Unexpected exception: " + exception);
}

static <C extends MiniRaftCluster> void runTestFollowerLinearizableReadFailsWhenInstallingSnapshot(C cluster)
throws Exception {
final RaftPeerId leaderId = RaftTestUtil.waitForLeader(cluster).getId();

final List<RaftServer.Division> followers = cluster.getFollowers();
Assertions.assertEquals(2, followers.size());

final RaftServer.Division follower = followers.get(0);
final RaftPeerId followerId = follower.getId();

try (RaftClient leaderClient = cluster.createClient(leaderId);
RaftClient followerClient = cluster.createClient(followerId, RetryPolicies.noRetry())) {
assertReplyExact(1, leaderClient.io().send(INCREMENT));
assertReplyExact(1, followerClient.io().sendReadOnly(QUERY, followerId));

final CompletableFuture<RaftClientReply> writeReply = leaderClient.async().send(WAIT_AND_INCREMENT);
Thread.sleep(100);
final CompletableFuture<RaftClientReply> pendingRead = followerClient.async().sendReadOnly(QUERY, followerId);
Assertions.assertFalse(pendingRead.isDone(), () -> "pendingRead=" + pendingRead);

startSnapshotInstallation(follower, 1);
try {
final CompletionException pendingException = Assertions.assertThrows(CompletionException.class,
pendingRead::join);
assertSnapshotInstallationReadException(pendingException);

final ReadException readException = Assertions.assertThrows(ReadException.class,
() -> followerClient.io().sendReadOnly(QUERY, followerId));
assertSnapshotInstallationReadException(readException);
} finally {
setInProgressInstallSnapshotIndex(follower, -1);
}

assertReplyExact(2, writeReply.join());
assertReplyExact(2, followerClient.io().sendReadOnly(QUERY, followerId));
}
}

static int retrieve(RaftClientReply reply) {
Assertions.assertTrue(reply.isSuccess());
return Integer.parseInt(reply.getMessage().getContent().toString(StandardCharsets.UTF_8));
Expand Down