Skip to content
27 changes: 27 additions & 0 deletions src/main/java/org/ethereum/beacon/discovery/DiscoverySystem.java
Original file line number Diff line number Diff line change
Expand Up @@ -131,4 +131,31 @@ public Optional<NodeRecord> lookupNode(final Bytes nodeId) {
.findFirst()
.filter(node -> node.getNodeId().equals(nodeId));
}

/**
* Gets all the NodeRecords in the routing table, grouped by their bucket
*
* @return all the NodeRecords in the routing table, grouped by their bucket
*/
public List<List<NodeRecord>> getNodeRecordBuckets() {
return buckets.getNodeRecordBuckets();
}

/**
* Adds a NodeRecord to the routing table
*
* @param nodeRecord The NodeRecord to add to the routing table
*/
public void addNodeRecord(NodeRecord nodeRecord) {
buckets.offer(nodeRecord);
}

/**
* Deletes the NodeRecord identified by nodeId from the routing table
*
* @param nodeId The node ID to be deleted from the routing table
*/
public void deleteNode(Bytes nodeId) {
buckets.deleteNode(nodeId);
}
Comment thread
Matilda-Clerke marked this conversation as resolved.
Outdated
Comment thread
Matilda-Clerke marked this conversation as resolved.
Outdated
}
24 changes: 15 additions & 9 deletions src/main/java/org/ethereum/beacon/discovery/storage/KBucket.java
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ private void offerNewNode(final NodeRecord node) {
return;
}
if (isFull()) {
getLastNode().checkLiveness(clock.millis());
nodes.getLast().checkLiveness(clock.millis());
if (pendingNode.isEmpty()) {
livenessChecker.checkLiveness(node);
}
Expand All @@ -102,7 +102,7 @@ public void onLivenessConfirmed(final NodeRecord node) {
existingEntry -> {
// Move to the start of the bucket
nodes.remove(existingEntry);
nodes.add(0, existingEntry.withLastConfirmedTime(clock.millis()));
nodes.addFirst(existingEntry.withLastConfirmedTime(clock.millis()));
performMaintenance();
},
() -> {
Expand All @@ -117,7 +117,7 @@ public void onLivenessConfirmed(final NodeRecord node) {
pendingNode = Optional.of(new BucketEntry(livenessChecker, node, clock.millis()));
}
} else {
nodes.add(0, new BucketEntry(livenessChecker, node, clock.millis()));
nodes.addFirst(new BucketEntry(livenessChecker, node, clock.millis()));
}
});
}
Expand Down Expand Up @@ -147,12 +147,12 @@ public void performMaintenance() {
if (nodes.isEmpty()) {
return;
}
final BucketEntry lastNode = getLastNode();
final BucketEntry lastNode = nodes.getLast();
if (lastNode.hasFailedLivenessCheck(currentTime)) {
nodes.remove(lastNode);
pendingNode.ifPresent(
pendingEntry -> {
nodes.add(0, pendingEntry);
nodes.addFirst(pendingEntry);
pendingNode = Optional.empty();
});
} else {
Expand All @@ -176,10 +176,6 @@ private void performPendingNodeMaintenance() {
});
}

private BucketEntry getLastNode() {
return nodes.get(nodes.size() - 1);
}

private boolean isFull() {
return nodes.size() >= K;
}
Expand All @@ -199,4 +195,14 @@ public Optional<NodeRecord> getNode(final Bytes targetNodeId) {
public boolean isEmpty() {
return nodes.isEmpty();
}

public void deleteNode(Bytes nodeId) {
nodes.removeIf((bucketEntry) -> bucketEntry.getNodeId().equals(nodeId));
performPendingNodeMaintenance();
pendingNode.ifPresent(
pendingEntry -> {
nodes.addFirst(pendingEntry);
pendingNode = Optional.empty();
});
}
}
12 changes: 12 additions & 0 deletions src/main/java/org/ethereum/beacon/discovery/storage/KBuckets.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import java.time.Clock;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Spliterator;
Expand Down Expand Up @@ -143,4 +144,15 @@ public synchronized Optional<NodeRecord> getNode(final Bytes nodeId) {
public synchronized boolean containsNode(final Bytes nodeId) {
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't understand why these methods are synchronized. E.g. containsNode is always just for a certain point in time. The node record could be deleted just after leaving the synchronized method. Or the node could be added just after we returned false.
Same for the other methods.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I assume it's from an abundance of caution in a system intended to be used in a multi-threaded environment. Perhaps it would be good to overhaul it with ReentractReadWriteLock instead. It could increase performance if the discovery system is used frequently in multiple threads.

return getNode(nodeId).isPresent();
}

public synchronized List<List<NodeRecord>> getNodeRecordBuckets() {
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure I like this name. We are not returning the buckets, we are returning node records from all buckets. Maybe a simple name like getNodeRecords() is better?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I included the words buckets because each sub list is a list of node records from a bucket. I'm not sure getNodeRecords conveys quite the same meaning.

Personally, I don't understand why node records are organised like that, but it seems important.

return buckets.values().stream().map(KBucket::getAllNodes).toList();
}

public synchronized void deleteNode(final Bytes nodeId) {
final int distance = Functions.logDistance(homeNodeId, nodeId);
if (distance <= MAXIMUM_BUCKET) {
getBucket(distance).ifPresent((bucket) -> bucket.deleteNode(nodeId));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -378,7 +378,7 @@ public NodeRecord fromRlp(RLPReader reader) {
final DiscoverySystem otherClient = createDiscoveryClient(client.getLocalNodeRecord());

final CompletableFuture<Void> pingResult = client.ping(bootnode.getLocalNodeRecord());
waitFor(pingResult);
waitFor(pingResult, 60);
assertTrue(pingResult.isDone());
assertFalse(pingResult.isCompletedExceptionally());

Expand All @@ -392,24 +392,24 @@ public NodeRecord fromRlp(RLPReader reader) {

final CompletableFuture<Void> otherClientPingResult =
otherClient.ping(client.getLocalNodeRecord());
waitFor(otherClientPingResult);
waitFor(otherClientPingResult, 60);
assertTrue(otherClientPingResult.isDone());
assertFalse(otherClientPingResult.isCompletedExceptionally());

final CompletableFuture<Collection<NodeRecord>> findNodesResult1 =
otherClient.findNodes(client.getLocalNodeRecord(), singletonList(0));
waitFor(findNodesResult1);
waitFor(findNodesResult1, 60);
assertTrue(findNodesResult1.isDone());
assertFalse(findNodesResult1.isCompletedExceptionally());

final CompletableFuture<Collection<NodeRecord>> findNodesResult2 =
client.findNodes(bootnode.getLocalNodeRecord(), singletonList(0));
waitFor(findNodesResult2);
waitFor(findNodesResult2, 60);
assertTrue(findNodesResult2.isDone());
assertFalse(findNodesResult2.isCompletedExceptionally());

final CompletableFuture<Void> bootnodePingResult = bootnode.ping(client.getLocalNodeRecord());
waitFor(bootnodePingResult);
waitFor(bootnodePingResult, 60);
assertTrue(bootnodePingResult.isDone());
assertFalse(bootnodePingResult.isCompletedExceptionally());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -434,6 +434,19 @@ void getLiveNodes_shouldOnlyReturnedConfirmedLiveNodes() {
assertThat(bucket.getLiveNodes()).containsExactly(node3, node4, node2);
}

@Test
void testDeleteNode() {
final NodeRecord nodeToBeDeleted = fillBucket();
final NodeRecord pendingNode = createNewNodeRecord();

bucket.offer(pendingNode);
bucket.onLivenessConfirmed(pendingNode);
bucket.deleteNode(nodeToBeDeleted.getNodeId());

assertThat(bucket.getAllNodes()).doesNotContain(nodeToBeDeleted);
assertThat(bucket.getAllNodes()).contains(pendingNode);
}

private void confirmNodesInBucketAsLive() {
bucket.getAllNodes().forEach(bucket::onLivenessConfirmed);
}
Expand All @@ -453,7 +466,7 @@ private NodeRecord createNewNodeRecord() {
}

private NodeRecord getLastNodeInBucket() {
return bucket.getAllNodes().get(bucket.getAllNodes().size() - 1);
return bucket.getAllNodes().getLast();
}

private NodeRecord fillBucketWithLiveNodes() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,27 @@ void streamClosestNodes_shouldNotIncludeLocalNode() {
assertThat(buckets.streamClosestNodes(localNode.getNodeId())).isEmpty();
}

@Test
void testGetNodeRecordBuckets() {
final NodeRecord node = createNodeAtDistance(1);
buckets.offer(node);

List<List<NodeRecord>> internalBuckets = buckets.getNodeRecordBuckets();
assertThat(internalBuckets.size()).isEqualTo(1);
assertThat(internalBuckets.getFirst().getFirst()).isEqualTo(node);
}

@Test
void testDeleteNode() {
Comment thread
Matilda-Clerke marked this conversation as resolved.
final NodeRecord node = createNodeAtDistance(1);
buckets.offer(node);
buckets.deleteNode(node.getNodeId());

List<List<NodeRecord>> internalBuckets = buckets.getNodeRecordBuckets();
assertThat(internalBuckets.size()).isEqualTo(1);
assertThat(internalBuckets.getFirst().size()).isEqualTo(0);
}

private NodeRecord createNodeAtDistance(final int distance) {
return TestUtil.createNodeAtDistance(localNode.getNodeId(), distance);
}
Expand Down