Skip to content

Commit 72315ae

Browse files
authored
fix: prevent pipeline errors from killing Reactor subscription (#205)
The pipeline used doOnNext to chain handlers, but onErrorContinue does not work reliably with doOnNext (a known Reactor limitation). When a StackOverflowError escaped a handler, it could intermittently terminate the Reactor subscription, causing the node to stop processing all subsequent incoming packets. Wrap each handler invocation in a try-catch within the pipeline so errors are contained at the handler level and never propagate to Reactor. Also add catch(Throwable) to HandshakeMessagePacketHandler for defense-in-depth, matching the existing pattern in MessagePacketHandler.
1 parent 77b1e42 commit 72315ae

2 files changed

Lines changed: 19 additions & 1 deletion

File tree

src/main/java/org/ethereum/beacon/discovery/pipeline/PipelineImpl.java

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,18 @@ public class PipelineImpl implements Pipeline {
2929
public synchronized Pipeline build() {
3030
started.set(true);
3131
for (EnvelopeHandler handler : envelopeHandlers) {
32-
pipeline = pipeline.doOnNext(handler::handle);
32+
pipeline =
33+
pipeline.doOnNext(
34+
envelope -> {
35+
try {
36+
handler.handle(envelope);
37+
} catch (Throwable t) {
38+
LOG.warn(
39+
"Unexpected error in pipeline handler {}",
40+
handler.getClass().getSimpleName(),
41+
t);
42+
}
43+
});
3344
}
3445
Flux.from(pipeline)
3546
.onErrorContinue(

src/main/java/org/ethereum/beacon/discovery/pipeline/handler/HandshakeMessagePacketHandler.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -151,6 +151,13 @@ public void handle(Envelope envelope) {
151151
packet, session.getNodeRecord(), session.getState()),
152152
ex);
153153
markHandshakeAsFailed(envelope, session);
154+
} catch (Throwable t) {
155+
LOG.warn(
156+
"Unexpected error while processing handshake [{}] from node {}",
157+
packet,
158+
session.getNodeRecord(),
159+
t);
160+
markHandshakeAsFailed(envelope, session);
154161
}
155162
}
156163

0 commit comments

Comments
 (0)