Skip to content

Commit 367ab31

Browse files
Add DiscoverySystem and MutableDiscoverySystem interfaces
1 parent 1a6c76a commit 367ab31

6 files changed

Lines changed: 217 additions & 159 deletions

File tree

Lines changed: 50 additions & 152 deletions
Original file line numberDiff line numberDiff line change
@@ -1,161 +1,59 @@
1-
/*
2-
* SPDX-License-Identifier: Apache-2.0
3-
*/
41
package org.ethereum.beacon.discovery;
52

3+
import org.apache.tuweni.bytes.Bytes;
4+
import org.ethereum.beacon.discovery.schema.NodeRecord;
5+
import org.ethereum.beacon.discovery.storage.BucketStats;
6+
67
import java.util.Collection;
78
import java.util.List;
89
import java.util.Optional;
910
import java.util.concurrent.CompletableFuture;
1011
import java.util.stream.Stream;
11-
import org.apache.logging.log4j.LogManager;
12-
import org.apache.logging.log4j.Logger;
13-
import org.apache.tuweni.bytes.Bytes;
14-
import org.apache.tuweni.bytes.Bytes32;
15-
import org.ethereum.beacon.discovery.scheduler.ExpirationSchedulerFactory;
16-
import org.ethereum.beacon.discovery.schema.NodeRecord;
17-
import org.ethereum.beacon.discovery.storage.BucketStats;
18-
import org.ethereum.beacon.discovery.storage.KBuckets;
19-
import org.ethereum.beacon.discovery.task.DiscoveryTaskManager;
20-
21-
public class DiscoverySystem {
22-
private static final Logger LOG = LogManager.getLogger();
23-
private final DiscoveryManager discoveryManager;
24-
private final DiscoveryTaskManager taskManager;
25-
private final ExpirationSchedulerFactory expirationSchedulerFactory;
26-
private final KBuckets buckets;
27-
private final List<NodeRecord> bootnodes;
28-
29-
DiscoverySystem(
30-
final DiscoveryManager discoveryManager,
31-
final DiscoveryTaskManager taskManager,
32-
final ExpirationSchedulerFactory expirationSchedulerFactory,
33-
final KBuckets buckets,
34-
final List<NodeRecord> bootnodes) {
35-
this.discoveryManager = discoveryManager;
36-
this.taskManager = taskManager;
37-
this.expirationSchedulerFactory = expirationSchedulerFactory;
38-
this.buckets = buckets;
39-
this.bootnodes = bootnodes;
40-
}
41-
42-
public CompletableFuture<Void> start() {
43-
return discoveryManager.start().thenRun(taskManager::start).thenRun(this::pingBootnodes);
44-
}
45-
46-
private void pingBootnodes() {
47-
bootnodes.forEach(
48-
bootnode ->
49-
discoveryManager
50-
.ping(bootnode)
51-
.exceptionally(
52-
e -> {
53-
LOG.debug("Failed to ping bootnode: " + bootnode);
54-
return null;
55-
}));
56-
}
57-
58-
public void stop() {
59-
taskManager.stop();
60-
discoveryManager.stop();
61-
expirationSchedulerFactory.stop();
62-
}
63-
64-
public NodeRecord getLocalNodeRecord() {
65-
return discoveryManager.getLocalNodeRecord();
66-
}
67-
68-
public BucketStats getBucketStats() {
69-
return buckets.getStats();
70-
}
71-
72-
public void updateCustomFieldValue(final String fieldName, final Bytes value) {
73-
discoveryManager.updateCustomFieldValue(fieldName, value);
74-
}
75-
76-
/**
77-
* Initiates FINDNODE with node `nodeRecord`
78-
*
79-
* @param nodeRecord Ethereum Node record
80-
* @param distances Distances to search for
81-
* @return Future which is fired when reply is received or fails in timeout/not successful
82-
* handshake/bad message exchange. Contains the collection of nodes returned by the peer.
83-
*/
84-
public CompletableFuture<Collection<NodeRecord>> findNodes(
85-
NodeRecord nodeRecord, List<Integer> distances) {
86-
return discoveryManager.findNodes(nodeRecord, distances);
87-
}
88-
89-
/**
90-
* Initiates PING with node `nodeRecord`
91-
*
92-
* @param nodeRecord Ethereum Node record
93-
* @return Future which is fired when reply is received or fails in timeout/not successful
94-
* handshake/bad message exchange.
95-
*/
96-
public CompletableFuture<Void> ping(NodeRecord nodeRecord) {
97-
return discoveryManager.ping(nodeRecord);
98-
}
99-
100-
/**
101-
* Initiates TALK with node `nodeRecord`
102-
*
103-
* @param nodeRecord Ethereum Node record
104-
* @return Promise of the node TALK response.
105-
*/
106-
public CompletableFuture<Bytes> talk(NodeRecord nodeRecord, Bytes protocol, Bytes request) {
107-
return discoveryManager.talk(nodeRecord, protocol, request);
108-
}
109-
110-
public Stream<NodeRecord> streamLiveNodes() {
111-
return Stream.concat(
112-
buckets.streamClosestNodes(Bytes32.ZERO), discoveryManager.streamActiveSessions());
113-
}
114-
115-
public CompletableFuture<Collection<NodeRecord>> searchForNewPeers() {
116-
return taskManager.searchForNewPeers();
117-
}
118-
119-
/**
120-
* Lookup node in locally stored KBuckets by its nodeId. Allows lookup of local node record.
121-
*
122-
* @param nodeId NodeId, big endian UInt256 Node ID in bytes
123-
* @return NodeRecord if any found
124-
*/
125-
public Optional<NodeRecord> lookupNode(final Bytes nodeId) {
126-
if (nodeId.equals(getLocalNodeRecord().getNodeId())) {
127-
return Optional.of(getLocalNodeRecord());
128-
}
129-
return buckets
130-
.streamClosestNodes(nodeId)
131-
.findFirst()
132-
.filter(node -> node.getNodeId().equals(nodeId));
133-
}
134-
135-
/**
136-
* Gets all the NodeRecords in the routing table, grouped by their bucket
137-
*
138-
* @return all the NodeRecords in the routing table, grouped by their bucket
139-
*/
140-
public List<List<NodeRecord>> getNodeRecordBuckets() {
141-
return buckets.getNodeRecordBuckets();
142-
}
143-
144-
/**
145-
* Adds a NodeRecord to the routing table
146-
*
147-
* @param nodeRecord The NodeRecord to add to the routing table
148-
*/
149-
public void addNodeRecord(NodeRecord nodeRecord) {
150-
buckets.offer(nodeRecord);
151-
}
15212

153-
/**
154-
* Deletes the NodeRecord identified by nodeId from the routing table
155-
*
156-
* @param nodeId The node ID to be deleted from the routing table
157-
*/
158-
public void deleteNode(Bytes nodeId) {
159-
buckets.deleteNode(nodeId);
160-
}
13+
public interface DiscoverySystem {
14+
CompletableFuture<Void> start();
15+
void stop();
16+
NodeRecord getLocalNodeRecord();
17+
BucketStats getBucketStats();
18+
void updateCustomFieldValue(final String fieldName, final Bytes value);
19+
/**
20+
* Initiates FINDNODE with node `nodeRecord`
21+
*
22+
* @param nodeRecord Ethereum Node record
23+
* @param distances Distances to search for
24+
* @return Future which is fired when reply is received or fails in timeout/not successful
25+
* handshake/bad message exchange. Contains the collection of nodes returned by the peer.
26+
*/
27+
CompletableFuture<Collection<NodeRecord>> findNodes(
28+
NodeRecord nodeRecord, List<Integer> distances);
29+
/**
30+
* Initiates PING with node `nodeRecord`
31+
*
32+
* @param nodeRecord Ethereum Node record
33+
* @return Future which is fired when reply is received or fails in timeout/not successful
34+
* handshake/bad message exchange.
35+
*/
36+
CompletableFuture<Void> ping(NodeRecord nodeRecord);
37+
/**
38+
* Initiates TALK with node `nodeRecord`
39+
*
40+
* @param nodeRecord Ethereum Node record
41+
* @return Promise of the node TALK response.
42+
*/
43+
CompletableFuture<Bytes> talk(NodeRecord nodeRecord, Bytes protocol, Bytes request);
44+
Stream<NodeRecord> streamLiveNodes();
45+
CompletableFuture<Collection<NodeRecord>> searchForNewPeers();
46+
/**
47+
* Lookup node in locally stored KBuckets by its nodeId. Allows lookup of local node record.
48+
*
49+
* @param nodeId NodeId, big endian UInt256 Node ID in bytes
50+
* @return NodeRecord if any found
51+
*/
52+
Optional<NodeRecord> lookupNode(final Bytes nodeId);
53+
/**
54+
* Gets all the NodeRecords in the routing table, grouped by their bucket
55+
*
56+
* @return all the NodeRecords in the routing table, grouped by their bucket
57+
*/
58+
List<List<NodeRecord>> getNodeRecordBuckets();
16159
}

src/main/java/org/ethereum/beacon/discovery/DiscoverySystemBuilder.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -263,7 +263,7 @@ private void createDefaults() {
263263
LocalNodeRecordStore localNodeRecordStore;
264264
ExpirationSchedulerFactory expirationSchedulerFactory;
265265

266-
public DiscoverySystem build() {
266+
public <T extends DiscoverySystem> T build(Class<T> desiredClass) {
267267
checkNotNull(localNodeRecord, "Missing local node record");
268268
checkNotNull(secretKey, "Missing secret key");
269269
createDefaults();
@@ -285,12 +285,12 @@ public DiscoverySystem build() {
285285
recursiveLookupInterval,
286286
retryTimeout,
287287
lifeCheckInterval);
288-
return new DiscoverySystem(
288+
return desiredClass.cast(new DiscoverySystemImpl(
289289
discoveryManager,
290290
discoveryTaskManager,
291291
expirationSchedulerFactory,
292292
nodeBucketStorage,
293-
bootnodes);
293+
bootnodes));
294294
}
295295

296296
@VisibleForTesting
Lines changed: 139 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,139 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*/
4+
package org.ethereum.beacon.discovery;
5+
6+
import java.util.Collection;
7+
import java.util.List;
8+
import java.util.Optional;
9+
import java.util.concurrent.CompletableFuture;
10+
import java.util.stream.Stream;
11+
import org.apache.logging.log4j.LogManager;
12+
import org.apache.logging.log4j.Logger;
13+
import org.apache.tuweni.bytes.Bytes;
14+
import org.apache.tuweni.bytes.Bytes32;
15+
import org.ethereum.beacon.discovery.scheduler.ExpirationSchedulerFactory;
16+
import org.ethereum.beacon.discovery.schema.NodeRecord;
17+
import org.ethereum.beacon.discovery.storage.BucketStats;
18+
import org.ethereum.beacon.discovery.storage.KBuckets;
19+
import org.ethereum.beacon.discovery.task.DiscoveryTaskManager;
20+
21+
public class DiscoverySystemImpl implements DiscoverySystem, MutableDiscoverySystem {
22+
private static final Logger LOG = LogManager.getLogger();
23+
private final DiscoveryManager discoveryManager;
24+
private final DiscoveryTaskManager taskManager;
25+
private final ExpirationSchedulerFactory expirationSchedulerFactory;
26+
private final KBuckets buckets;
27+
private final List<NodeRecord> bootnodes;
28+
29+
DiscoverySystemImpl(
30+
final DiscoveryManager discoveryManager,
31+
final DiscoveryTaskManager taskManager,
32+
final ExpirationSchedulerFactory expirationSchedulerFactory,
33+
final KBuckets buckets,
34+
final List<NodeRecord> bootnodes) {
35+
this.discoveryManager = discoveryManager;
36+
this.taskManager = taskManager;
37+
this.expirationSchedulerFactory = expirationSchedulerFactory;
38+
this.buckets = buckets;
39+
this.bootnodes = bootnodes;
40+
}
41+
42+
@Override
43+
public CompletableFuture<Void> start() {
44+
return discoveryManager.start().thenRun(taskManager::start).thenRun(this::pingBootnodes);
45+
}
46+
47+
private void pingBootnodes() {
48+
bootnodes.forEach(
49+
bootnode ->
50+
discoveryManager
51+
.ping(bootnode)
52+
.exceptionally(
53+
e -> {
54+
LOG.debug("Failed to ping bootnode: " + bootnode);
55+
return null;
56+
}));
57+
}
58+
59+
@Override
60+
public void stop() {
61+
taskManager.stop();
62+
discoveryManager.stop();
63+
expirationSchedulerFactory.stop();
64+
}
65+
66+
@Override
67+
public NodeRecord getLocalNodeRecord() {
68+
return discoveryManager.getLocalNodeRecord();
69+
}
70+
71+
@Override
72+
public BucketStats getBucketStats() {
73+
return buckets.getStats();
74+
}
75+
76+
@Override
77+
public void updateCustomFieldValue(final String fieldName, final Bytes value) {
78+
discoveryManager.updateCustomFieldValue(fieldName, value);
79+
}
80+
81+
@Override
82+
public CompletableFuture<Collection<NodeRecord>> findNodes(
83+
NodeRecord nodeRecord, List<Integer> distances) {
84+
return discoveryManager.findNodes(nodeRecord, distances);
85+
}
86+
87+
@Override
88+
public CompletableFuture<Void> ping(NodeRecord nodeRecord) {
89+
return discoveryManager.ping(nodeRecord);
90+
}
91+
92+
@Override
93+
public CompletableFuture<Bytes> talk(NodeRecord nodeRecord, Bytes protocol, Bytes request) {
94+
return discoveryManager.talk(nodeRecord, protocol, request);
95+
}
96+
97+
@Override
98+
public Stream<NodeRecord> streamLiveNodes() {
99+
return Stream.concat(
100+
buckets.streamClosestNodes(Bytes32.ZERO), discoveryManager.streamActiveSessions());
101+
}
102+
103+
@Override
104+
public CompletableFuture<Collection<NodeRecord>> searchForNewPeers() {
105+
return taskManager.searchForNewPeers();
106+
}
107+
108+
/**
109+
* Lookup node in locally stored KBuckets by its nodeId. Allows lookup of local node record.
110+
*
111+
* @param nodeId NodeId, big endian UInt256 Node ID in bytes
112+
* @return NodeRecord if any found
113+
*/
114+
@Override
115+
public Optional<NodeRecord> lookupNode(final Bytes nodeId) {
116+
if (nodeId.equals(getLocalNodeRecord().getNodeId())) {
117+
return Optional.of(getLocalNodeRecord());
118+
}
119+
return buckets
120+
.streamClosestNodes(nodeId)
121+
.findFirst()
122+
.filter(node -> node.getNodeId().equals(nodeId));
123+
}
124+
125+
@Override
126+
public List<List<NodeRecord>> getNodeRecordBuckets() {
127+
return buckets.getNodeRecordBuckets();
128+
}
129+
130+
@Override
131+
public void addNodeRecord(NodeRecord nodeRecord) {
132+
buckets.offer(nodeRecord);
133+
}
134+
135+
@Override
136+
public void deleteNode(Bytes nodeId) {
137+
buckets.deleteNode(nodeId);
138+
}
139+
}
Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
package org.ethereum.beacon.discovery;
2+
3+
import org.apache.tuweni.bytes.Bytes;
4+
import org.ethereum.beacon.discovery.schema.NodeRecord;
5+
6+
public interface MutableDiscoverySystem extends DiscoverySystem {
7+
/**
8+
* Adds a NodeRecord to the routing table
9+
*
10+
* @param nodeRecord The NodeRecord to add to the routing table
11+
*/
12+
void addNodeRecord(NodeRecord nodeRecord);
13+
/**
14+
* Deletes the NodeRecord identified by nodeId from the routing table
15+
*
16+
* @param nodeId The node ID to be deleted from the routing table
17+
*/
18+
void deleteNode(Bytes nodeId);
19+
}

0 commit comments

Comments
 (0)