Support for 'time' type in Iceberg #1761
Conversation

@codex review

Codex Review: Didn't find any major issues. Hooray!
Audit: PR #1761 — Support for 'time' type in Iceberg
| File | Net effect |
|---|---|
| src/Processors/Formats/Impl/AvroRowInputFormat.cpp | insertNumber: dispatch Time64 decimal insert. createDeserializeFn: target.isTime64() → createDecimalDeserializeFn<DataTypeTime64>. avroNodeToDataType: map AVRO_INT + TIME_MILLIS → Time64(3), AVRO_LONG + TIME_MICROS → Time64(6). |
| src/Storages/ObjectStorage/DataLakes/Iceberg/SchemaProcessor.cpp | Iceberg time type → DataTypeTime64(6) (was DataTypeInt64). |
| src/Storages/ObjectStorage/DataLakes/Iceberg/Utils.cpp | getIcebergType: add TypeIndex::Time64 → ("time", true). getAvroType: specialize TypeIndex::Time64 by scale (0/3 → "int", 6 → "long", else BAD_ARGUMENTS). New getAvroLogicalType (scale 0 → empty, 3 → "time-millis", 6 → "time-micros", else BAD_ARGUMENTS). |
| src/Storages/ObjectStorage/DataLakes/Iceberg/Utils.h | Declare getAvroLogicalType. |
| src/Storages/ObjectStorage/DataLakes/Iceberg/Constant.h | DEFINE_ICEBERG_FIELD(logicalType). |
| src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergWrites.cpp | canDumpIcebergStats and dumpFieldToBytes: add Time/Time64. extendSchemaForPartitions: emit {type, logicalType} object instead of a plain type when the logical type is non-empty. generateManifestFile: for isTime64() partition columns, build an Avro NodePrimitive(AVRO_LONG) with LogicalType::TIME_MILLIS/TIME_MICROS and wrap the Decimal64 value in a GenericDatum(node, value). |
| src/Processors/Formats/Impl/Parquet/PrepareForWrite.cpp | preparePrimitiveColumn: TypeIndex::Time → INT32 / UINT_32 / int_type(32, false); TypeIndex::Time64 → INT64 / TIME logical type with TimeUnit::MILLIS/MICROS/NANOS depending on scale, plus state.datetime_multiplier to rescale from the CH scale to the chosen Parquet unit. Throws on scale > 9. |
| src/Processors/Formats/Impl/Parquet/Write.cpp | Generalize ConverterDateTime64WithMultiplier into a template ConverterTimeType64WithMultiplierImpl<TYPE>; expose ConverterDateTime64WithMultiplier and new ConverterTime64WithMultiplier. writeColumnChunkBody: add case TypeIndex::Time64 choosing ConverterNumeric when datetime_multiplier == 1, else the multiplier-aware converter. |
| tests/integration/test_database_iceberg/test.py | New test_partitioning_by_time and test_partitioning_by_string (string partition + time non-partition value) using REST catalog + PyIceberg. |
| tests/integration/test_storage_iceberg_no_spark/test_write_time.py | New test_write_time (IcebergLocal + Parquet, Time and Time64(6), partition on key, partition pruning, min/max pruning on a second time column). |
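The Avro-read mapping in the AvroRowInputFormat.cpp row can be restated as a lookup table. The sketch below is illustrative Python, not PR code; `clickhouse_time_type` is an invented name standing in for the new `avroNodeToDataType` branches.

```python
# Sketch of the new avroNodeToDataType time branches described above.
# clickhouse_time_type is a hypothetical helper, not ClickHouse code.
def clickhouse_time_type(avro_type: str, logical_type: str) -> str:
    mapping = {
        ("int", "time-millis"): "Time64(3)",   # AVRO_INT + TIME_MILLIS
        ("long", "time-micros"): "Time64(6)",  # AVRO_LONG + TIME_MICROS
    }
    try:
        return mapping[(avro_type, logical_type)]
    except KeyError:
        raise ValueError(f"no time mapping for {avro_type}/{logical_type}")

print(clickhouse_time_type("int", "time-millis"))   # Time64(3)
print(clickhouse_time_type("long", "time-micros"))  # Time64(6)
```

Any other combination falls through to the reader's default handling, which the audit summarizes as "unhandled / default integer" in the transition matrix below.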
Call graph (in scope)
| Step | Location |
|---|---|
| Read Iceberg time schema → CH type | SchemaProcessor::getSimpleType → DataTypeTime64(6) |
| Read Avro file body | AvroRowInputFormat::insertNumber + createDeserializeFn (decimal pathway for Time64) + avroNodeToDataType (logical type detection) |
| Write CH chunk → Parquet | preparePrimitiveColumn (schema) → writeColumnChunkBody (data, with multiplier rescale) |
| Write Iceberg metadata schema | getIcebergType(DataTypeTime64) → "time" |
| Write Iceberg manifest schema (Avro) | extendSchemaForPartitions → {type, logicalType} for partition columns with non-empty logical |
| Write Iceberg manifest data (Avro) | generateManifestFile → NodePrimitive(AVRO_LONG) + LogicalType(TIME_MILLIS|TIME_MICROS) per partition value |
| Write Iceberg stats | canDumpIcebergStats / dumpFieldToBytes accept Time/Time64 |
Transition matrix
| Stage | Pre-PR | Post-PR |
|---|---|---|
| Read Iceberg time column | DataTypeInt64 (raw seconds presented as integer) | DataTypeTime64(6) (HH:MM:SS.ffffff display) — documented user-visible change |
| Read Avro int + TIME_MILLIS | unhandled / default integer | DataTypeTime64(3) |
| Read Avro long + TIME_MICROS | unhandled / default integer | DataTypeTime64(6) |
| Write CH Time64(6) partition key | unsupported (BAD_ARGUMENTS in getIcebergType) | Iceberg time type; partition Avro datum carries the TIME_MICROS logical type; Parquet column = INT64 TIME(MICROS); ChunkPartitioner uses Decimal64 values without scale loss |
| Write CH Time64(3)/Time64(0) | unsupported | Iceberg time type; partition Avro = int + time-millis or plain int; Parquet column = INT64 TIME(MILLIS) or INT32 UINT_32 |
| Stats dumpFieldToBytes for Time/Time64 | not handled (would return empty bounds) | Int64 byte-dump produced; bounds participate in min/max pruning |
| Iceberg manifest schema for non-Time partition type | unchanged (plain type string) | unchanged — getAvroLogicalType returns empty for non-Time/Time64 types |
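On the write side, the getAvroType/getAvroLogicalType dispatch described in the Utils.cpp row (scale 0/3 → "int", 6 → "long"; logical type empty / time-millis / time-micros; other scales rejected) can be sketched in one function. `avro_type_for_time64` is a hypothetical combined helper; the real code is two separate C++ functions.

```python
# Combined sketch of getAvroType + getAvroLogicalType for Time64, as the
# audit describes them. The single-function shape is an assumption.
def avro_type_for_time64(scale: int):
    if scale == 0:
        return ("int", None)            # plain int, no logical type
    if scale == 3:
        return ("int", "time-millis")
    if scale == 6:
        return ("long", "time-micros")
    raise ValueError("BAD_ARGUMENTS: unsupported Time64 scale for Avro")

print(avro_type_for_time64(6))  # ('long', 'time-micros')
print(avro_type_for_time64(0))  # ('int', None)
```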
Logical fault injection
| Category | Outcome |
|---|---|
| Read existing Iceberg time column with old CH | Was Int64 raw seconds |
| Read existing Iceberg time column with new CH | Now Time64(6), displayed HH:MM:SS.ffffff — behavior change documented in the PR body |
| Write Time64(6) partition column | New end-to-end path covered by test_write_time.py and test_database_iceberg::test_partitioning_by_time |
| Write Time64(9) (nanos) | Parquet schema picks the NANOS unit; Iceberg metadata still says "time" (the Iceberg spec is micros). getAvroType/getAvroLogicalType both throw BAD_ARGUMENTS for scale > 6 if such a column is also a partition column; non-partition columns reach the writer with a NANOS Parquet logical type. Iceberg readers that follow the spec (micros only) will misinterpret nanosecond Parquet values. See "Assumptions / limits" below. |
| Write Time64(scale > 9) | preparePrimitiveColumn throws LOGICAL_ERROR "Unexpected Time64 scale" |
| Write CH plain Time (no 64) as Iceberg column | getIcebergType (pre-existing): "time" (= TIME_MICROS per spec). getAvroType (pre-existing): "int". Parquet write (new): INT32 / UINT_32 — no Parquet TIME logical type, no µs rescaling. CH Time stores integer seconds, so the on-disk value is seconds while the Iceberg/Parquet declared type implies micros. The read side maps Iceberg time → Time64(6) and reads the Parquet INT32 raw, so 12:00:00 (43200) would surface as Time64(6) = 00:00:00.043200. The new test_write_time test runs both Time and Time64(6) and asserts 12:00:00.000000 for both — these assertions only hold for Time if some unseen rescaling path applies between the Parquet read and Time64(6). Verify by running test_storage_iceberg_no_spark/test_write_time.py with time_type=Time. See defect candidate below. |
| Plain Time as partition column in generateManifestFile | The new check is WhichDataType(*partition_types[i]).isTime64() only. Plain Time falls into the generic else branch: partition_record.field(...) = avro::GenericDatum(partition_values[i].safeGet<Decimal64>().getValue()). Plain Time Field storage is Int64, not Decimal64, and safeGet<Decimal64> on a non-Decimal Field throws BAD_GET. Trigger: any partitioned write with a Time (non-64) partition column. See defect candidate below. |
| Partition Avro schema (extendSchemaForPartitions) | New {type, logicalType} object form; standard Avro readers accept both shapes |
| Non-partition columns with Avro write format | getAvroType for Time64 returns "int" (scale 0/3) or "long" (scale 6); other scales throw |
| Read-back of Time64 Decimal columns from Avro | createDecimalDeserializeFn<DataTypeTime64> reuses the Decimal pathway; matches the way DateTime64 is handled |
| getDecimalScale on plain Time (called inside new getAvroLogicalType) | DataTypeTime is not a Decimal-shaped type; getDecimalScale may not return a meaningful value. If it returns 0, the helper returns empty (no logical type) — consistent. If it throws, partition Avro schema generation fails for plain Time. Needs verification. |
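The unit-mismatch arithmetic in the plain-Time row above can be checked directly: a raw 43200 (seconds since midnight) reinterpreted as Time64(6) microsecond ticks formats as 00:00:00.043200.

```python
# Reinterpret plain Time's raw seconds as Time64(6) microsecond ticks,
# matching the misread described in the fault-injection row above.
raw_seconds = 12 * 3600          # '12:00:00' as stored by plain Time -> 43200
ticks_as_micros = raw_seconds    # same integer, read at scale 6
h, rem = divmod(ticks_as_micros, 3_600_000_000)
m, rem = divmod(rem, 60_000_000)
s, us = divmod(rem, 1_000_000)
print(f"{h:02d}:{m:02d}:{s:02d}.{us:06d}")  # 00:00:00.043200
```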
Confirmed / candidate defects
Candidate 1 (Medium): IcebergWrites.cpp::generateManifestFile — partition by plain Time (DataTypeTime, no 64) calls partition_values[i].safeGet<Decimal64>().
- Impact: `BAD_GET` exception on the very first partition value when writing a partitioned Iceberg table whose partition column is plain `Time`.
- Anchor: `src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergWrites.cpp::generateManifestFile`, the `else` branch after the `WhichDataType(*partition_types[i]).isTime64()` guard.
- Trigger: `CREATE TABLE … (key Time, …) ENGINE = IcebergLocal(…) PARTITION BY key; INSERT …;`
- Why defect: plain `Time` `Field` is `Int64`-shaped; the partitioner code assumes `Decimal64` storage (correct for `Time64` and `DateTime64`, wrong for `Time`). `test_write_time.py` parametrises `time_type` over both `"Time"` and `"Time64(6)"`; the `"Time"` parametrisation exercises exactly this path.
- Fix direction: widen the `isTime64()` guard to `(isTime() || isTime64())` and pick `safeGet<UInt32>`/`safeGet<Int64>` for the plain `Time` branch.
- Regression test: the `time_type="Time"` parametrisation of `test_write_time.py` is itself the missing repro; run it locally to confirm whether the assertion above is correct.
Candidate 2 (Medium): Iceberg time is microseconds per spec; plain Time write path emits seconds without rescaling.
- Impact: On-disk Parquet column for plain CH `Time` is `INT32 / UINT_32` with raw seconds; the Iceberg schema claims `time` (= µs). Roundtrip through CH reads via `Time64(6)` and produces a 10⁶× off value (e.g. `12:00:00` written as `43200` reads back as `00:00:00.043200`). External Iceberg readers will misinterpret the column.
- Anchor: `Parquet/PrepareForWrite.cpp` `case TypeIndex::Time: types(T::INT32, C::UINT_32, int_type(32, false)); break;` combined with `Utils.cpp::getIcebergType` returning `"time"` for `TypeIndex::Time`.
- Trigger: `CREATE TABLE x (c Time) ENGINE = IcebergLocal(...); INSERT INTO x VALUES ('12:00:00'); SELECT * FROM x;`
- Why defect: the Iceberg spec for `time` is unambiguously TIME_MICROS as `long`. No multiplier converts seconds → micros for plain `Time`.
- Fix direction: either (a) treat plain `Time` like `Time64(0)` and emit `INT64 / TIME(MICROS)` with `datetime_multiplier = 1_000_000`, or (b) reject plain `Time` in `getIcebergType` and require `Time64`.
- Regression test: `time_type="Time"` parametrisation in `test_write_time.py` plus an external Iceberg reader (PyIceberg) round-trip.
Note (Low): Time64(9) (nanoseconds) — the Parquet writer can encode TIME(NANOS), but the Iceberg spec doesn't define a nanosecond time type; the metadata schema still says "time". External readers will treat the Parquet nanos as micros. getAvroType/getAvroLogicalType throw for scale > 6 only for partition columns; non-partition columns with Time64(9) reach disk unflagged. Either reject such columns in getIcebergType or scale them down at the writer.
C++ bug classes (delta)
| Class | Assessment |
|---|---|
| safeGet<Decimal64> on non-Decimal Field | Candidate 1 above — plain Time partition columns |
| Multiplier overflow at scale 6 | OK — datetime_multiplier is 1 because Parquet stores micros directly; for scale 3 it is 1000; both within Int64 range for the time-of-day domain |
| Template renaming (ConverterDateTime64WithMultiplier → ConverterTimeType64WithMultiplierImpl) | Aliases preserved; no caller churn |
| Schema-vs-data unit mismatch for plain Time | Candidate 2 above |
| Avro NodePrimitive lifetime | Constructed via std::make_shared and stored in the datum; OK |
| Lock / race / RAII / UB | No new shared mutable state, no new locks |
| Exception safety | New throws are at schema-resolution / planning stages, before data acquisition |
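The "multiplier overflow" verdict above can be sanity-checked with one line of arithmetic: even at nanosecond resolution, a time-of-day value stays far below the Int64 ceiling, so a ×1000 rescale cannot overflow in this domain.

```python
# Time-of-day values fit comfortably in Int64 at any sub-second unit,
# so multiplier rescaling (e.g. x1000 for scale 3) cannot overflow here.
INT64_MAX = 2**63 - 1
max_time_of_day_nanos = 24 * 60 * 60 * 10**9 - 1   # 86_399_999_999_999
assert max_time_of_day_nanos * 1000 < INT64_MAX    # headroom even at x1000
print(max_time_of_day_nanos)
```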
Test review
| Aspect | Note |
|---|---|
| test_partitioning_by_time (REST catalog + PyIceberg) | Exercises read of an Iceberg time column; the new Time64(6) mapping is validated |
| test_partitioning_by_string | Exercises a time column as a non-partition column; pruning on it is asserted |
| test_write_time.py (IcebergLocal + Parquet) | Parametrised over Time and Time64(6); covers partition pruning and min/max pruning. The Time parametrisation is the repro for both defect candidates above |
| No Avro write-format test | The Avro manifest emission path (with NodePrimitive(AVRO_LONG) + LogicalType::TIME_MICROS) is only exercised indirectly; consider adding a test using 'Avro' as the data file format |
Audit update for PR #1761 (Iceberg time type, read and write)
Candidate defects
Two candidate defects were identified; both should be confirmed by running the new test suite:
Medium: plain Time (DataTypeTime) partition column triggers BAD_GET in generateManifestFile
- Impact: Partitioned Iceberg writes with a plain `Time` partition column throw immediately.
- Anchor: `IcebergWrites.cpp::generateManifestFile`, `else` arm after the `isTime64()` guard.
- Trigger: `time_type="Time"` parametrisation of `test_storage_iceberg_no_spark/test_write_time.py`.
- Why defect: `safeGet<Decimal64>` on a non-Decimal `Field` (plain `Time` stores `Int64`).
- Fix direction: widen the guard to `isTime() || isTime64()` and pick the right `safeGet<...>` for each shape (or coerce plain `Time` to `Time64(0)` earlier in the schema-handling layer).
- Regression test direction: existing `test_write_time.py` parametrisation; ensure it runs in CI for both `Time` and `Time64(6)`.
Medium: plain Time writes seconds where the Iceberg spec requires microseconds
- Impact: Iceberg `time`-typed columns produced by CH plain `Time` are 10⁶× off; external readers misinterpret values; a CH round-trip through `Time64(6)` returns a 10⁶× off value.
- Anchor: `Parquet/PrepareForWrite.cpp` plain `Time` mapping (`INT32 / UINT_32 / int_type(32, false)`) + `Utils.cpp::getIcebergType(Time)` returning `"time"`.
- Trigger: any write where the user's column type is plain `Time` (no `64`).
- Why defect: the Iceberg spec for `time` is TIME_MICROS; CH `Time` is seconds; no multiplier bridges the two.
- Fix direction: treat plain `Time` like `Time64(0)` at the Parquet schema layer — emit `INT64 / TIME(MICROS)` with `datetime_multiplier = 1_000_000` — or refuse plain `Time` in `getIcebergType`.
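The first fix direction amounts to a single rescale. The helper below is an illustrative sketch of that conversion, not code from the PR; the name `plain_time_to_iceberg_micros` and the 24-hour range check are invented for the example.

```python
# Rescale plain Time (seconds since midnight) to the microseconds that
# Iceberg's 'time' type requires; the 1_000_000 factor plays the role of
# the proposed datetime_multiplier.
def plain_time_to_iceberg_micros(seconds_since_midnight: int) -> int:
    if not 0 <= seconds_since_midnight < 86_400:
        raise ValueError("time of day out of range")
    return seconds_since_midnight * 1_000_000

print(plain_time_to_iceberg_micros(43_200))  # 43_200_000_000 == 12:00:00 in µs
```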
Coverage summary
| Item | Detail |
|---|---|
| Scope reviewed | Avro reader (insertNumber, decimal deserialize, avroNodeToDataType); SchemaProcessor::getSimpleType; Utils::getIcebergType / getAvroType / new getAvroLogicalType; Constant.h field; IcebergWrites::canDumpIcebergStats / dumpFieldToBytes / extendSchemaForPartitions / generateManifestFile; Parquet preparePrimitiveColumn and writeColumnChunkBody; both integration tests. |
| Categories failed | Plain Time partition value emission; plain Time ↔ Iceberg micros unit mismatch (both listed above as candidate defects). |
| Categories passed | Time64(6) end-to-end (read + write + partition + pruning); Avro read of TIME_MILLIS / TIME_MICROS; manifest schema dual-shape (type vs {type, logicalType}); stats dumpFieldToBytes Int64-shaped values; multiplier rescaling for Time64(scale<=6); lock / race / RAII / exception-safety delta is clean. |
| Assumptions / limits | Static audit only; the workspace is on a different branch than the PR, so candidate-defect symptoms above are inferred from the diff, not observed at runtime. getDecimalScale behavior on DataTypeTime (vs DataTypeTime64) was not verified — if it throws for plain Time, getAvroLogicalType will throw on any plain-Time partition column, making Candidate 1 a different kind of failure than BAD_GET but still a defect. Time64(9) (nanos) writes via the Parquet NANOS branch produce non-spec Iceberg time data — flagged as a low-severity note. The display change from Int64 seconds to HH:MM:SS.ffffff for existing Iceberg time columns is a documented user-visible behavior change, not a defect. |
References
- PR: Support for 'time' type in Iceberg #1761
- PR head commit: 810c6b3
- Related issue: Iceberg TIME type unit changes depending on partition spec in DataLakeCatalog #1535
- Earlier (closed) read-only PRs: Support for 'time' type in Iceberg, reading only #1546, Antalya 26.1: Support for 'time' type in Iceberg, reading only #1613, Support for 'time' type in Iceberg, read and write #1731
- Iceberg `time` type spec: https://iceberg.apache.org/spec/#schemas-and-data-types (`time` is microseconds since midnight)
Changelog category (leave one):
Changelog entry (a user-readable short description of the changes that goes to CHANGELOG.md):
Support for 'time' type in Iceberg, read and write.
Documentation entry for user-facing changes
Solves #1535.
This changes the time format:
Was: seconds from midnight.
Now: time with microseconds.
CI/CD Options
Exclude tests:
Regression jobs to run: