Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1154,6 +1154,10 @@ public boolean checkLeadership() {

final RaftConfigurationImpl conf = server.getRaftConf();

if (conf.isSingleMode(server.getId())) {
return true;
}
Copy link
Copy Markdown
Contributor

@szetszwo szetszwo May 29, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's do this change separately. Then, this PR changes only the test code.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done. Removed the LeaderStateImpl change from this PR, so the current PR diff is test-only now. I will handle that leadership check separately.


if (conf.hasMajority(activePeers, server.getId())) {
// leadership check passed
return true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -309,7 +309,7 @@ private void testRestartFollower(CLUSTER cluster) throws Exception {
LOG.info("{}: newLeaderNextIndex = {}", leaderId, newLeaderNextIndex);
Assertions.assertTrue(newLeaderNextIndex > oldLeaderNextIndex);
Assertions.assertEquals(newLeaderNextIndex, follower.getRaftLog().getNextIndex());
}, 10, ONE_SECOND, "followerNextIndex", LOG);
}, 60, ONE_SECOND, "followerNextIndex", LOG);
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
import org.apache.ratis.client.RaftClient;
import org.apache.ratis.client.RaftClientConfigKeys;
import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.proto.RaftProtos.LogEntryProto;
import org.apache.ratis.protocol.Message;
import org.apache.ratis.protocol.RaftClientReply;
import org.apache.ratis.protocol.RaftPeerId;
import org.apache.ratis.protocol.exceptions.ReadIndexException;
Expand All @@ -30,6 +32,7 @@
import org.apache.ratis.server.RaftServerConfigKeys;
import org.apache.ratis.server.RaftServerConfigKeys.Read.ReadIndex.Type;
import org.apache.ratis.server.impl.MiniRaftCluster;
import org.apache.ratis.util.JavaUtils;
import org.apache.ratis.util.Slf4jUtils;
import org.apache.ratis.util.TimeDuration;
import org.apache.ratis.util.function.CheckedConsumer;
Expand Down Expand Up @@ -179,8 +182,28 @@ public void testFollowerLinearizableReadParallel() throws Exception {
runWithNewCluster(LinearizableReadTests::runTestFollowerReadOnlyParallel);
}

private static long getLogEntryIndex(RaftServer.Division leader, Message message, long startIndex) throws Exception {
final long nextIndex = leader.getRaftLog().getNextIndex();
for (long index = startIndex; index < nextIndex; index++) {
final LogEntryProto entry = leader.getRaftLog().get(index);
if (entry != null && entry.hasStateMachineLogEntry()
&& message.getContent().equals(entry.getStateMachineLogEntry().getLogData())) {
return index;
}
}
throw new AssertionError("Failed to find " + message + " from index " + startIndex + " to " + nextIndex);
}

private static void waitForCommitIndex(RaftServer.Division leader, long index) throws Exception {
JavaUtils.attempt(() -> {
final long commitIndex = leader.getRaftLog().getLastCommittedIndex();
Assertions.assertTrue(commitIndex >= index, () -> "commitIndex=" + commitIndex + " < index=" + index);
}, 10, HUNDRED_MILLIS, "commitIndex >= " + index, null);
}

static <C extends MiniRaftCluster> void runTestFollowerReadOnlyParallel(C cluster) throws Exception {
final RaftPeerId leaderId = RaftTestUtil.waitForLeader(cluster).getId();
final RaftServer.Division leader = RaftTestUtil.waitForLeader(cluster);
final RaftPeerId leaderId = leader.getId();

final List<RaftServer.Division> followers = cluster.getFollowers();
Assertions.assertEquals(2, followers.size());
Expand All @@ -199,9 +222,12 @@ static <C extends MiniRaftCluster> void runTestFollowerReadOnlyParallel(C cluste
assertReplyExact(count, leaderClient.io().send(INCREMENT));

count++;
final long nextIndex = leader.getRaftLog().getNextIndex();
writeReplies.add(new Reply(count, leaderClient.async().send(WAIT_AND_INCREMENT)));
// sleep to let the commitIndex/appliedIndex get updated.
Thread.sleep(100);
final long waitAndIncrementIndex = JavaUtils.attemptRepeatedly(
() -> getLogEntryIndex(leader, WAIT_AND_INCREMENT, nextIndex),
10, HUNDRED_MILLIS, "WAIT_AND_INCREMENT entry", null);
waitForCommitIndex(leader, waitAndIncrementIndex);
// WAIT_AND_INCREMENT will delay 500ms to update the count, the read must wait for it.
assertReplyExact(count, f0Client.io().sendReadOnly(QUERY, f0));
f1Replies.add(new Reply(count, f1Client.async().sendReadOnly(QUERY, f1)));
Expand Down
54 changes: 25 additions & 29 deletions ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
import org.slf4j.event.Level;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.Set;
Expand Down Expand Up @@ -127,46 +128,41 @@ static void runTestBasicAppendEntries(

final CompletableFuture<Void> killAndRestartFollower = killAndRestartServer(
cluster.getFollowers().get(0).getId(), 0, 1000, cluster, log);
final CompletableFuture<Void> killAndRestartLeader;
if (killLeader) {
log.info("killAndRestart leader " + leader.getId());
killAndRestartLeader = killAndRestartServer(leader.getId(), 2000, 4000, cluster, log);
} else {
killAndRestartLeader = CompletableFuture.completedFuture(null);
}

log.info(cluster.printServers());
CompletableFuture<Void> killAndRestartLeader = CompletableFuture.completedFuture(null);

final SimpleMessage[] messages = SimpleMessage.create(numMessages);

try (final RaftClient client = cluster.createClient()) {
final AtomicInteger asyncReplyCount = new AtomicInteger();
final CompletableFuture<Void> f = new CompletableFuture<>();
try {
log.info(cluster.printServers());

try (final RaftClient client = cluster.createClient()) {
final List<CompletableFuture<RaftClientReply>> asyncReplies = new ArrayList<>();

for (SimpleMessage message : messages) {
for (SimpleMessage message : messages) {
if (async) {
asyncReplies.add(client.async().send(message));
} else {
final RaftClientReply reply = client.io().send(message);
Assertions.assertTrue(reply.isSuccess());
}
}
if (async) {
client.async().send(message).thenAcceptAsync(reply -> {
if (!reply.isSuccess()) {
f.completeExceptionally(
new AssertionError("Failed with reply " + reply));
} else if (asyncReplyCount.incrementAndGet() == messages.length) {
f.complete(null);
}
CompletableFuture.allOf(asyncReplies.toArray(new CompletableFuture<?>[0])).join();
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since join() is called below. This allOf is not needed. Let's remove it.

BTW, changing

      final AtomicInteger asyncReplyCount = new AtomicInteger();
      final CompletableFuture<Void> f = new CompletableFuture<>();

to

      final List<CompletableFuture<RaftClientReply>> asyncReplies = new ArrayList<>();

does make the code easier to understand (although the original code is also correct.)

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done. Removed the redundant CompletableFuture.allOf(...).join() and kept the reply list cleanup.

asyncReplies.forEach(f -> {
final RaftClientReply reply = f.join();
Assertions.assertTrue(reply.isSuccess(), () -> "Failed with reply " + reply);
});
} else {
final RaftClientReply reply = client.io().send(message);
Assertions.assertTrue(reply.isSuccess());
}
}
if (async) {
f.join();
Assertions.assertEquals(messages.length, asyncReplyCount.get());
if (killLeader) {
log.info("killAndRestart leader " + leader.getId());
killAndRestartLeader = killAndRestartServer(leader.getId(), 0, 4000, cluster, log);
}
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wait for async append replies before injecting the kill-leader restart in RaftBasicTests.

Before this change, killLeader is in the beginning. This change moves it to the end. It makes the test easier to pass but not fixing a bug.

It is good to test killLeader before client sending messages. So, let's don't make this change?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done. Restored the kill-leader timing so the leader restart is scheduled before client messages are sent. The final kill-leader assertion now checks that the expected messages appear in order, while allowing an extra state-machine entry produced by retry/failover.

Thread.sleep(cluster.getTimeoutMax().toIntExact(TimeUnit.MILLISECONDS) + 100);
} finally {
CompletableFuture.allOf(killAndRestartFollower, killAndRestartLeader).join();
}
Thread.sleep(cluster.getTimeoutMax().toIntExact(TimeUnit.MILLISECONDS) + 100);
log.info(cluster.printAllLogs());
killAndRestartFollower.join();
killAndRestartLeader.join();
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wait for restart futures before continuing to log assertions in RaftBasicTests.

You are right that we should join before printing the log.

How about we simply move cluster.printAllLogs() up? The try-finally make the code harder to read.

     Thread.sleep(cluster.getTimeoutMax().toIntExact(TimeUnit.MILLISECONDS) + 100);
-    log.info(cluster.printAllLogs());
     killAndRestartFollower.join();
     killAndRestartLeader.join();
+    log.info(cluster.printAllLogs());

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done. Removed the try/finally restructuring and now join the restart futures before cluster.printAllLogs().



final List<RaftServer.Division> divisions = cluster.getServerAliveStream().collect(Collectors.toList());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ void runTestLogTruncate(MiniRaftCluster cluster, RaftServer.Division oldLeader,
for (RaftPeer peer : cluster.getGroup().getPeers()) {
final RaftServer.Division division = cluster.getDivision(peer.getId());
assertLogEntries(division, oldLeaderTerm, firstBatch);
assertEmptyTransactionContextMap(division);
waitForEmptyTransactionContextMap(division);
}

// kill a majority of followers
Expand Down Expand Up @@ -221,15 +221,12 @@ void runTestLogTruncate(MiniRaftCluster cluster, RaftServer.Division oldLeader,
for (RaftPeer peer : cluster.getGroup().getPeers()) {
final RaftServer.Division division = cluster.getDivision(peer.getId());
assertLogEntries(division, oldLeaderTerm, expectedMessages);
final String name = "assertEmptyTransactionContextMap:" + division.getId();
JavaUtils.attempt(() -> assertEmptyTransactionContextMap(division),
10, HUNDRED_MILLIS, name, LOG);

}
waitForEmptyTransactionContextMap(division);
}

if (!exceptions.isEmpty()) {
LOG.info("{} exceptions", exceptions.size());
for(int i = 0 ; i < exceptions.size(); i++) {
for (int i = 0; i < exceptions.size(); i++) {
LOG.info("exception {})", i, exceptions.get(i));
}
Assertions.fail();
Expand All @@ -241,6 +238,11 @@ static void assertEmptyTransactionContextMap(RaftServer.Division d) {
Assertions.assertTrue(map.isEmpty(), () -> d.getId() + " TransactionContextMap is non-empty: " + map);
}

void waitForEmptyTransactionContextMap(RaftServer.Division d) throws InterruptedException {
final String name = "assertEmptyTransactionContextMap:" + d.getId();
JavaUtils.attempt(() -> assertEmptyTransactionContextMap(d), 10, HUNDRED_MILLIS, name, LOG);
}

static void assertEntriesInTransactionContextMap(RaftServer.Division division,
SimpleMessage[] existing, SimpleMessage[] nonExisting) {
final RaftLog log = division.getRaftLog();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.ratis.server.impl.MiniRaftCluster;
import org.apache.ratis.server.impl.ReplyFlusher;
import org.apache.ratis.util.CodeInjectionForTesting;
import org.apache.ratis.util.JavaUtils;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

Expand Down Expand Up @@ -89,11 +90,9 @@ static <C extends MiniRaftCluster> void runTestFollowerReadOnlyParallelRepliedIn
f0Replies.add(new Reply(0, f0Client.async().sendReadOnly(QUERY, f0)));
f1Replies.add(new Reply(0, f1Client.async().sendReadOnly(QUERY, f1)));

// sleep in order to make sure
// (1) the count is incremented, and
// (2) the reads will wait for the repliedIndex.
Thread.sleep(100);
assertEquals(count, leaderStateMachine.getCount());
// Wait until the leader state machine has applied the write while the ReplyFlusher remains blocked.
JavaUtils.attempt(() -> assertEquals(count, leaderStateMachine.getCount()),
10, HUNDRED_MILLIS, "leaderStateMachine count " + count, null);
}

for (int i = 0; i < n; i++) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -150,13 +150,16 @@ public void testElectionStepDownCommand() throws Exception {
void runTestElectionStepDownCommand(MiniRaftCluster cluster) throws Exception {
final RaftServer.Division leader = RaftTestUtil.waitForLeader(cluster);
String sb = getClusterAddress(cluster);
RaftServer.Division newLeader = cluster.getFollowers().get(0);
final StringPrintStream out = new StringPrintStream();
RatisShell shell = new RatisShell(out.getPrintStream());
Assertions.assertNotEquals(cluster.getLeader().getId(), newLeader.getId());
Assertions.assertEquals(2, cluster.getFollowers().size());
int ret = shell.run("election", "stepDown", "-peers", sb.toString());
int ret = shell.run("election", "pause", "-peers", sb.toString(), "-address",
leader.getPeer().getAddress());
Assertions.assertEquals(0, ret);

ret = shell.run("election", "stepDown", "-peers", sb.toString());
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This change is good. Could you also remove the redundant toString() calls?

    int ret = shell.run("election", "pause", "-peers", sb, "-address", leader.getPeer().getAddress());
    Assertions.assertEquals(0, ret);

    ret = shell.run("election", "stepDown", "-peers", sb);
    Assertions.assertEquals(0, ret);

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done. Removed the redundant toString() calls.

Assertions.assertEquals(0, ret);
Assertions.assertEquals(3, cluster.getFollowers().size());
JavaUtils.attempt(() -> Assertions.assertNotEquals(leader.getId(), RaftTestUtil.waitForLeader(cluster).getId()),
10, TimeDuration.valueOf(1, TimeUnit.SECONDS), "testElectionStepDownCommand", LOG);
}
}
Loading