Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
{
"comment": "Modify this file in a trivial way to cause this test suite to run!",
"modification": 1,
"modification": 2,
}

Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
{
"comment": "Modify this file in a trivial way to cause this test suite to run!",
"modification": 1,
"modification": 2,
}
Original file line number Diff line number Diff line change
Expand Up @@ -117,20 +117,12 @@ public NativeReader<?> create(

@Override
public NativeReaderIterator<WindowedValue<T>> iterator() throws IOException {
return new PubsubReaderIterator(context.getWorkItem());
return new PubsubReaderIterator();
}

class PubsubReaderIterator extends WindmillReaderIteratorBase<T> {
protected PubsubReaderIterator(Windmill.WorkItem work) {
super(work, skipUndecodableElements);
}

@Override
public boolean advance() throws IOException {
if (context.workIsFailed()) {
return false;
}
return super.advance();
protected PubsubReaderIterator() {
super(context, skipUndecodableElements);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
import org.apache.beam.runners.dataflow.worker.streaming.sideinput.SideInput;
import org.apache.beam.runners.dataflow.worker.streaming.sideinput.SideInputState;
import org.apache.beam.runners.dataflow.worker.streaming.sideinput.SideInputStateFetcher;
import org.apache.beam.runners.dataflow.worker.util.common.worker.WorkExecutor;
import org.apache.beam.runners.dataflow.worker.windmill.Windmill;
import org.apache.beam.runners.dataflow.worker.windmill.Windmill.GlobalDataId;
import org.apache.beam.runners.dataflow.worker.windmill.Windmill.GlobalDataRequest;
Expand Down Expand Up @@ -157,6 +158,8 @@ public class StreamingModeExecutionContext extends DataflowExecutionContext<Step
*/
private @Nullable UnboundedReader<?> activeReader;

private @Nullable WorkExecutor workExecutor;
Comment thread
arunpandianp marked this conversation as resolved.

public StreamingModeExecutionContext(
CounterFactory counterFactory,
String computationId,
Expand Down Expand Up @@ -240,9 +243,11 @@ public void start(
Work work,
WindmillStateReader stateReader,
SideInputStateFetcher sideInputStateFetcher,
Windmill.WorkItemCommitRequest.Builder outputBuilder) {
Windmill.WorkItemCommitRequest.Builder outputBuilder,
WorkExecutor workExecutor) {
this.key = key;
this.work = work;
this.workExecutor = workExecutor;
Comment thread
arunpandianp marked this conversation as resolved.
this.computationKey = WindmillComputationKey.create(computationId, work.getShardedKey());
this.sideInputStateFetcher = sideInputStateFetcher;
StreamingGlobalConfig config = globalConfigHandle.getConfig();
Expand Down Expand Up @@ -270,6 +275,11 @@ public void start(
}
}

public void finishKey() throws Exception {
checkNotNull(workExecutor, "workExecutor must be set before calling finishKey()");
workExecutor.finishKey();
}
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

Use the finishKeyCalled flag to ensure that workExecutor.finishKey() is only executed once per work item.

  public void finishKey() throws Exception {
    if (finishKeyCalled) {
      return;
    }
    checkNotNull(workExecutor, "workExecutor must be set before calling finishKey()");
    finishKeyCalled = true;
    workExecutor.finishKey();
  }


/**
* Ensure that the processing time is greater than any fired processing time timers. Otherwise, a
* trigger could ignore the timer and orphan the window.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,20 +109,12 @@ public NativeReader<?> create(

@Override
public NativeReaderIterator<WindowedValue<T>> iterator() throws IOException {
return new UngroupedWindmillReaderIterator(context.getWorkItem());
return new UngroupedWindmillReaderIterator();
}

class UngroupedWindmillReaderIterator extends WindmillReaderIteratorBase<T> {
UngroupedWindmillReaderIterator(Windmill.WorkItem work) {
super(work, skipUndecodableElements);
}

@Override
public boolean advance() throws IOException {
if (context.workIsFailed()) {
return false;
}
return super.advance();
UngroupedWindmillReaderIterator() {
super(context, skipUndecodableElements);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
*/
public abstract class WindmillReaderIteratorBase<T>
extends NativeReader.NativeReaderIterator<WindowedValue<T>> {
private final StreamingModeExecutionContext context;
private final Windmill.WorkItem work;
private int bundleIndex = 0;
private int messageIndex = -1;
Expand All @@ -42,9 +43,10 @@ public abstract class WindmillReaderIteratorBase<T>
private static final Logger LOG = LoggerFactory.getLogger(WindmillReaderIteratorBase.class);

protected WindmillReaderIteratorBase(
Windmill.WorkItem work, ValueProvider<Boolean> skipUndecodableElements) {
StreamingModeExecutionContext context, ValueProvider<Boolean> skipUndecodableElements) {
this.context = context;
this.skipUndecodableElements = skipUndecodableElements;
this.work = work;
this.work = context.getWorkItem();
}

@Override
Expand All @@ -54,9 +56,18 @@ public boolean start() throws IOException {

@Override
public boolean advance() throws IOException {
if (context.workIsFailed()) {
throw new WorkItemCancelledException(context.getWorkItem().getShardingKey());
}

while (true) {
if (bundleIndex >= work.getMessageBundlesCount()) {
current = null;
try {
context.finishKey();
} catch (Exception e) {
throw new RuntimeException(e);
}
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

Instead of wrapping all exceptions in RuntimeException, consider catching IOException and RuntimeException separately to rethrow them directly, and wrapping only other checked exceptions in IOException. This maintains consistency with the advance() method's signature and allows callers to handle specific exception types.

        } catch (IOException | RuntimeException e) {
          throw e;
        } catch (Exception e) {
          throw new IOException(e);
        }

return false;
}
Windmill.InputMessageBundle bundle = work.getMessageBundles(bundleIndex);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,11 @@ public boolean start() throws IOException {
@Override
public boolean advance() throws IOException {
Comment on lines 183 to 184
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The anonymous iterator in WindowingWindmillReader should also check context.workIsFailed() and throw WorkItemCancelledException, consistent with the changes in WindmillReaderIteratorBase. This ensures that cancelled work items are handled promptly.

        @Override
        public boolean advance() throws IOException {
          if (context.workIsFailed()) {
            throw new WorkItemCancelledException(context.getWorkItem().getShardingKey());
          }

current = null;
try {
context.finishKey();
} catch (Exception e) {
throw new RuntimeException(e);
}
return false;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,9 @@ public void execute() throws Exception {
LOG.debug("Source operation execution complete");
}

@Override
public void finishKey() throws Exception {}

@Override
public SourceOperationResponse getResponse() {
return response;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ public final void executeWork(
SideInputStateFetcher sideInputStateFetcher,
Windmill.WorkItemCommitRequest.Builder outputBuilder)
throws Exception {
context().start(key, work, stateReader, sideInputStateFetcher, outputBuilder);
context().start(key, work, stateReader, sideInputStateFetcher, outputBuilder, workExecutor());
workExecutor().execute();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,9 @@ public void process(Object elem) throws Exception {
}
}

@Override
public void finishKey() throws Exception {}

@Override
public boolean supportsRestart() {
return true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,13 @@ public void execute() throws Exception {
// TODO: support for success / failure ports?
}

@Override
public void finishKey() throws Exception {
for (Operation op : operations) {
op.finishKey();
}
}

@Override
public NativeReader.Progress getWorkerProgress() throws Exception {
return getReadOperation().getProgress();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,9 @@ public void finish() throws Exception {
}
}

/** Called when all elements for a specific key have been processed. */
public abstract void finishKey() throws Exception;

/** Aborts this Operation's execution. */
public void abort() throws Exception {
synchronized (initializationStateLock) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,12 +46,17 @@ public void process(Object elem) throws Exception {
}

@Override
public void finish() throws Exception {
// Batch mode does not use this method and instead relies on BatchModeUngroupingParDoFn
// to process timers per key.
public void finishKey() throws Exception {
try (Closeable scope = context.enterProcessTimers()) {
checkStarted();
fn.processTimers();
}
}

@Override
public void finish() throws Exception {
try (Closeable scope = context.enterFinish()) {
fn.finishBundle();
super.finish();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -271,6 +271,9 @@ public void finish() throws Exception {
}
}

@Override
public void finishKey() throws Exception {}

@Override
public void abort() throws Exception {
if (readerIterator != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,9 @@ public interface WorkExecutor extends AutoCloseable {
/** Executes the task. */
public abstract void execute() throws Exception;

/** Called when all elements for a specific key have been processed. */
void finishKey() throws Exception;

/**
* Returns the worker's current progress.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,9 @@ public void finish() throws Exception {
}
}

@Override
public void finishKey() throws Exception {}

@Override
public void abort() throws Exception {
if (writer == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,9 @@ public void abort() throws Exception {
aborted = true;
super.abort();
}

@Override
public void finishKey() throws Exception {}
}

// A mock ReadOperation fed to a MapTaskExecutor in test.
Expand Down Expand Up @@ -312,6 +315,9 @@ public void start() throws Exception {
Metrics.counter("TestMetric", "MetricCounter").inc(1L);
}
}

@Override
public void finishKey() throws Exception {}
},
new Operation(new OutputReceiver[] {}, context2) {
@Override
Expand All @@ -321,6 +327,9 @@ public void start() throws Exception {
Metrics.counter("TestMetric", "MetricCounter").inc(2L);
}
}

@Override
public void finishKey() throws Exception {}
},
new Operation(new OutputReceiver[] {}, context3) {
@Override
Expand All @@ -330,6 +339,9 @@ public void start() throws Exception {
Metrics.counter("TestMetric", "MetricCounter").inc(3L);
}
}

@Override
public void finishKey() throws Exception {}
});

try (IntrinsicMapTaskExecutor executor =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3657,8 +3657,8 @@ public void testActiveWorkFailure() throws Exception {
server.waitForAndGetCommitsWithTimeout(1, Duration.standardSeconds(5));
assertEquals(1, commits.size());

assertEquals(0, BlockingFn.teardownCounter.get());
assertEquals(1, BlockingFn.setupCounter.get());
assertEquals(1, BlockingFn.teardownCounter.get());
assertEquals(2, BlockingFn.setupCounter.get());

worker.stop();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@
import org.apache.beam.runners.dataflow.worker.streaming.config.FakeGlobalConfigHandle;
import org.apache.beam.runners.dataflow.worker.streaming.config.StreamingGlobalConfig;
import org.apache.beam.runners.dataflow.worker.streaming.sideinput.SideInputStateFetcher;
import org.apache.beam.runners.dataflow.worker.util.common.worker.WorkExecutor;
import org.apache.beam.runners.dataflow.worker.windmill.Windmill;
import org.apache.beam.runners.dataflow.worker.windmill.client.getdata.FakeGetDataClient;
import org.apache.beam.runners.dataflow.worker.windmill.state.WindmillStateCache;
Expand Down Expand Up @@ -99,6 +100,7 @@ public class StreamingModeExecutionContextTest {
@Rule public transient Timeout globalTimeout = Timeout.seconds(600);
@Mock private SideInputStateFetcher sideInputStateFetcher;
@Mock private WindmillStateReader stateReader;
@Mock private WorkExecutor workExecutor;

private static final String COMPUTATION_ID = "computationId";

Expand Down Expand Up @@ -168,7 +170,8 @@ public void testTimerInternalsSetTimer() {
Watermarks.builder().setInputDataWatermark(new Instant(1000)).build()),
stateReader,
sideInputStateFetcher,
outputBuilder);
outputBuilder,
workExecutor);

TimerInternals timerInternals = stepContext.timerInternals();

Expand Down Expand Up @@ -218,7 +221,8 @@ public void testTimerInternalsProcessingTimeSkew() {
Watermarks.builder().setInputDataWatermark(new Instant(1000)).build()),
stateReader,
sideInputStateFetcher,
outputBuilder);
outputBuilder,
workExecutor);
TimerInternals timerInternals = stepContext.timerInternals();
assertTrue(timerTimestamp.isBefore(timerInternals.currentProcessingTime()));
}
Expand Down Expand Up @@ -427,7 +431,8 @@ public void testStateTagEncodingBasedOnConfig() {
Watermarks.builder().setInputDataWatermark(new Instant(1000)).build()),
stateReader,
sideInputStateFetcher,
outputBuilder);
outputBuilder,
workExecutor);
assertEquals(expectedEncoding, executionContext.getWindmillTagEncoding().getClass());
}
}
Expand Down
Loading
Loading