[format] Add Mosaic file format for wide tables #7829

Closed

JingsongLi wants to merge 5 commits into apache:master from JingsongLi:fast_format

Conversation

@JingsongLi (Contributor)

Purpose

Mosaic is a columnar-bucket hybrid format optimized for wide tables (10,000+ columns). Columns are hashed into buckets by name, stored row-oriented within each bucket, and independently compressed. This enables efficient projection pushdown at bucket granularity.
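A minimal sketch of the name-based bucket assignment described above; the bucket count and hash function here are illustrative, not necessarily what MosaicWriter uses:

```java
// Illustrative only: hash a column into a bucket by field name.
// The actual MosaicWriter hash may differ.
static int bucketFor(String fieldName, int numBuckets) {
    // Stable, non-negative mapping of the column name into [0, numBuckets)
    return Math.floorMod(fieldName.hashCode(), numBuckets);
}
```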

@leaves12138 (Contributor) left a comment

Thanks for the PR. The overall direction looks good for the intended very-wide-table workload.

Design summary as I understand it:

  • Mosaic hashes columns by field name into a configurable number of column buckets, then writes each row group as a sequence of non-empty bucket blocks.
  • Inside each bucket block, rows are encoded row-wise: a row-size varint table, then per-row null bitmap plus non-null column values in bucket-local column order.
  • Bucket blocks and the schema block are compressed independently, and the row-group index stores per-bucket offsets and compressed/uncompressed sizes so the reader can load only the buckets needed by the projected columns.
  • This makes the format a useful middle ground for 10k+ column tables: much less per-column metadata than Parquet/ORC, and projection pushdown at bucket granularity instead of full-row reads.
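A minimal sketch of projection pushdown at bucket granularity on the read side; the method name and the hash are illustrative, not the PR's actual MosaicReader API:

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Illustrative only: derive the set of bucket blocks a projection needs, so the
// reader can skip every other bucket via the per-bucket offsets in the
// row-group index. The hash must match the writer's bucket assignment.
static Set<Integer> requiredBuckets(List<String> projectedColumns, int numBuckets) {
    Set<Integer> buckets = new HashSet<>();
    for (String name : projectedColumns) {
        buckets.add(Math.floorMod(name.hashCode(), numBuckets));
    }
    return buckets;
}
```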

I found one correctness issue that I think should be fixed before merge:

  1. MosaicReader can expose a reused Zstd decompression buffer through returned string values.

    In MosaicReader.readBatch, the last required Zstd bucket uses bucketData = decompressBuf instead of a stable per-bucket copy. MosaicBucketReader then creates strings with BinaryString.fromBytes(r.data, r.pos, len), which wraps the backing byte array. RecordReader.readBatch() explicitly allows the returned iterator and contained objects to be held for some time, so the reader should not reuse backing storage that is visible through returned rows.

    I verified this with a temporary repro: set writeBatchMemory to 1 byte to force two row groups, write two single-string rows, keep the BinaryString from the first readBatch, then call the second readBatch. The first string changes from first_value to second_valx after the second batch decompresses into the same decompressBuf.

    Suggested fix: do not hand the reusable decompressBuf to MosaicBucketReader. Either always copy the decompressed bytes for every bucket before init, or better, allocate an exact per-bucket byte array and decompress directly into it. For COMPRESSION_NONE, read directly into an exact bucketData array instead of reading into compressedBuf and copying. Please also add a multi-row-group test with a projected string column that retains the first batch's value while reading the next batch.
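A minimal sketch of the suggested fix, assuming an aircompressor-style ZstdDecompressor and illustrative parameter names (compressedBuf, compressedSize, uncompressedSize); not the actual MosaicReader code:

```java
import io.airlift.compress.zstd.ZstdDecompressor;

// Illustrative fragment: decompress each Zstd bucket into an exact-size array
// owned by that bucket, so BinaryString values wrapping it remain valid after
// the next readBatch(). For COMPRESSION_NONE, read directly into such an array.
static byte[] readZstdBucket(byte[] compressedBuf, int compressedSize, int uncompressedSize) {
    byte[] bucketData = new byte[uncompressedSize];
    new ZstdDecompressor()
            .decompress(compressedBuf, 0, compressedSize, bucketData, 0, uncompressedSize);
    return bucketData; // never shared; safe to hand to MosaicBucketReader
}
```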

A smaller spec/doc mismatch:

  1. docs/content/concepts/spec/mosaic.md lists type ids 18/19 for VARIANT and BLOB, and the limitations only mention complex types. The Java implementation rejects VARIANT and BLOB in MosaicFileFormat.validateFieldType, and MosaicTypes only registers readers for ids 0..17. Please either remove those ids from the spec / mention them as unsupported, or implement them.

Compression and performance suggestions (non-blocking):

  • MosaicWriter.reachTargetSize currently estimates the open file size as out.getPos() + currentBufferedSize * 0.3. This is workload- and compression-dependent, and it is clearly wrong for COMPRESSION_NONE. Consider tracking the observed compressed/raw ratio from flushed row groups, with 1.0 for no compression, and use that adaptive ratio for the current buffered row group (see the sketch after this list).
  • The default 100 buckets is a reasonable first cut for 10k columns (~100 columns/bucket), but it fixes the read amplification for a single projected column at roughly one bucket. It would be useful to benchmark/document the bucket-count trade-off: more buckets reduce projection read amplification but reduce compression context and increase index/metadata overhead. A target-columns-per-bucket heuristic or stronger tuning guidance would help users choose between compression ratio and projection latency.
  • After the buffer-lifetime fix, there is an opportunity to avoid extra copies in the read path by decompressing directly into the final per-bucket byte array. That should preserve correctness and reduce CPU/memory bandwidth versus decompressBuf + Arrays.copyOf.
  • The format should be documented as optimized for sparse/few-row or very-wide string-heavy workloads. For dense numeric or many-row scans, the row-wise bucket encoding lacks Parquet/ORC-style per-column encodings/statistics and may not compress or scan as well.
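A minimal sketch of the adaptive-ratio idea from the first bullet; the field names (compressionRatio, currentBufferedSize, targetFileSize) are illustrative, not the actual MosaicWriter code:

```java
// Start from a conservative guess, then track the observed compressed/raw
// ratio of flushed row groups; 1.0 when no compression is configured.
private double compressionRatio; // 1.0 for COMPRESSION_NONE, e.g. 0.3 otherwise

private void onRowGroupFlushed(long rawBytes, long compressedBytes) {
    if (rawBytes > 0) {
        compressionRatio = (double) compressedBytes / rawBytes;
    }
}

private boolean reachTargetSize() throws IOException {
    long estimated = out.getPos() + (long) (currentBufferedSize * compressionRatio);
    return estimated >= targetFileSize;
}
```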

Validation run locally:

  • mvn -pl paimon-format -Pfast-build -Dtest=MosaicFileFormatTest test passed.
  • mvn -pl paimon-core -Pfast-build -Dtest=DataEvolutionFileStoreScanTest,FilesTableTest,PartitionsTableTest,TableCommitTest test passed.
  • PyPaimon tests could not be run in this container because pytest / pyarrow are not installed.

…and compression ratio

- Fix decompressBuf reuse bug: allocate per-bucket byte arrays instead
  of sharing a reusable decompression buffer, since BinaryString wraps
  the backing array and callers may retain values across row groups.
- Remove VARIANT/BLOB type IDs from spec doc (unsupported in code).
- Replace hardcoded 0.3 compression ratio in reachTargetSize with
  adaptive ratio tracked from flushed row groups (1.0 for no compression).
- Add testMultiRowGroupStringStability regression test.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
@leaves12138 (Contributor) left a comment

Thanks for the update. I rechecked the latest head 5fbe646.

The previous blocker is fixed: MosaicReader now materializes a stable per-bucket byte array for both none and zstd, so returned BinaryString values no longer wrap a reusable decompression buffer. The newly added multi-row-group string stability test covers the repro case I used in the previous review.

The spec mismatch is also fixed by removing the unsupported VARIANT / BLOB type IDs and documenting them as unsupported. The adaptive compressionRatio in reachTargetSize is a reasonable improvement over the previous fixed 0.3 estimate; further bucket-count tuning can remain a follow-up benchmark/documentation topic.

Local validation:

  • mvn -pl paimon-format -Pfast-build -Dtest=MosaicFileFormatTest test passed, 10 tests.
  • mvn -pl paimon-core -Pfast-build -Dtest=DataEvolutionFileStoreScanTest,FilesTableTest,PartitionsTableTest,TableCommitTest test passed, 43 tests.

LGTM from my side, pending the remaining GitHub CI checks.

@leaves12138 (Contributor) left a comment

I rechecked the latest head 050a7a9. The new column-oriented bucket layout plus ALL_NULL / CONST / DICT encodings is a clear compression-ratio improvement for the target very-wide sparse-table workload, and the added tests cover the basic round trips well. I also ran mvn -pl paimon-format -Pfast-build -Dtest=MosaicFileFormatTest test locally, and it passed 29 tests.

I still have a few concerns about whether the compression strategy is efficient/general enough:

  1. DICT selection should be cost-based, not only cardinality-based.

    The current rule chooses DICT whenever there are 2..255 distinct non-null serialized values. That can make fixed-width or mostly-unique small values larger than PLAIN. For example, 200 distinct INT values appearing once each are roughly 800 bytes as plain payload, but dictionary encoding needs about varint(200) + 200 * 4 + 200 = 1002 bytes before Zstd, excluding common flags/bitmaps. TINYINT / SMALLINT are even easier to regress. I think the writer should compute the candidate encoded size and choose DICT only when it is actually smaller than PLAIN (maybe with a small margin), e.g. compare varint(numEntries) + sum(dictEntryBytes) + nonNullCount against valueBufPos[i]; a sketch follows this list.

  2. Dictionary tracking has non-trivial write-side CPU/heap overhead.

    writeRow creates a new copied ByteKey for every non-null cell while the column is still a dictionary candidate. For wide dense rows, columns that eventually fall back to PLAIN still pay up to 256 serialized-value copies and hash-map entries per column. It would be better to disable dictionary tracking as soon as the column cannot/will not use it, and ideally use cheaper primitive tracking for fixed-width primitive types instead of serialized byte-array copies. A per-column byte/entry budget would also make this more robust for long variable-width values.

  3. currentBufferedSize underestimates sparse/all-null memory pressure.

    After this change, MosaicBucketWriter.writeRow returns only the serialized non-null value bytes. The per-column null bitmaps and dictionary tracking memory are not counted. For an all-null or very sparse wide table, currentBufferedSize can stay near zero, so rowGroupMaxSize may not trigger a flush even though the in-memory null bitmaps grow with numColumns * numRows / 8. This matters for the exact sparse wide-table workload Mosaic is targeting. Please consider including null-bitmap growth / encoding metadata in the buffered-size estimate, or adding a separate max-row / memory-budget guard. A lazy bitmap representation for columns that are still all-null would also help.

  4. A generality trade-off: the current dictionary format is limited to 1-byte indices.

    This is simple and good for small-cardinality columns, but it misses cases like repeated long strings with 256+ distinct values where a 2-byte/varint dictionary index could still be much smaller than plain. I am fine with keeping 1-byte dictionaries for the first version, but then the docs should describe this as a deliberate limitation and the encoding decision should still be cost-driven.
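A minimal sketch of the cost-based decision from point 1; getDictSize is the PR's method, while varintSize, dictEntryTotalBytes, nonNullCount, and valueBufPos are illustrative names:

```java
// Choose DICT only when its encoded payload is actually smaller than PLAIN.
private boolean shouldUseDict(int i) {
    int numEntries = getDictSize(i);
    if (numEntries < 2 || numEntries > 255) {
        return false; // keep the existing 1-byte index limit
    }
    // varint entry count + dictionary entry bytes + one 1-byte index per non-null value
    long dictCost = varintSize(numEntries) + dictEntryTotalBytes[i] + nonNullCount[i];
    long plainCost = valueBufPos[i]; // bytes already buffered for PLAIN
    return dictCost < plainCost;     // optionally require a small margin
}
```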

A small spec nit: the docs say the has-nulls flag means a null bitmap exists, but ALL_NULL columns currently set hasNulls = true while no bitmap is written. Either clear the flag for ALL_NULL, or explicitly document that ALL_NULL is the exception.

So overall: the direction looks good and the correctness tests pass, but I would prefer to make the DICT decision cost-based and fix the buffered-size/memory estimate before treating these encodings as generally efficient.

@leaves12138 (Contributor) left a comment

Thanks for the update. I rechecked the latest head a4eb906.

The main concerns from my previous review are mostly addressed now:

  • DICT selection is now cost-based instead of cardinality-only.
  • ALL_NULL now clears the has-nulls flag and the spec is aligned.
  • The buffered-size estimate now includes the per-row null-bitmap overhead.
  • The docs explicitly call out the 1-byte dictionary-index limitation.

I also reran mvn -pl paimon-format -Pfast-build -Dtest=MosaicFileFormatTest test locally, and it passed 29 tests.

One important remaining generality/compression concern: MAX_DICT_VALUE_BYTES = 256 currently disables both DICT and CONST for long variable-width values.

In writeRow, if a serialized value is larger than 256 bytes, byteDictMaps[i] is set to null immediately. Later finish uses getDictSize(i) to choose CONST / DICT, so even a column where every row has the same 1KB string will fall back to PLAIN instead of CONST. The same applies to a small number of repeated long strings, where dictionary encoding could be much smaller than plain. This makes the encoding less general for STRING / BINARY / large DECIMAL columns, which are exactly common variable-width cases.

I think const detection should be decoupled from dictionary tracking/budgeting. For example:

  • Keep a separate first-value equality tracker for CONST, regardless of value length (or at least with a much higher/safe budget); a sketch follows this list.
  • For DICT, prefer a cumulative dictionary-byte budget plus the existing final cost check, rather than abandoning solely because one entry is over 256 bytes.
  • Add tests for a long constant string and a few repeated long strings, so this does not regress.
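A minimal sketch of a CONST tracker decoupled from dictionary budgeting; the per-column fields and the equalsRange helper are illustrative, not the PR's code:

```java
// Track only whether every non-null value in the column equals the first one.
private byte[][] constFirstValue; // first serialized non-null value per column
private boolean[] constCandidate; // still constant so far?

private void trackConst(int i, byte[] buf, int offset, int length) {
    if (!constCandidate[i]) {
        return;
    }
    if (constFirstValue[i] == null) {
        constFirstValue[i] = java.util.Arrays.copyOfRange(buf, offset, offset + length);
    } else if (!equalsRange(constFirstValue[i], buf, offset, length)) {
        constCandidate[i] = false;  // fall back to the DICT / PLAIN decision
        constFirstValue[i] = null;  // release the copy
    }
}
```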

A smaller performance/doc nit: Map<Long, Integer> is much better than copying ByteKey for fixed-width types, but it is not really zero-allocation because Long / Integer boxing and HashMap nodes are still allocated. For very wide dense high-cardinality row groups, this can still allocate up to 256 map entries per column before abandonment. A primitive long -> int tracker (or a small custom open-addressed map) would make the “zero-allocation” claim true; otherwise the comment/docs should avoid that wording.
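For the boxing concern, a minimal sketch of a primitive open-addressed long-to-int tracker sized for the 256-entry dictionary budget; illustrative only:

```java
// Fixed-capacity open-addressed map from long keys to small int ids; no boxing,
// no resizing needed because tracking is abandoned past 256 distinct entries.
final class LongIdTracker {
    private static final int CAPACITY = 512; // power of two, >= 2 * 256 entries
    private final long[] keys = new long[CAPACITY];
    private final int[] ids = new int[CAPACITY];
    private final boolean[] used = new boolean[CAPACITY];
    private int size;

    /** Returns the existing id, assigns a new one, or -1 once the 256-entry budget is exhausted. */
    int putIfAbsent(long key) {
        int slot = Long.hashCode(key) & (CAPACITY - 1);
        while (used[slot]) {
            if (keys[slot] == key) {
                return ids[slot];
            }
            slot = (slot + 1) & (CAPACITY - 1);
        }
        if (size >= 256) {
            return -1; // caller disables dictionary tracking for this column
        }
        used[slot] = true;
        keys[slot] = key;
        ids[slot] = size;
        return size++;
    }
}
```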

So from my side the latest version is much closer. I would still fix the long variable-width CONST / DICT issue before considering the compression algorithms generally efficient and reusable.

@leaves12138 (Contributor) left a comment

Thanks for the quick update. I rechecked the latest head 8149723.

The remaining issue from my last review is addressed now: CONST detection is independent of dictionary tracking, so long constant variable-width values can still use CONST; variable-width DICT tracking is now bounded by a cumulative dictionary-byte budget instead of a per-value 256-byte cutoff. The spec was updated accordingly, and the new long-string tests cover the added paths.

Local validation:

  • mvn -pl paimon-format -Pfast-build -Dtest=MosaicFileFormatTest test passed, 32 tests.

I do not see a correctness blocker now. Two small non-blocking follow-ups:

  1. The new long-string tests mainly verify round-trip correctness. A PLAIN fallback would also pass those assertions. It would be stronger to assert the expected compression effect/file-size bound, or parse the bucket encoding flags in the test to ensure the intended CONST / DICT path is actually selected. testLongConstantString already computes fileSize but does not assert it; a sketch follows this list.
  2. MAX_DICT_TOTAL_BYTES = 16384 is a reasonable guard for write-side memory, but it is still a hard-coded trade-off: repeated long strings with more than 16KB of distinct dictionary entries will fall back to PLAIN even if dictionary encoding would be much smaller. I am fine keeping it for the first version, but it may be worth benchmarking/tuning or making configurable later.
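A minimal sketch of a size-bound assertion for testLongConstantString, assuming the test already has fileSize and hypothetical numRows/valueLength variables; the bound is illustrative and should be chosen generously so the test is not brittle:

```java
// With CONST encoding, a long string repeated numRows times should produce a
// file far smaller than the ~numRows * valueLength PLAIN payload; this would
// fail if the column silently fell back to PLAIN.
assertThat(fileSize).isLessThan((long) numRows * valueLength / 10);
```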

LGTM from my side.

@JingsongLi closed this May 13, 2026