Support Filter Pushdown in Spark Structured Streaming #16210
```diff
@@ -23,13 +23,17 @@
 import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
 import org.apache.iceberg.expressions.Binder;
 import org.apache.iceberg.expressions.Expression;
 import org.apache.iceberg.expressions.Expressions;
 import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.io.CloseableIterator;
 import org.apache.iceberg.io.FileIO;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
 import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.types.Types;
 import org.apache.iceberg.util.Pair;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
```
```diff
@@ -58,11 +62,39 @@ public static CloseableIterable<FileScanTask> openManifestFile(
       Snapshot snapshot,
       ManifestFile manifestFile,
       boolean scanAllFiles) {
+    return openManifestFileWithFilter(
+        io, specsById, caseSensitive, snapshot, manifestFile, scanAllFiles, Lists.newArrayList());
+  }
+
+  public static CloseableIterable<FileScanTask> openManifestFileWithFilter(
+      FileIO io,
+      Map<Integer, PartitionSpec> specsById,
+      boolean caseSensitive,
+      Snapshot snapshot,
+      ManifestFile manifestFile,
+      boolean scanAllFiles,
+      List<Expression> pushedFilters) {
+
+    // 1. Get the field IDs used in the partition spec
+    Expression partitionExpr = Expressions.alwaysTrue();
+    Expression dataExpr = Expressions.alwaysTrue();
+
+    for (Expression filter : pushedFilters) {
+      if (isPartitionOnly(
+          filter, specsById.values().iterator().next().partitionType(), caseSensitive)) {
+        partitionExpr = Expressions.and(partitionExpr, filter);
+      } else {
+        dataExpr = Expressions.and(dataExpr, filter);
+      }
+    }
+
     ManifestGroup manifestGroup =
         new ManifestGroup(io, ImmutableList.of(manifestFile))
             .specsById(specsById)
-            .caseSensitive(caseSensitive);
+            .caseSensitive(caseSensitive)
+            .filterPartitions(partitionExpr)
+            .filterData(dataExpr);

     if (!scanAllFiles) {
       manifestGroup =
           manifestGroup
```
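To make the split concrete, here is a small hypothetical example of how the loop above classifies pushed filters. The table layout and column names (`dt`, `id`) are invented for illustration. Note that `isPartitionOnly` binds each filter against the partition struct, so in practice only columns whose names appear in that struct (e.g. identity-partitioned columns) end up in `partitionExpr`:

```java
// Hypothetical table partitioned by identity(dt): the partition struct has a
// field named "dt", while "id" exists only in the data schema.
List<Expression> pushedFilters =
    Lists.newArrayList(
        Expressions.equal("dt", "2024-06-01"), // binds against the partition type -> partitionExpr
        Expressions.greaterThan("id", 1000)); // fails to bind -> dataExpr

// After the loop:
//   partitionExpr == alwaysTrue() AND dt = "2024-06-01"  (prunes manifests/partitions)
//   dataExpr      == alwaysTrue() AND id > 1000          (evaluated against file metrics)
```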
```diff
@@ -76,6 +108,20 @@
     return manifestGroup.planFiles();
   }

+  // 2. The Helper Method using Iceberg's core Visitor
+  private static boolean isPartitionOnly(
+      Expression expr, Types.StructType partitionType, boolean caseSensitive) {
+    try {
+      // If this doesn't throw an error, it means the filter
+      // only uses columns present in the partition schema.
+      Binder.bind(partitionType, expr, caseSensitive);
+      return true;
```
Contributor: It's not a good idea to do exception-driven control flow for normal operations. The code base already has …
```diff
+    } catch (org.apache.iceberg.exceptions.ValidationException e) {
+      // Filter references columns NOT in the partition spec
+      return false;
+    }
+  }
```
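For reference, the check could avoid exceptions by leaning on Iceberg's projection machinery, which is also what the `ExpressionUtil.selectsPartitions()` helper mentioned in the review below builds on. This is only a sketch, not part of the PR, and assumes `Projections.inclusive`/`Projections.strict` and `ExpressionUtil.equivalent` from `org.apache.iceberg.expressions`:

```java
// Sketch of a non-exception-driven alternative: a filter can be answered from
// partition metadata alone when its strict and inclusive projections onto the
// spec agree. Projecting a predicate on a non-partition column simply yields
// alwaysTrue()/alwaysFalse() rather than throwing, so no try/catch is needed.
private static boolean isPartitionOnly(
    Expression expr, PartitionSpec spec, boolean caseSensitive) {
  Expression inclusive = Projections.inclusive(spec, caseSensitive).project(expr);
  Expression strict = Projections.strict(spec, caseSensitive).project(expr);
  return ExpressionUtil.equivalent(inclusive, strict, spec.partitionType(), caseSensitive);
}
```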
```diff

   /**
    * Method to index the data files for each manifest. For example, if manifest m1 has 3 data files,
    * manifest m2 has 2 data files, manifest m3 has 1 data file, then the index will be (m1, 0), (m2,
```
```diff
@@ -209,11 +255,22 @@ public MicroBatch generate(long startFileIndex, long targetSizeInBytes, boolean
             Iterables.size(
                 SnapshotChanges.builderFor(snapshot, io, specsById).build().addedDataFiles()),
             targetSizeInBytes,
-            scanAllFiles);
+            scanAllFiles,
+            Lists.newArrayList());
   }

+  public MicroBatch generate(
+      long startFileIndex, long endFileIndex, long targetSizeInBytes, boolean scanAllFiles) {
+    return generate(
+        startFileIndex, endFileIndex, targetSizeInBytes, scanAllFiles, Lists.newArrayList());
+  }
+
+  public MicroBatch generate(
+      long startFileIndex,
+      long endFileIndex,
+      long targetSizeInBytes,
+      boolean scanAllFiles,
+      List<Expression> pushedFilters) {
     Preconditions.checkArgument(endFileIndex >= 0, "endFileIndex is unexpectedly smaller than 0");
     Preconditions.checkArgument(
         startFileIndex >= 0, "startFileIndex is unexpectedly smaller than 0");
```
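For orientation, here is a hypothetical call site for the new filtered overload; the builder chain follows `MicroBatches`, and the index and size values are made up:

```java
// Plan one micro-batch over added files [0, 100), capped at 128 MB, pruning
// with a single pushed filter. The five-argument generate(...) is the
// overload added by this PR.
List<Expression> pushed = Lists.newArrayList(Expressions.equal("dt", "2024-06-01"));
MicroBatch batch =
    MicroBatches.from(snapshot, io)
        .specsById(table.specs())
        .caseSensitive(true)
        .generate(0L, 100L, 128L * 1024L * 1024L, false, pushed);
```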
```diff
@@ -225,7 +282,8 @@ public MicroBatch generate(
           startFileIndex,
           endFileIndex,
           targetSizeInBytes,
-          scanAllFiles);
+          scanAllFiles,
+          pushedFilters);
     }

   /**
```
```diff
@@ -247,7 +305,8 @@ private MicroBatch generateMicroBatch(
       long startFileIndex,
       long endFileIndex,
       long targetSizeInBytes,
-      boolean scanAllFiles) {
+      boolean scanAllFiles,
+      List<Expression> pushedFilters) {
     if (indexedManifests.isEmpty()) {
       return new MicroBatch(
           snapshot.snapshotId(), startFileIndex, endFileIndex, 0L, Collections.emptyList(), true);
```
```diff
@@ -262,13 +321,14 @@ private MicroBatch generateMicroBatch(
       currentFileIndex = indexedManifests.get(idx).second();

       try (CloseableIterable<FileScanTask> taskIterable =
-              openManifestFile(
+              openManifestFileWithFilter(
                   io,
                   specsById,
                   caseSensitive,
                   snapshot,
                   indexedManifests.get(idx).first(),
-                  scanAllFiles);
+                  scanAllFiles,
+                  pushedFilters);
           CloseableIterator<FileScanTask> taskIter = taskIterable.iterator()) {
         while (taskIter.hasNext()) {
           FileScanTask task = taskIter.next();
```
```diff
@@ -34,6 +34,7 @@
 import org.apache.iceberg.SchemaParser;
 import org.apache.iceberg.Snapshot;
 import org.apache.iceberg.Table;
+import org.apache.iceberg.expressions.Expression;
 import org.apache.iceberg.hadoop.HadoopFileIO;
 import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.io.FileIO;
```
```diff
@@ -57,6 +58,7 @@
 import org.slf4j.LoggerFactory;

 public class SparkMicroBatchStream implements MicroBatchStream, SupportsTriggerAvailableNow {
+  private static final Logger LOGGER = LoggerFactory.getLogger(SparkMicroBatchStream.class);
```
Contributor: This class already has a logger called `LOG`.
```diff
   private static final Joiner SLASH = Joiner.on("/");
   private static final Logger LOG = LoggerFactory.getLogger(SparkMicroBatchStream.class);
   private static final Types.StructType EMPTY_GROUPING_KEY_TYPE = Types.StructType.of();
```
```diff
@@ -77,6 +79,7 @@ public class SparkMicroBatchStream implements MicroBatchStream, SupportsTriggerA
   private final int maxFilesPerMicroBatch;
   private final int maxRecordsPerMicroBatch;
   private final boolean cacheDeleteFilesOnExecutors;
+  private final List<Expression> filters;
   private SparkMicroBatchPlanner planner;
   private StreamingOffset lastOffsetForTriggerAvailableNow;
```
```diff
@@ -86,6 +89,7 @@ public class SparkMicroBatchStream implements MicroBatchStream, SupportsTriggerA
       Supplier<FileIO> fileIO,
       SparkReadConf readConf,
       Schema projection,
+      List<Expression> filters,
       String checkpointLocation) {
     this.table = table;
     this.fileIO = fileIO;
```
```diff
@@ -102,11 +106,13 @@ public class SparkMicroBatchStream implements MicroBatchStream, SupportsTriggerA
     this.maxFilesPerMicroBatch = readConf.maxFilesPerMicroBatch();
     this.maxRecordsPerMicroBatch = readConf.maxRecordsPerMicroBatch();
     this.cacheDeleteFilesOnExecutors = readConf.cacheDeleteFilesOnExecutors();
+    this.filters = filters;

     InitialOffsetStore initialOffsetStore =
         new InitialOffsetStore(
             table, checkpointLocation, fromTimestamp, sparkContext.hadoopConfiguration());
     this.initialOffset = initialOffsetStore.initialOffset();
+    LOGGER.error("[jalpan] creating micro batch with filter {} ", filters);
```
Contributor: Remove these? Also below on L137.
```diff
   }

   @Override
```
```diff
@@ -128,6 +134,7 @@ public Offset latestOffset() {

   @Override
   public InputPartition[] planInputPartitions(Offset start, Offset end) {
+    LOGGER.error("[jalpan] planning input partitions micro batch with filter {} ", filters);
     Preconditions.checkArgument(
         end instanceof StreamingOffset, "Invalid end offset: %s is not a StreamingOffset", end);
     Preconditions.checkArgument(
```
```diff
@@ -209,10 +216,11 @@ private void initializePlanner(StreamingOffset startOffset, StreamingOffset endO
     if (readConf.asyncMicroBatchPlanningEnabled()) {
       this.planner =
           new AsyncSparkMicroBatchPlanner(
-              table, readConf, startOffset, endOffset, lastOffsetForTriggerAvailableNow);
+              table, readConf, startOffset, endOffset, lastOffsetForTriggerAvailableNow, filters);
     } else {
       this.planner =
-          new SyncSparkMicroBatchPlanner(table, readConf, lastOffsetForTriggerAvailableNow);
+          new SyncSparkMicroBatchPlanner(
+              table, readConf, lastOffsetForTriggerAvailableNow, filters);
     }
   }
```
Contributor: Tables with partition evolution will have multiple specs. The code here is grabbing an arbitrary partition spec. Take a look at `ExpressionUtil.selectsPartitions()` for an example.
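A hedged sketch of one way to address this, reusing the PR's own `isPartitionOnly` helper: since a micro-batch can span manifests written under different specs, a filter is only safe to treat as partition-scoped if every spec the table has ever had can evaluate it:

```java
// Sketch only: classify against all specs instead of an arbitrary one. If any
// historical spec cannot evaluate the filter, keep it as a data filter.
boolean partitionOnly =
    specsById.values().stream()
        .allMatch(spec -> isPartitionOnly(filter, spec.partitionType(), caseSensitive));
```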