Skip to content

Commit b259700

Browse files
Add getNodeRecordBuckets, addNodeRecord, and deleteNode to DiscoverySystem (#188)
* Add getNodeRecordBuckets, addNodeRecord, and deleteNode to DiscoverySystem * Update javadoc * Increase timeout on waits to fix flaky unit test * Add DiscoverySystem and MutableDiscoverySystem interfaces * Spotless * Add license comments * Adjust build method * Add some additional deleteNode tests * spotless * change deleteNode to deleteNodeRecord * change deleteNode to deleteNodeRecord
1 parent 272a7ab commit b259700

9 files changed

Lines changed: 285 additions & 98 deletions

File tree

src/main/java/org/ethereum/beacon/discovery/DiscoverySystem.java

Lines changed: 13 additions & 82 deletions
Original file line numberDiff line numberDiff line change
@@ -8,70 +8,20 @@
88
import java.util.Optional;
99
import java.util.concurrent.CompletableFuture;
1010
import java.util.stream.Stream;
11-
import org.apache.logging.log4j.LogManager;
12-
import org.apache.logging.log4j.Logger;
1311
import org.apache.tuweni.bytes.Bytes;
14-
import org.apache.tuweni.bytes.Bytes32;
15-
import org.ethereum.beacon.discovery.scheduler.ExpirationSchedulerFactory;
1612
import org.ethereum.beacon.discovery.schema.NodeRecord;
1713
import org.ethereum.beacon.discovery.storage.BucketStats;
18-
import org.ethereum.beacon.discovery.storage.KBuckets;
19-
import org.ethereum.beacon.discovery.task.DiscoveryTaskManager;
2014

21-
public class DiscoverySystem {
22-
private static final Logger LOG = LogManager.getLogger();
23-
private final DiscoveryManager discoveryManager;
24-
private final DiscoveryTaskManager taskManager;
25-
private final ExpirationSchedulerFactory expirationSchedulerFactory;
26-
private final KBuckets buckets;
27-
private final List<NodeRecord> bootnodes;
15+
public interface DiscoverySystem {
16+
CompletableFuture<Void> start();
2817

29-
DiscoverySystem(
30-
final DiscoveryManager discoveryManager,
31-
final DiscoveryTaskManager taskManager,
32-
final ExpirationSchedulerFactory expirationSchedulerFactory,
33-
final KBuckets buckets,
34-
final List<NodeRecord> bootnodes) {
35-
this.discoveryManager = discoveryManager;
36-
this.taskManager = taskManager;
37-
this.expirationSchedulerFactory = expirationSchedulerFactory;
38-
this.buckets = buckets;
39-
this.bootnodes = bootnodes;
40-
}
18+
void stop();
4119

42-
public CompletableFuture<Void> start() {
43-
return discoveryManager.start().thenRun(taskManager::start).thenRun(this::pingBootnodes);
44-
}
20+
NodeRecord getLocalNodeRecord();
4521

46-
private void pingBootnodes() {
47-
bootnodes.forEach(
48-
bootnode ->
49-
discoveryManager
50-
.ping(bootnode)
51-
.exceptionally(
52-
e -> {
53-
LOG.debug("Failed to ping bootnode: " + bootnode);
54-
return null;
55-
}));
56-
}
22+
BucketStats getBucketStats();
5723

58-
public void stop() {
59-
taskManager.stop();
60-
discoveryManager.stop();
61-
expirationSchedulerFactory.stop();
62-
}
63-
64-
public NodeRecord getLocalNodeRecord() {
65-
return discoveryManager.getLocalNodeRecord();
66-
}
67-
68-
public BucketStats getBucketStats() {
69-
return buckets.getStats();
70-
}
71-
72-
public void updateCustomFieldValue(final String fieldName, final Bytes value) {
73-
discoveryManager.updateCustomFieldValue(fieldName, value);
74-
}
24+
void updateCustomFieldValue(final String fieldName, final Bytes value);
7525

7626
/**
7727
* Initiates FINDNODE with node `nodeRecord`
@@ -81,10 +31,8 @@ public void updateCustomFieldValue(final String fieldName, final Bytes value) {
8131
* @return Future which is fired when reply is received or fails in timeout/not successful
8232
* handshake/bad message exchange. Contains the collection of nodes returned by the peer.
8333
*/
84-
public CompletableFuture<Collection<NodeRecord>> findNodes(
85-
NodeRecord nodeRecord, List<Integer> distances) {
86-
return discoveryManager.findNodes(nodeRecord, distances);
87-
}
34+
CompletableFuture<Collection<NodeRecord>> findNodes(
35+
NodeRecord nodeRecord, List<Integer> distances);
8836

8937
/**
9038
* Initiates PING with node `nodeRecord`
@@ -93,42 +41,25 @@ public CompletableFuture<Collection<NodeRecord>> findNodes(
9341
* @return Future which is fired when reply is received or fails in timeout/not successful
9442
* handshake/bad message exchange.
9543
*/
96-
public CompletableFuture<Void> ping(NodeRecord nodeRecord) {
97-
return discoveryManager.ping(nodeRecord);
98-
}
44+
CompletableFuture<Void> ping(NodeRecord nodeRecord);
9945

10046
/**
10147
* Initiates TALK with node `nodeRecord`
10248
*
10349
* @param nodeRecord Ethereum Node record
10450
* @return Promise of the node TALK response.
10551
*/
106-
public CompletableFuture<Bytes> talk(NodeRecord nodeRecord, Bytes protocol, Bytes request) {
107-
return discoveryManager.talk(nodeRecord, protocol, request);
108-
}
52+
CompletableFuture<Bytes> talk(NodeRecord nodeRecord, Bytes protocol, Bytes request);
10953

110-
public Stream<NodeRecord> streamLiveNodes() {
111-
return Stream.concat(
112-
buckets.streamClosestNodes(Bytes32.ZERO), discoveryManager.streamActiveSessions());
113-
}
54+
Stream<NodeRecord> streamLiveNodes();
11455

115-
public CompletableFuture<Collection<NodeRecord>> searchForNewPeers() {
116-
return taskManager.searchForNewPeers();
117-
}
56+
CompletableFuture<Collection<NodeRecord>> searchForNewPeers();
11857

11958
/**
12059
* Lookup node in locally stored KBuckets by its nodeId. Allows lookup of local node record.
12160
*
12261
* @param nodeId NodeId, big endian UInt256 Node ID in bytes
12362
* @return NodeRecord if any found
12463
*/
125-
public Optional<NodeRecord> lookupNode(final Bytes nodeId) {
126-
if (nodeId.equals(getLocalNodeRecord().getNodeId())) {
127-
return Optional.of(getLocalNodeRecord());
128-
}
129-
return buckets
130-
.streamClosestNodes(nodeId)
131-
.findFirst()
132-
.filter(node -> node.getNodeId().equals(nodeId));
133-
}
64+
Optional<NodeRecord> lookupNode(final Bytes nodeId);
13465
}

src/main/java/org/ethereum/beacon/discovery/DiscoverySystemBuilder.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -264,6 +264,14 @@ private void createDefaults() {
264264
ExpirationSchedulerFactory expirationSchedulerFactory;
265265

266266
public DiscoverySystem build() {
267+
return buildImpl();
268+
}
269+
270+
public MutableDiscoverySystem buildMutable() {
271+
return buildImpl();
272+
}
273+
274+
private DiscoverySystemImpl buildImpl() {
267275
checkNotNull(localNodeRecord, "Missing local node record");
268276
checkNotNull(secretKey, "Missing secret key");
269277
createDefaults();
@@ -285,7 +293,7 @@ public DiscoverySystem build() {
285293
recursiveLookupInterval,
286294
retryTimeout,
287295
lifeCheckInterval);
288-
return new DiscoverySystem(
296+
return new DiscoverySystemImpl(
289297
discoveryManager,
290298
discoveryTaskManager,
291299
expirationSchedulerFactory,
Lines changed: 139 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,139 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*/
4+
package org.ethereum.beacon.discovery;
5+
6+
import java.util.Collection;
7+
import java.util.List;
8+
import java.util.Optional;
9+
import java.util.concurrent.CompletableFuture;
10+
import java.util.stream.Stream;
11+
import org.apache.logging.log4j.LogManager;
12+
import org.apache.logging.log4j.Logger;
13+
import org.apache.tuweni.bytes.Bytes;
14+
import org.apache.tuweni.bytes.Bytes32;
15+
import org.ethereum.beacon.discovery.scheduler.ExpirationSchedulerFactory;
16+
import org.ethereum.beacon.discovery.schema.NodeRecord;
17+
import org.ethereum.beacon.discovery.storage.BucketStats;
18+
import org.ethereum.beacon.discovery.storage.KBuckets;
19+
import org.ethereum.beacon.discovery.task.DiscoveryTaskManager;
20+
21+
public class DiscoverySystemImpl implements DiscoverySystem, MutableDiscoverySystem {
22+
private static final Logger LOG = LogManager.getLogger();
23+
private final DiscoveryManager discoveryManager;
24+
private final DiscoveryTaskManager taskManager;
25+
private final ExpirationSchedulerFactory expirationSchedulerFactory;
26+
private final KBuckets buckets;
27+
private final List<NodeRecord> bootnodes;
28+
29+
DiscoverySystemImpl(
30+
final DiscoveryManager discoveryManager,
31+
final DiscoveryTaskManager taskManager,
32+
final ExpirationSchedulerFactory expirationSchedulerFactory,
33+
final KBuckets buckets,
34+
final List<NodeRecord> bootnodes) {
35+
this.discoveryManager = discoveryManager;
36+
this.taskManager = taskManager;
37+
this.expirationSchedulerFactory = expirationSchedulerFactory;
38+
this.buckets = buckets;
39+
this.bootnodes = bootnodes;
40+
}
41+
42+
@Override
43+
public CompletableFuture<Void> start() {
44+
return discoveryManager.start().thenRun(taskManager::start).thenRun(this::pingBootnodes);
45+
}
46+
47+
private void pingBootnodes() {
48+
bootnodes.forEach(
49+
bootnode ->
50+
discoveryManager
51+
.ping(bootnode)
52+
.exceptionally(
53+
e -> {
54+
LOG.debug("Failed to ping bootnode: " + bootnode);
55+
return null;
56+
}));
57+
}
58+
59+
@Override
60+
public void stop() {
61+
taskManager.stop();
62+
discoveryManager.stop();
63+
expirationSchedulerFactory.stop();
64+
}
65+
66+
@Override
67+
public NodeRecord getLocalNodeRecord() {
68+
return discoveryManager.getLocalNodeRecord();
69+
}
70+
71+
@Override
72+
public BucketStats getBucketStats() {
73+
return buckets.getStats();
74+
}
75+
76+
@Override
77+
public void updateCustomFieldValue(final String fieldName, final Bytes value) {
78+
discoveryManager.updateCustomFieldValue(fieldName, value);
79+
}
80+
81+
@Override
82+
public CompletableFuture<Collection<NodeRecord>> findNodes(
83+
NodeRecord nodeRecord, List<Integer> distances) {
84+
return discoveryManager.findNodes(nodeRecord, distances);
85+
}
86+
87+
@Override
88+
public CompletableFuture<Void> ping(NodeRecord nodeRecord) {
89+
return discoveryManager.ping(nodeRecord);
90+
}
91+
92+
@Override
93+
public CompletableFuture<Bytes> talk(NodeRecord nodeRecord, Bytes protocol, Bytes request) {
94+
return discoveryManager.talk(nodeRecord, protocol, request);
95+
}
96+
97+
@Override
98+
public Stream<NodeRecord> streamLiveNodes() {
99+
return Stream.concat(
100+
buckets.streamClosestNodes(Bytes32.ZERO), discoveryManager.streamActiveSessions());
101+
}
102+
103+
@Override
104+
public CompletableFuture<Collection<NodeRecord>> searchForNewPeers() {
105+
return taskManager.searchForNewPeers();
106+
}
107+
108+
/**
109+
* Lookup node in locally stored KBuckets by its nodeId. Allows lookup of local node record.
110+
*
111+
* @param nodeId NodeId, big endian UInt256 Node ID in bytes
112+
* @return NodeRecord if any found
113+
*/
114+
@Override
115+
public Optional<NodeRecord> lookupNode(final Bytes nodeId) {
116+
if (nodeId.equals(getLocalNodeRecord().getNodeId())) {
117+
return Optional.of(getLocalNodeRecord());
118+
}
119+
return buckets
120+
.streamClosestNodes(nodeId)
121+
.findFirst()
122+
.filter(node -> node.getNodeId().equals(nodeId));
123+
}
124+
125+
@Override
126+
public List<List<NodeRecord>> getNodeRecordBuckets() {
127+
return buckets.getNodeRecordBuckets();
128+
}
129+
130+
@Override
131+
public void addNodeRecord(NodeRecord nodeRecord) {
132+
buckets.offer(nodeRecord);
133+
}
134+
135+
@Override
136+
public void deleteNodeRecord(Bytes nodeId) {
137+
buckets.deleteNode(nodeId);
138+
}
139+
}
Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*/
4+
package org.ethereum.beacon.discovery;
5+
6+
import java.util.List;
7+
import org.apache.tuweni.bytes.Bytes;
8+
import org.ethereum.beacon.discovery.schema.NodeRecord;
9+
10+
public interface MutableDiscoverySystem extends DiscoverySystem {
11+
/**
12+
* Adds a NodeRecord to the routing table
13+
*
14+
* @param nodeRecord The NodeRecord to add to the routing table
15+
*/
16+
void addNodeRecord(NodeRecord nodeRecord);
17+
18+
/**
19+
* Deletes the NodeRecord identified by nodeId from the routing table
20+
*
21+
* @param nodeId The node ID to be deleted from the routing table
22+
*/
23+
void deleteNodeRecord(Bytes nodeId);
24+
25+
/**
26+
* Gets all the NodeRecords in the routing table, grouped by their bucket
27+
*
28+
* @return all the NodeRecords in the routing table, grouped by their bucket
29+
*/
30+
List<List<NodeRecord>> getNodeRecordBuckets();
31+
}

0 commit comments

Comments
 (0)