forked from Consensys/discovery
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathPipelineImpl.java
More file actions
80 lines (71 loc) · 2.43 KB
/
PipelineImpl.java
File metadata and controls
80 lines (71 loc) · 2.43 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
/*
* SPDX-License-Identifier: Apache-2.0
*/
package org.ethereum.beacon.discovery.pipeline;
import static org.ethereum.beacon.discovery.pipeline.Field.INCOMING;
import static org.ethereum.beacon.discovery.util.Utils.RECOVERABLE_ERRORS_PREDICATE;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import reactor.core.publisher.ReplayProcessor;
public class PipelineImpl implements Pipeline {
private static final Logger LOG = LogManager.getLogger();
private final List<EnvelopeHandler> envelopeHandlers = new ArrayList<>();
private final AtomicBoolean started = new AtomicBoolean(false);
private Flux<Envelope> pipeline = ReplayProcessor.cacheLast();
private final FluxSink<Envelope> pipelineSink = ((ReplayProcessor<Envelope>) pipeline).sink();
@Override
public synchronized Pipeline build() {
started.set(true);
for (EnvelopeHandler handler : envelopeHandlers) {
pipeline =
pipeline.doOnNext(
envelope -> {
try {
handler.handle(envelope);
} catch (Throwable t) {
LOG.debug(
"Unexpected error in pipeline handler {}",
handler.getClass().getSimpleName(),
t);
}
});
}
Flux.from(pipeline)
.onErrorContinue(
RECOVERABLE_ERRORS_PREDICATE,
(err, msg) -> LOG.debug("Error while processing message: " + err))
.subscribe();
return this;
}
@Override
public void push(Object object) {
if (!started.get()) {
throw new RuntimeException("You should build pipeline first");
}
if (!(object instanceof Envelope)) {
Envelope envelope = new Envelope();
envelope.put(INCOMING, object);
pipelineSink.next(envelope);
} else {
pipelineSink.next((Envelope) object);
}
}
@Override
public Pipeline addHandler(EnvelopeHandler envelopeHandler) {
if (started.get()) {
throw new RuntimeException("Pipeline already started, couldn't add any handlers");
}
envelopeHandlers.add(envelopeHandler);
return this;
}
@Override
public Publisher<Envelope> getOutgoingEnvelopes() {
return pipeline;
}
}