diff --git a/src/main/java/org/ethereum/beacon/discovery/pipeline/PipelineImpl.java b/src/main/java/org/ethereum/beacon/discovery/pipeline/PipelineImpl.java index c22005ff9..3c9aff23b 100644 --- a/src/main/java/org/ethereum/beacon/discovery/pipeline/PipelineImpl.java +++ b/src/main/java/org/ethereum/beacon/discovery/pipeline/PipelineImpl.java @@ -29,7 +29,18 @@ public class PipelineImpl implements Pipeline { public synchronized Pipeline build() { started.set(true); for (EnvelopeHandler handler : envelopeHandlers) { - pipeline = pipeline.doOnNext(handler::handle); + pipeline = + pipeline.doOnNext( + envelope -> { + try { + handler.handle(envelope); + } catch (Throwable t) { + LOG.warn( + "Unexpected error in pipeline handler {}", + handler.getClass().getSimpleName(), + t); + } + }); } Flux.from(pipeline) .onErrorContinue( diff --git a/src/main/java/org/ethereum/beacon/discovery/pipeline/handler/HandshakeMessagePacketHandler.java b/src/main/java/org/ethereum/beacon/discovery/pipeline/handler/HandshakeMessagePacketHandler.java index b25f127b4..328cf23f2 100644 --- a/src/main/java/org/ethereum/beacon/discovery/pipeline/handler/HandshakeMessagePacketHandler.java +++ b/src/main/java/org/ethereum/beacon/discovery/pipeline/handler/HandshakeMessagePacketHandler.java @@ -151,6 +151,13 @@ public void handle(Envelope envelope) { packet, session.getNodeRecord(), session.getState()), ex); markHandshakeAsFailed(envelope, session); + } catch (Throwable t) { + LOG.warn( + "Unexpected error while processing handshake [{}] from node {}", + packet, + session.getNodeRecord(), + t); + markHandshakeAsFailed(envelope, session); } }