Refactor Arrow extraction to follow the RecordExtractor contract#18434
Jackie-Jiang merged 2 commits into apache:master
Codecov Report
✅ All modified and coverable lines are covered by tests.

Coverage Diff

| Metric | master | #18434 | +/- |
|---|---|---|---|
| Coverage | 63.57% | 63.64% | +0.06% |
| Complexity | 1717 | 1717 | |
| Files | 3252 | 3252 | |
| Lines | 199132 | 199132 | |
| Branches | 30875 | 30875 | |
| Hits | 126596 | 126729 | +133 |
| Misses | 62454 | 62312 | -142 |
| Partials | 10082 | 10091 | +9 |
Pull request overview
This PR refactors Arrow ingestion to align with Pinot’s RecordExtractor contract by introducing a schema-driven ArrowRecordExtractor (extending BaseRecordExtractor) and removing the bespoke ArrowToGenericRowConverter. It also updates Arrow decoding behavior to return null for empty batches, write directly into the destination for single-row batches, and wrap multi-row batches under GenericRow.MULTIPLE_RECORDS_KEY.
Changes:
- Add `ArrowRecordExtractor` + `ArrowRecordExtractorConfig` (including `extractRawTimeValues`) and wire them into `ArrowRecordReader` and `ArrowMessageDecoder`.
- Remove `ArrowToGenericRowConverter` and migrate tests to contract-based extraction coverage (`ArrowRecordExtractorTest`) while slimming decoder tests.
- Update decoder output shape based on Arrow batch row count (0 → null, 1 → direct, >1 → MULTIPLE_RECORDS_KEY list).
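The row-count branching in the last bullet can be sketched roughly as follows. This is a minimal illustration, not the PR's code: plain `Map<String, Object>` stands in for `GenericRow`, and the class name, method signature, and the literal value of the key constant are all hypothetical stand-ins.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Illustrative stand-in only; this is not Pinot's ArrowMessageDecoder. */
final class RowCountBranchingSketch {
  // Hypothetical constant standing in for GenericRow.MULTIPLE_RECORDS_KEY.
  static final String MULTIPLE_RECORDS_KEY = "MULTIPLE_RECORDS_KEY_STAND_IN";

  /**
   * Produces the three output shapes described above:
   * empty batch -> null; one row -> fields written straight into destination;
   * several rows -> a list of rows stored under MULTIPLE_RECORDS_KEY.
   */
  static Map<String, Object> decode(List<Map<String, Object>> batchRows,
                                    Map<String, Object> destination) {
    if (batchRows.isEmpty()) {
      return null;
    }
    if (batchRows.size() == 1) {
      destination.putAll(batchRows.get(0));
      return destination;
    }
    destination.put(MULTIPLE_RECORDS_KEY, new ArrayList<>(batchRows));
    return destination;
  }

  public static void main(String[] args) {
    Map<String, Object> row = new LinkedHashMap<>();
    row.put("id", 1);

    List<Map<String, Object>> empty = Collections.emptyList();
    System.out.println(decode(empty, new LinkedHashMap<>()));
    System.out.println(decode(Collections.singletonList(row), new LinkedHashMap<>()));
    System.out.println(decode(Arrays.asList(row, row), new LinkedHashMap<>()));
  }
}
```

The point of the single-row branch is that the caller gets a flat row without an extra wrapper, while callers of the multi-row branch have to check for the key and unwrap the list.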
Reviewed changes
Copilot reviewed 11 out of 11 changed files in this pull request and generated 5 comments.
| File | Description |
|---|---|
| pinot-plugins/pinot-input-format/pinot-arrow/src/main/java/org/apache/pinot/plugin/inputformat/arrow/ArrowMessageDecoder.java | Switch decoder to use ArrowRecordExtractor; implement 0/1/multi-row output shapes; add extractor/config plugin wiring. |
| pinot-plugins/pinot-input-format/pinot-arrow/src/main/java/org/apache/pinot/plugin/inputformat/arrow/ArrowRecordExtractor.java | New schema-driven extractor implementing Pinot’s extraction contract across Arrow scalar/temporal/complex types. |
| pinot-plugins/pinot-input-format/pinot-arrow/src/main/java/org/apache/pinot/plugin/inputformat/arrow/ArrowRecordExtractorConfig.java | New extractor config supporting extractRawTimeValues. |
| pinot-plugins/pinot-input-format/pinot-arrow/src/main/java/org/apache/pinot/plugin/inputformat/arrow/ArrowRecordReader.java | Replace per-row conversion with ArrowRecordExtractor and bind reader-scoped state via setReader. |
| pinot-plugins/pinot-input-format/pinot-arrow/src/main/java/org/apache/pinot/plugin/inputformat/arrow/ArrowRecordReaderConfig.java | Extend reader config to carry extractRawTimeValues through to the extractor. |
| pinot-plugins/pinot-input-format/pinot-arrow/src/main/java/org/apache/pinot/plugin/inputformat/arrow/ArrowToGenericRowConverter.java | Deleted in favor of ArrowRecordExtractor. |
| pinot-plugins/pinot-input-format/pinot-arrow/src/test/java/org/apache/pinot/plugin/inputformat/arrow/ArrowRecordExtractorTest.java | New comprehensive per-type contract tests (including raw-time mode, complex types, dictionaries, include-list). |
| pinot-plugins/pinot-input-format/pinot-arrow/src/test/java/org/apache/pinot/plugin/inputformat/arrow/ArrowMessageDecoderTest.java | Slim decoder tests to lifecycle/edge-cases + row-count branching behavior. |
| pinot-plugins/pinot-input-format/pinot-arrow/src/test/java/org/apache/pinot/plugin/inputformat/arrow/ArrowRecordReaderTest.java | Remove converter-specific assertions and rely on contract-shaped outputs (MV as Object[]). |
| pinot-plugins/pinot-input-format/pinot-arrow/src/test/java/org/apache/pinot/plugin/inputformat/arrow/ArrowTestDataUtils.java | New minimal test payload helper for decoder-focused tests. |
| pinot-plugins/pinot-input-format/pinot-arrow/src/test/java/org/apache/pinot/plugin/inputformat/arrow/util/ArrowTestDataUtil.java | Deleted legacy test utility. |
Introduce ArrowRecordExtractor (extends BaseRecordExtractor) with schema-driven dispatch by ArrowTypeID; drop the bespoke ArrowToGenericRowConverter. The reader and decoder bind reader-scoped state once via setReader(ArrowReader), which caches the dictionary map and pre-resolves the include list against the VectorSchemaRoot.

Add ArrowRecordExtractorConfig with extractRawTimeValues (matches the Avro / Parquet flag): Date / Time / Timestamp surface as raw int / long in the schema's unit instead of the contract Java type.

ArrowMessageDecoder.decode now branches on row count:
- 0 → null
- 1 → fields populated directly into destination
- >1 → wrapped under GenericRow.MULTIPLE_RECORDS_KEY

Bug fixes vs the prior converter:
- DateDayVector returns Integer (not LocalDateTime); old cast threw at runtime.
- UInt2Vector returns Character (not a Number); old code passed it through unchanged, violating the Int(16) -> Integer contract.
- UInt1Vector was sign-extended (200 -> -56) instead of zero-extended.

All three are now schema-aware.

Add ArrowRecordExtractorTest covering every Arrow vector type, raw and contract modes, complex types, dictionary encoding, and include-list filtering. Slim ArrowMessageDecoderTest to decoder-specific concerns; drop the redundant type-coverage tests.
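The unsigned-width fixes come down to how Java widens `byte` and `char` to `int`. The sketch below uses only JDK calls and hypothetical helper names; it does not reproduce the extractor's code, it only shows why 200 turned into -56 and how zero-extension avoids that.

```java
/** Illustrative only; helper names are hypothetical, not from the PR. */
final class UnsignedWideningSketch {
  /** Plain widening keeps the sign bit, so the 8-bit pattern for 200 becomes -56. */
  static int signExtend(byte raw) {
    return raw;
  }

  /** Zero-extension treats the same 8 bits as an unsigned value in 0..255. */
  static int zeroExtendUInt1(byte raw) {
    return Byte.toUnsignedInt(raw); // equivalent to raw & 0xFF
  }

  /** A Java char is already an unsigned 16-bit value, so widening to int is lossless. */
  static Integer uint2ToInteger(char raw) {
    return (int) raw;
  }

  public static void main(String[] args) {
    byte stored = (byte) 200; // bit pattern 0xC8
    System.out.println(signExtend(stored));
    System.out.println(zeroExtendUInt1(stored));
    System.out.println(uint2ToInteger((char) 65535));
  }
}
```

Whether a given column should be zero-extended depends on the signedness recorded in the Arrow schema, which is why the fixes are described as schema-aware.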
Opened a follow-up docs PR for this change: pinot-contrib/pinot-docs#802
Summary
Introduce `ArrowRecordExtractor` (extends `BaseRecordExtractor`) with schema-driven dispatch by `ArrowTypeID`; drop the bespoke `ArrowToGenericRowConverter`. The reader and decoder bind reader-scoped state via `setReader(ArrowReader)` (caches the dictionary map and pre-resolves the include list) and per-batch state via `prepareBatch(Record)` (decodes each dictionary-encoded column once into the `Record` for reuse across the batch's rows).

Add `ArrowRecordExtractorConfig` with `extractRawTimeValues` (matches the Avro / Parquet flag): `Date` / `Time` / `Timestamp` surface as raw `int` / `long` in the schema's unit instead of the contract Java type.

`ArrowMessageDecoder.decode` now branches on row count:
- `0` → `null`
- `1` → fields populated directly into the destination
- `>1` → wrapped under `GenericRow.MULTIPLE_RECORDS_KEY`

The decoder also validates that a plugin-supplied extractor class extends `ArrowRecordExtractor` (so the per-batch `setReader` / `prepareBatch` hooks are honored) and fails with a clear error if not.

Bug fixes vs the prior converter
- `DateDayVector` returns `Integer` (not `LocalDateTime`); the old code cast unconditionally to `LocalDateTime` and would throw at runtime for `DateDay` columns.
- `UInt2Vector` returns `Character` (not a `Number`); the old code passed it through unchanged, violating the `Int(16) → Integer` contract.
- `UInt1Vector` was sign-extended (`200 → -56`) instead of zero-extended.
- All three are now schema-aware (`ArrowType.Int.getIsSigned()` / `ArrowType.Date.getUnit()`).
- […] `extractValue`, O(N²) in batch size).

Tests
- `ArrowRecordExtractorTest` covering every Arrow vector type, raw and contract modes, complex types (`List`, `Struct`, `Map`), dictionary encoding, and include-list filtering. Each test runs through a real `ArrowStreamWriter` → `ArrowStreamReader` IPC roundtrip so `setReader` / `prepareBatch` are exercised against an actual `ArrowReader` (no mocks).
- `ArrowMessageDecoderTest` slimmed to decoder-specific concerns (lifecycle, error handling, empty / single / multi-row batch shapes).
- `ArrowRecordReaderTest` keeps the inherited `AbstractRecordReaderTest` round-trip; redundant filter test removed.
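As a rough illustration of what the `extractRawTimeValues` flag in the summary changes for a day-granularity date column, the sketch below toggles between the raw stored value and a converted Java type. The class and method names are hypothetical, and the use of `java.time.LocalDate` as the converted type is an assumption made for the example; the summary does not state which Java type the contract mandates.

```java
import java.time.LocalDate;

/** Illustrative only; not the PR's ArrowRecordExtractor. */
final class RawTimeValuesSketch {
  /**
   * A day-unit Arrow date is stored as an int counting days since 1970-01-01.
   * With the raw flag set, that int is returned as-is; otherwise it is converted.
   * LocalDate is an ASSUMED stand-in for the contract Java type.
   */
  static Object extractDateDay(int daysSinceEpoch, boolean extractRawTimeValues) {
    if (extractRawTimeValues) {
      return daysSinceEpoch; // autoboxed to Integer
    }
    return LocalDate.ofEpochDay(daysSinceEpoch);
  }

  public static void main(String[] args) {
    System.out.println(extractDateDay(19723, true));
    System.out.println(extractDateDay(19723, false));
  }
}
```

Raw mode keeps the value in the schema's unit, which suits pipelines that already store epoch-based numbers; the converted mode trades that for a self-describing Java type.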