Skip to content

Commit 07c8ebb

Browse files
committed
fix: prevent pipeline errors from killing the Reactor subscription
The pipeline used doOnNext to chain handlers, but onErrorContinue does not work reliably with doOnNext (a known Reactor limitation). When a StackOverflowError escaped a handler, it could intermittently terminate the Reactor subscription, causing the node to stop processing all subsequent incoming packets. Wrap each handler invocation in a try-catch within the pipeline so errors are contained at the handler level and never propagate to Reactor. Also add catch(Throwable) to HandshakeMessagePacketHandler for defense-in-depth, matching the existing pattern in MessagePacketHandler.
1 parent 77b1e42 commit 07c8ebb

2 files changed

Lines changed: 16 additions & 1 deletion

File tree

src/main/java/org/ethereum/beacon/discovery/pipeline/PipelineImpl.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,15 @@ public class PipelineImpl implements Pipeline {
2929
public synchronized Pipeline build() {
3030
started.set(true);
3131
for (EnvelopeHandler handler : envelopeHandlers) {
32-
pipeline = pipeline.doOnNext(handler::handle);
32+
pipeline =
33+
pipeline.doOnNext(
34+
envelope -> {
35+
try {
36+
handler.handle(envelope);
37+
} catch (Throwable t) {
38+
LOG.debug("Error in pipeline handler {}: {}", handler.getClass().getSimpleName(), t.getMessage());
39+
}
40+
});
3341
}
3442
Flux.from(pipeline)
3543
.onErrorContinue(

src/main/java/org/ethereum/beacon/discovery/pipeline/handler/HandshakeMessagePacketHandler.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -151,6 +151,13 @@ public void handle(Envelope envelope) {
151151
packet, session.getNodeRecord(), session.getState()),
152152
ex);
153153
markHandshakeAsFailed(envelope, session);
154+
} catch (Throwable t) {
155+
LOG.warn(
156+
"Unexpected error while processing handshake [{}] from node {}",
157+
packet,
158+
session.getNodeRecord(),
159+
t);
160+
markHandshakeAsFailed(envelope, session);
154161
}
155162
}
156163

0 commit comments

Comments
 (0)