diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java
index cbace6e7ff40..07fd72d5d0fb 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java
@@ -44,7 +44,6 @@
 import java.util.Set;
 import java.util.concurrent.Callable;
 import java.util.concurrent.atomic.AtomicReference;
-import java.util.function.BiConsumer;
 import java.util.function.Consumer;
 import java.util.function.Function;
 import java.util.function.Predicate;
@@ -559,8 +558,9 @@ private void handleAppendFailure(
       String shortTableId,
       AppendClientInfo appendClientInfo,
       Callable<Boolean> tryCreateTable,
-      BiConsumer<Iterable<AppendRowsContext>, Boolean> initializeContexts,
+      Consumer<Iterable<AppendRowsContext>> initializeContexts,
       Consumer<Iterable<AppendRowsContext>> clearClients,
+      ValueState<String> streamName,
       ValueState<Long> streamOffset,
       MultiOutputReceiver o) {
     // The first context is always the one that fails.
@@ -659,15 +659,6 @@ private void handleAppendFailure(
       throw new RuntimeException(e);
     }
 
-    if (!quotaError) {
-      // For known errors (offset mismatch, not found) we must reestablish
-      // the streams.
-      // However we've seen that doing this fixes random stuckness issues by reestablishing
-      // gRPC connections,
-      // so we close the clients for all non-quota errors.
-
-      clearClients.accept(failedContexts);
-    }
     appendFailures.inc();
     int retriedRows = failedContext.protoRows.getSerializedRowsCount();
     BigQuerySinkMetrics.appendRowsRowStatusCounter(
@@ -722,11 +713,24 @@ private void handleAppendFailure(
 
       // Finalize the stream and clear streamName so a new stream will be created.
       o.get(flushTag)
          .output(KV.of(failedContext.streamName, new Operation(failedContext.offset - 1, true)));
+
+      // Clear streamName so a new stream will be created.
+      try {
+        streamName.write("");
+      } catch (Exception e) {
+        throw new RuntimeException(e);
+      }
+
+      // Re-establish the client with the new stream.
+      clearClients.accept(failedContexts);
+
       // Reinitialize all contexts with the new stream and new offsets.
-      initializeContexts.accept(failedContexts, true);
+      initializeContexts.accept(failedContexts);
 
       // Offset failures imply that all subsequent parallel appends will also fail.
       // Retry them all.
+    } else if (!quotaError) {
+      clearClients.accept(failedContexts);
     }
   }
@@ -912,13 +916,9 @@ public void process(
 
     // Initialize stream names and offsets for all contexts. This will be called initially, but
     // will also be called if we roll over to a new stream on a retry.
-    BiConsumer<Iterable<AppendRowsContext>, Boolean> initializeContexts =
-        (contexts, isFailure) -> {
+    Consumer<Iterable<AppendRowsContext>> initializeContexts =
+        (contexts) -> {
           try {
-            if (isFailure) {
-              // Clear the stream name, forcing a new one to be created.
-              streamName.write("");
-            }
             String streamNameRead = Preconditions.checkArgumentNotNull(streamName.read());
             long currentOffset = Preconditions.checkArgumentNotNull(streamOffset.read());
             for (AppendRowsContext context : contexts) {
@@ -968,6 +968,7 @@ public void process(
                   tryCreateTable,
                   initializeContexts,
                   clearClients,
+                  streamName,
                   streamOffset,
                   o);
           return RetryType.RETRY_ALL_OPERATIONS;
@@ -1068,7 +1069,7 @@ public void process(
       Iterable<AppendRowsContext> contexts = retryManager.getRemainingContexts();
       if (numAppends > 0) {
-        initializeContexts.accept(contexts, false);
+        initializeContexts.accept(contexts);
         retryManager.run(true);
         appendSplitDistribution.update(numAppends);