File tree Expand file tree Collapse file tree
main/java/org/ethereum/beacon/discovery/pipeline
test/java/org/ethereum/beacon/discovery/pipeline Expand file tree Collapse file tree Original file line number Diff line number Diff line change 1+ /*
2+ * SPDX-License-Identifier: Apache-2.0
3+ */
4+
5+ package org .ethereum .beacon .discovery .pipeline ;
6+
7+ /**
8+ * Base class for {@link EnvelopeHandler}s that should be skipped once an envelope has been marked
9+ * with {@link Field#BAD_PACKET}. Subclasses implement {@link #handlePacket(Envelope)} and get the
10+ * skip behaviour for free; the terminal bad-packet handler implements {@link EnvelopeHandler}
11+ * directly.
12+ */
13+ public abstract class AbstractSkippingEnvelopeHandler implements EnvelopeHandler {
14+
15+ @ Override
16+ public final void handle (final Envelope envelope ) {
17+ if (envelope .contains (Field .BAD_PACKET )) {
18+ return ;
19+ }
20+ handlePacket (envelope );
21+ }
22+
23+ protected abstract void handlePacket (Envelope envelope );
24+ }
Original file line number Diff line number Diff line change 1313import org .ethereum .beacon .discovery .AddressAccessPolicy ;
1414import org .ethereum .beacon .discovery .message .V5Message ;
1515import org .ethereum .beacon .discovery .packet .HandshakeMessagePacket ;
16+ import org .ethereum .beacon .discovery .pipeline .AbstractSkippingEnvelopeHandler ;
1617import org .ethereum .beacon .discovery .pipeline .Envelope ;
17- import org .ethereum .beacon .discovery .pipeline .EnvelopeHandler ;
1818import org .ethereum .beacon .discovery .pipeline .Field ;
1919import org .ethereum .beacon .discovery .pipeline .HandlerUtil ;
2020import org .ethereum .beacon .discovery .pipeline .Pipeline ;
2727import org .ethereum .beacon .discovery .util .Functions ;
2828
2929/** Handles {@link HandshakeMessagePacket} in {@link Field#PACKET_HANDSHAKE} field */
30- public class HandshakeMessagePacketHandler implements EnvelopeHandler {
30+ public class HandshakeMessagePacketHandler extends AbstractSkippingEnvelopeHandler {
3131 private static final Logger LOG = LogManager .getLogger (HandshakeMessagePacketHandler .class );
3232 private final Pipeline outgoingPipeline ;
3333 private final Scheduler scheduler ;
@@ -49,7 +49,7 @@ public HandshakeMessagePacketHandler(
4949 }
5050
5151 @ Override
52- public void handle (Envelope envelope ) {
52+ protected void handlePacket (Envelope envelope ) {
5353 if (!HandlerUtil .requireField (Field .PACKET_HANDSHAKE , envelope )) {
5454 return ;
5555 }
Original file line number Diff line number Diff line change 99import org .apache .tuweni .bytes .Bytes ;
1010import org .ethereum .beacon .discovery .packet .Packet ;
1111import org .ethereum .beacon .discovery .packet .RawPacket ;
12+ import org .ethereum .beacon .discovery .pipeline .AbstractSkippingEnvelopeHandler ;
1213import org .ethereum .beacon .discovery .pipeline .Envelope ;
13- import org .ethereum .beacon .discovery .pipeline .EnvelopeHandler ;
1414import org .ethereum .beacon .discovery .pipeline .Field ;
1515import org .ethereum .beacon .discovery .pipeline .HandlerUtil ;
1616import org .ethereum .beacon .discovery .type .Bytes16 ;
1717import org .ethereum .beacon .discovery .util .DecodeException ;
1818
1919/** Handles raw BytesValue incoming data in {@link Field#INCOMING} */
20- public class IncomingDataPacker implements EnvelopeHandler {
20+ public class IncomingDataPacker extends AbstractSkippingEnvelopeHandler {
2121 private static final Logger LOG = LogManager .getLogger (IncomingDataPacker .class );
2222 public static final int MAX_PACKET_SIZE = 1280 ;
2323 public static final int MIN_PACKET_SIZE = 63 ;
@@ -28,7 +28,7 @@ public IncomingDataPacker(Bytes homeNodeId) {
2828 }
2929
3030 @ Override
31- public void handle (Envelope envelope ) {
31+ protected void handlePacket (Envelope envelope ) {
3232 if (!HandlerUtil .requireField (Field .INCOMING , envelope )) {
3333 return ;
3434 }
Original file line number Diff line number Diff line change 1010import org .ethereum .beacon .discovery .message .V5Message ;
1111import org .ethereum .beacon .discovery .message .handler .EnrUpdateTracker .EnrUpdater ;
1212import org .ethereum .beacon .discovery .message .handler .ExternalAddressSelector ;
13+ import org .ethereum .beacon .discovery .pipeline .AbstractSkippingEnvelopeHandler ;
1314import org .ethereum .beacon .discovery .pipeline .Envelope ;
14- import org .ethereum .beacon .discovery .pipeline .EnvelopeHandler ;
1515import org .ethereum .beacon .discovery .pipeline .Field ;
1616import org .ethereum .beacon .discovery .pipeline .HandlerUtil ;
1717import org .ethereum .beacon .discovery .processor .DiscoveryV5MessageProcessor ;
1818import org .ethereum .beacon .discovery .processor .MessageProcessor ;
1919import org .ethereum .beacon .discovery .schema .NodeSession ;
2020import org .ethereum .beacon .discovery .storage .LocalNodeRecordStore ;
2121
22- public class MessageHandler implements EnvelopeHandler {
22+ public class MessageHandler extends AbstractSkippingEnvelopeHandler {
2323 private static final Logger LOG = LogManager .getLogger (MessageHandler .class );
2424 private final MessageProcessor messageProcessor ;
2525
@@ -36,7 +36,7 @@ public MessageHandler(
3636 }
3737
3838 @ Override
39- public void handle (Envelope envelope ) {
39+ protected void handlePacket (Envelope envelope ) {
4040 if (!HandlerUtil .requireField (Field .MESSAGE , envelope )) {
4141 return ;
4242 }
Original file line number Diff line number Diff line change 99import org .ethereum .beacon .discovery .message .V5Message ;
1010import org .ethereum .beacon .discovery .packet .MessagePacket ;
1111import org .ethereum .beacon .discovery .packet .OrdinaryMessagePacket ;
12+ import org .ethereum .beacon .discovery .pipeline .AbstractSkippingEnvelopeHandler ;
1213import org .ethereum .beacon .discovery .pipeline .Envelope ;
13- import org .ethereum .beacon .discovery .pipeline .EnvelopeHandler ;
1414import org .ethereum .beacon .discovery .pipeline .Field ;
1515import org .ethereum .beacon .discovery .pipeline .HandlerUtil ;
1616import org .ethereum .beacon .discovery .schema .NodeRecordFactory ;
1919import org .ethereum .beacon .discovery .util .DecryptException ;
2020
2121/** Handles {@link MessagePacket} in {@link Field#PACKET_MESSAGE} field */
22- public class MessagePacketHandler implements EnvelopeHandler {
22+ public class MessagePacketHandler extends AbstractSkippingEnvelopeHandler {
2323 private static final Logger LOG = LogManager .getLogger (MessagePacketHandler .class );
2424 private final NodeRecordFactory nodeRecordFactory ;
2525
@@ -28,7 +28,7 @@ public MessagePacketHandler(NodeRecordFactory nodeRecordFactory) {
2828 }
2929
3030 @ Override
31- public void handle (Envelope envelope ) {
31+ protected void handlePacket (Envelope envelope ) {
3232 if (!HandlerUtil .requireField (Field .PACKET_MESSAGE , envelope )) {
3333 return ;
3434 }
Original file line number Diff line number Diff line change 66
77import org .apache .logging .log4j .LogManager ;
88import org .apache .logging .log4j .Logger ;
9+ import org .ethereum .beacon .discovery .pipeline .AbstractSkippingEnvelopeHandler ;
910import org .ethereum .beacon .discovery .pipeline .Envelope ;
10- import org .ethereum .beacon .discovery .pipeline .EnvelopeHandler ;
1111import org .ethereum .beacon .discovery .pipeline .Field ;
1212import org .ethereum .beacon .discovery .pipeline .HandlerUtil ;
1313import org .ethereum .beacon .discovery .pipeline .info .Request ;
1414import org .ethereum .beacon .discovery .schema .NodeSession ;
1515
1616/** Enqueues task in session for any task found in {@link Field#REQUEST} */
17- public class NewTaskHandler implements EnvelopeHandler {
17+ public class NewTaskHandler extends AbstractSkippingEnvelopeHandler {
1818 private static final Logger LOG = LogManager .getLogger (NewTaskHandler .class );
1919
2020 @ Override
2121 @ SuppressWarnings ("rawtypes" )
22- public void handle (Envelope envelope ) {
22+ protected void handlePacket (Envelope envelope ) {
2323 if (!HandlerUtil .requireField (Field .REQUEST , envelope )) {
2424 return ;
2525 }
Original file line number Diff line number Diff line change 1111import org .apache .logging .log4j .Logger ;
1212import org .apache .tuweni .bytes .Bytes ;
1313import org .ethereum .beacon .discovery .message .V5Message ;
14+ import org .ethereum .beacon .discovery .pipeline .AbstractSkippingEnvelopeHandler ;
1415import org .ethereum .beacon .discovery .pipeline .Envelope ;
15- import org .ethereum .beacon .discovery .pipeline .EnvelopeHandler ;
1616import org .ethereum .beacon .discovery .pipeline .Field ;
1717import org .ethereum .beacon .discovery .pipeline .HandlerUtil ;
1818import org .ethereum .beacon .discovery .pipeline .Pipeline ;
2323import org .ethereum .beacon .discovery .task .TaskStatus ;
2424
2525/** Gets next request task in session and processes it */
26- public class NextTaskHandler implements EnvelopeHandler {
26+ public class NextTaskHandler extends AbstractSkippingEnvelopeHandler {
2727 private static final Logger LOG = LogManager .getLogger (NextTaskHandler .class );
2828 private static final int DEFAULT_DELAY_MS = 1000 ;
2929 private static final int RANDOM_MESSAGE_SIZE = 128 ;
@@ -46,7 +46,7 @@ public static void tryToSendAwaitTaskIfAny(
4646 }
4747
4848 @ Override
49- public void handle (Envelope envelope ) {
49+ protected void handlePacket (Envelope envelope ) {
5050 if (!HandlerUtil .requireSessionWithNodeRecord (envelope )) {
5151 return ;
5252 }
Original file line number Diff line number Diff line change 1919import org .apache .logging .log4j .Logger ;
2020import org .apache .tuweni .bytes .Bytes ;
2121import org .ethereum .beacon .discovery .crypto .Signer ;
22+ import org .ethereum .beacon .discovery .pipeline .AbstractSkippingEnvelopeHandler ;
2223import org .ethereum .beacon .discovery .pipeline .Envelope ;
23- import org .ethereum .beacon .discovery .pipeline .EnvelopeHandler ;
2424import org .ethereum .beacon .discovery .pipeline .Field ;
2525import org .ethereum .beacon .discovery .pipeline .HandlerUtil ;
2626import org .ethereum .beacon .discovery .pipeline .Pipeline ;
3737 * Performs {@link Field#SESSION_LOOKUP} request. Looks up for Node session based on NodeId, which
3838 * should be in request field and stores it in {@link Field#SESSION} field.
3939 */
40- public class NodeSessionManager implements EnvelopeHandler {
40+ public class NodeSessionManager extends AbstractSkippingEnvelopeHandler {
4141 private static final int SESSION_CLEANUP_DELAY_SECONDS = 180 ;
4242 private static final int REQUEST_CLEANUP_DELAY_SECONDS = 60 ;
4343 private static final Logger LOG = LogManager .getLogger (NodeSessionManager .class );
@@ -67,7 +67,7 @@ public NodeSessionManager(
6767 }
6868
6969 @ Override
70- public void handle (final Envelope envelope ) {
70+ protected void handlePacket (final Envelope envelope ) {
7171 if (!HandlerUtil .requireField (Field .SESSION_LOOKUP , envelope )) {
7272 return ;
7373 }
Original file line number Diff line number Diff line change 66
77import org .apache .logging .log4j .LogManager ;
88import org .apache .logging .log4j .Logger ;
9+ import org .ethereum .beacon .discovery .pipeline .AbstractSkippingEnvelopeHandler ;
910import org .ethereum .beacon .discovery .pipeline .Envelope ;
10- import org .ethereum .beacon .discovery .pipeline .EnvelopeHandler ;
1111import org .ethereum .beacon .discovery .pipeline .Field ;
1212import org .ethereum .beacon .discovery .pipeline .HandlerUtil ;
1313
1414/**
1515 * Searches for node in {@link Field#NODE} and requests session resolving using {@link
1616 * Field#SESSION_LOOKUP}
1717 */
18- public class NodeSessionRequestHandler implements EnvelopeHandler {
18+ public class NodeSessionRequestHandler extends AbstractSkippingEnvelopeHandler {
1919 private static final Logger LOG = LogManager .getLogger (NodeSessionRequestHandler .class );
2020
2121 @ Override
22- public void handle (Envelope envelope ) {
22+ protected void handlePacket (Envelope envelope ) {
2323 if (!HandlerUtil .requireField (Field .NODE , envelope )) {
2424 return ;
2525 }
Original file line number Diff line number Diff line change 88import org .apache .logging .log4j .Logger ;
99import org .ethereum .beacon .discovery .AddressAccessPolicy ;
1010import org .ethereum .beacon .discovery .network .NetworkParcel ;
11+ import org .ethereum .beacon .discovery .pipeline .AbstractSkippingEnvelopeHandler ;
1112import org .ethereum .beacon .discovery .pipeline .Envelope ;
12- import org .ethereum .beacon .discovery .pipeline .EnvelopeHandler ;
1313import org .ethereum .beacon .discovery .pipeline .Field ;
1414import org .ethereum .beacon .discovery .pipeline .HandlerUtil ;
1515import reactor .core .publisher .FluxSink ;
1919 * we have outgoing parcel at the very first stage. Handler pushes it to `outgoingSink` stream which
2020 * is linked with discovery client.
2121 */
22- public class OutgoingParcelHandler implements EnvelopeHandler {
22+ public class OutgoingParcelHandler extends AbstractSkippingEnvelopeHandler {
2323 private static final Logger LOG = LogManager .getLogger (OutgoingParcelHandler .class );
2424
2525 private final FluxSink <NetworkParcel > outgoingSink ;
@@ -32,7 +32,7 @@ public OutgoingParcelHandler(
3232 }
3333
3434 @ Override
35- public void handle (Envelope envelope ) {
35+ protected void handlePacket (Envelope envelope ) {
3636 if (!HandlerUtil .requireField (Field .INCOMING , envelope )) {
3737 return ;
3838 }
You can’t perform that action at this time.
0 commit comments