diff --git a/flink/v1.20/build.gradle b/flink/v1.20/build.gradle index 2bbad1891c81..62eb8fab44c4 100644 --- a/flink/v1.20/build.gradle +++ b/flink/v1.20/build.gradle @@ -33,6 +33,7 @@ project(":iceberg-flink:iceberg-flink-${flinkMajorVersion}") { implementation project(':iceberg-hive-metastore') compileOnly libs.flink120.avro + compileOnly libs.joda.time // dropwizard histogram metrics (optional in Flink) compileOnly libs.flink120.metrics.dropwizard compileOnly libs.flink120.streaming.java diff --git a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/FlinkTypeToType.java b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/FlinkTypeToType.java index 408065f06057..8f106da8d56b 100644 --- a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/FlinkTypeToType.java +++ b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/FlinkTypeToType.java @@ -137,11 +137,17 @@ public Type visit(TimeType timeType) { @Override public Type visit(TimestampType timestampType) { + if (timestampType.getPrecision() > 6) { + return Types.TimestampNanoType.withoutZone(); + } return Types.TimestampType.withoutZone(); } @Override public Type visit(LocalZonedTimestampType localZonedTimestampType) { + if (localZonedTimestampType.getPrecision() > 6) { + return Types.TimestampNanoType.withZone(); + } return Types.TimestampType.withZone(); } diff --git a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/RowDataWrapper.java b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/RowDataWrapper.java index 3ef611f2ded5..920e44b24b31 100644 --- a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/RowDataWrapper.java +++ b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/RowDataWrapper.java @@ -114,19 +114,35 @@ private static PositionalGetter buildGetter(LogicalType logicalType, Type typ case TIMESTAMP_WITHOUT_TIME_ZONE: TimestampType timestampType = (TimestampType) logicalType; - return (row, pos) -> { - LocalDateTime localDateTime = - row.getTimestamp(pos, timestampType.getPrecision()).toLocalDateTime(); - return DateTimeUtil.microsFromTimestamp(localDateTime); - }; + if (type.typeId() == Type.TypeID.TIMESTAMP_NANO) { + return (row, pos) -> { + LocalDateTime localDateTime = + row.getTimestamp(pos, timestampType.getPrecision()).toLocalDateTime(); + return DateTimeUtil.nanosFromTimestamp(localDateTime); + }; + } else { + return (row, pos) -> { + LocalDateTime localDateTime = + row.getTimestamp(pos, timestampType.getPrecision()).toLocalDateTime(); + return DateTimeUtil.microsFromTimestamp(localDateTime); + }; + } case TIMESTAMP_WITH_LOCAL_TIME_ZONE: LocalZonedTimestampType lzTs = (LocalZonedTimestampType) logicalType; - return (row, pos) -> { - TimestampData timestampData = row.getTimestamp(pos, lzTs.getPrecision()); - return timestampData.getMillisecond() * 1000 - + timestampData.getNanoOfMillisecond() / 1000; - }; + if (type.typeId() == Type.TypeID.TIMESTAMP_NANO) { + return (row, pos) -> { + TimestampData timestampData = row.getTimestamp(pos, lzTs.getPrecision()); + return timestampData.getMillisecond() * 1_000_000L + + timestampData.getNanoOfMillisecond(); + }; + } else { + return (row, pos) -> { + TimestampData timestampData = row.getTimestamp(pos, lzTs.getPrecision()); + return timestampData.getMillisecond() * 1000L + + timestampData.getNanoOfMillisecond() / 1000; + }; + } case ROW: RowType rowType = (RowType) logicalType; diff --git a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/data/FlinkOrcReader.java 
b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/data/FlinkOrcReader.java index 65b9d44ad4b8..3e3a29112cf4 100644 --- a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/data/FlinkOrcReader.java +++ b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/data/FlinkOrcReader.java @@ -112,6 +112,13 @@ public OrcValueReader primitive(Type.PrimitiveType iPrimitive, TypeDescriptio } else { return FlinkOrcReaders.timestamps(); } + case TIMESTAMP_NANO: + Types.TimestampNanoType timestampNanoType = (Types.TimestampNanoType) iPrimitive; + if (timestampNanoType.shouldAdjustToUTC()) { + return FlinkOrcReaders.timestampTzs(); + } else { + return FlinkOrcReaders.timestamps(); + } case STRING: return FlinkOrcReaders.strings(); case UUID: diff --git a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/data/FlinkOrcWriter.java b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/data/FlinkOrcWriter.java index a467d848337d..c1b46252e18a 100644 --- a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/data/FlinkOrcWriter.java +++ b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/data/FlinkOrcWriter.java @@ -145,6 +145,13 @@ public OrcValueWriter primitive(Type.PrimitiveType iPrimitive, LogicalType fl } else { return FlinkOrcWriters.timestamps(); } + case TIMESTAMP_NANO: + Types.TimestampNanoType timestampNanoType = (Types.TimestampNanoType) iPrimitive; + if (timestampNanoType.shouldAdjustToUTC()) { + return FlinkOrcWriters.timestampNanoTzs(); + } else { + return FlinkOrcWriters.timestampNanos(); + } case STRING: return FlinkOrcWriters.strings(); case UUID: diff --git a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/data/FlinkOrcWriters.java b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/data/FlinkOrcWriters.java index 684842aa099c..bf19a46c05fb 100644 --- a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/data/FlinkOrcWriters.java +++ b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/data/FlinkOrcWriters.java @@ -70,6 +70,14 @@ static OrcValueWriter timestampTzs() { return TimestampTzWriter.INSTANCE; } + static OrcValueWriter timestampNanos() { + return TimestampNanoWriter.INSTANCE; + } + + static OrcValueWriter timestampNanoTzs() { + return TimestampNanoTzWriter.INSTANCE; + } + static OrcValueWriter decimals(int precision, int scale) { if (precision <= 18) { return new Decimal18Writer(precision, scale); @@ -170,6 +178,35 @@ public void nonNullWrite(int rowId, TimestampData data, ColumnVector output) { } } + private static class TimestampNanoWriter implements OrcValueWriter { + private static final TimestampNanoWriter INSTANCE = new TimestampNanoWriter(); + + @Override + public void nonNullWrite(int rowId, TimestampData data, ColumnVector output) { + TimestampColumnVector cv = (TimestampColumnVector) output; + cv.setIsUTC(true); + // millis + OffsetDateTime offsetDateTime = data.toInstant().atOffset(ZoneOffset.UTC); + cv.time[rowId] = + offsetDateTime.toEpochSecond() * 1_000 + offsetDateTime.getNano() / 1_000_000; + cv.nanos[rowId] = offsetDateTime.getNano(); + } + } + + private static class TimestampNanoTzWriter implements OrcValueWriter { + private static final TimestampNanoTzWriter INSTANCE = new TimestampNanoTzWriter(); + + @SuppressWarnings("JavaInstantGetSecondsGetNano") + @Override + public void nonNullWrite(int rowId, TimestampData data, ColumnVector output) { + TimestampColumnVector cv = (TimestampColumnVector) output; + // millis + Instant instant = data.toInstant(); + cv.time[rowId] = instant.toEpochMilli(); + 
cv.nanos[rowId] = instant.getNano();
+    }
+  }
+
   private static class Decimal18Writer implements OrcValueWriter<DecimalData> {
     private final int precision;
     private final int scale;
diff --git a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/data/RowDataUtil.java b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/data/RowDataUtil.java
index f23a7ee3d0d3..81bb55967992 100644
--- a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/data/RowDataUtil.java
+++ b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/data/RowDataUtil.java
@@ -69,6 +69,8 @@ public static Object convertConstant(Type type, Object value) {
         return (int) ((Long) value / 1000);
       case TIMESTAMP: // TimestampData
         return TimestampData.fromLocalDateTime(DateTimeUtil.timestampFromMicros((Long) value));
+      case TIMESTAMP_NANO:
+        return TimestampData.fromLocalDateTime(DateTimeUtil.timestampFromNanos((Long) value));
       case UUID:
         return UUIDUtil.convert((UUID) value);
       default:
diff --git a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/data/StructRowData.java b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/data/StructRowData.java
index 34576a1e5c0b..e5e3dd0ff23d 100644
--- a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/data/StructRowData.java
+++ b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/data/StructRowData.java
@@ -48,6 +48,7 @@
 import org.apache.iceberg.types.Type;
 import org.apache.iceberg.types.Types;
 import org.apache.iceberg.util.ByteBuffers;
+import org.apache.iceberg.util.DateTimeUtil;
 
 @Internal
 public class StructRowData implements RowData {
@@ -185,8 +186,27 @@ private BigDecimal getDecimalInternal(int pos) {
 
   @Override
   public TimestampData getTimestamp(int pos, int precision) {
+    if (precision > 6) {
+      Object timeVal = struct.get(pos, Object.class);
+      if (timeVal instanceof OffsetDateTime) {
+        OffsetDateTime odt = (OffsetDateTime) timeVal;
+        return TimestampData.fromEpochMillis(
+            odt.toInstant().toEpochMilli(), odt.getNano() % 1_000_000);
+      } else if (timeVal instanceof LocalDateTime) {
+        LocalDateTime ldt = (LocalDateTime) timeVal;
+        return TimestampData.fromEpochMillis(
+            ldt.toInstant(ZoneOffset.UTC).toEpochMilli(), ldt.getNano() % 1_000_000);
+      } else if (timeVal instanceof Long) {
+        long timeLong = (Long) timeVal;
+        return TimestampData.fromEpochMillis(
+            Math.floorDiv(timeLong, 1_000_000L), (int) Math.floorMod(timeLong, 1_000_000L));
+      } else {
+        throw new IllegalStateException("Unknown type for timestamp_ns: " + timeVal.getClass());
+      }
+    }
+
     long timeLong = getLong(pos);
-    return TimestampData.fromEpochMillis(timeLong / 1000, (int) (timeLong % 1000) * 1000);
+    return TimestampData.fromEpochMillis(
+        Math.floorDiv(timeLong, 1000L), (int) Math.floorMod(timeLong, 1000L) * 1000);
   }
 
   @Override
@@ -257,9 +277,29 @@ private Object convertValue(Type elementType, Object value) {
       case DECIMAL:
         return value;
       case TIMESTAMP:
-        long millisecond = (long) value / 1000;
-        int nanoOfMillisecond = (int) ((Long) value % 1000) * 1000;
-        return TimestampData.fromEpochMillis(millisecond, nanoOfMillisecond);
+        // Normalize to epoch micros first so sub-millisecond digits survive for
+        // LocalDateTime and OffsetDateTime inputs as well as for raw long values.
+        long micros;
+        if (value instanceof LocalDateTime localDateTime) {
+          micros = DateTimeUtil.microsFromTimestamp(localDateTime);
+        } else if (value instanceof OffsetDateTime offsetDateTime) {
+          micros = DateTimeUtil.microsFromTimestamptz(offsetDateTime);
+        } else {
+          micros = (Long) value;
+        }
+        return TimestampData.fromEpochMillis(
+            Math.floorDiv(micros, 1000L), (int) Math.floorMod(micros, 1000L) * 1000);
+      case TIMESTAMP_NANO:
+        long nanoLong;
+        if (value instanceof LocalDateTime localDateTime) {
+          nanoLong = DateTimeUtil.nanosFromTimestamp(localDateTime);
+        } else if (value instanceof OffsetDateTime offsetDateTime) {
+          nanoLong = DateTimeUtil.nanosFromTimestamptz(offsetDateTime);
+        } else {
+          nanoLong = (Long) value;
+        }
+        return TimestampData.fromEpochMillis(
+            Math.floorDiv(nanoLong, 1_000_000L), (int) Math.floorMod(nanoLong, 1_000_000L));
       case STRING:
         return StringData.fromString(value.toString());
       case FIXED:
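All of the hot paths above (RowDataWrapper, StructRowData, RowDataUtil) lean on the same arithmetic: an Iceberg timestamp_ns long is split into Flink TimestampData's (epoch-millis, nano-of-milli) pair via floorDiv/floorMod so that pre-1970 values stay correct. A self-contained sketch of that invariant (plain Java; only the Flink TimestampData API is assumed, the class name is illustrative):

```java
import org.apache.flink.table.data.TimestampData;

class NanoSplitInvariant {
  // floorDiv/floorMod keep the remainder in [0, 999_999] even for negative epochs,
  // which is what TimestampData.fromEpochMillis(millis, nanoOfMillisecond) requires.
  static TimestampData fromEpochNanos(long nanos) {
    return TimestampData.fromEpochMillis(
        Math.floorDiv(nanos, 1_000_000L), (int) Math.floorMod(nanos, 1_000_000L));
  }

  static long toEpochNanos(TimestampData ts) {
    return ts.getMillisecond() * 1_000_000L + ts.getNanoOfMillisecond();
  }

  public static void main(String[] args) {
    long preEpoch = -876_543_211L; // 1969-12-31T23:59:59.123456789Z in epoch nanos
    TimestampData ts = fromEpochNanos(preEpoch); // millis = -877, nanoOfMilli = 456_789
    if (toEpochNanos(ts) != preEpoch) {
      throw new AssertionError("round trip must be exact");
    }
  }
}
```

Plain division (`nanos / 1_000_000`) would round toward zero for negative values and produce a negative remainder, which is exactly the bug class the floorDiv/floorMod rewrites in the diff avoid.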
diff --git a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/formats/avro/AvroToRowDataConverters.java b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/formats/avro/AvroToRowDataConverters.java
new file mode 100644
index 000000000000..0f70e60a1b9f
--- /dev/null
+++ b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/formats/avro/AvroToRowDataConverters.java
@@ -0,0 +1,303 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.formats.avro;
+
+import java.io.Serializable;
+import java.lang.reflect.Array;
+import java.nio.ByteBuffer;
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.time.temporal.ChronoField;
+import java.util.List;
+import java.util.Map;
+import org.apache.avro.generic.GenericFixed;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.data.DecimalData;
+import org.apache.flink.table.data.GenericArrayData;
+import org.apache.flink.table.data.GenericMapData;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.types.logical.ArrayType;
+import org.apache.flink.table.types.logical.DecimalType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.utils.LogicalTypeUtils;
+import org.apache.iceberg.flink.formats.avro.typeutils.AvroSchemaConverter;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+
+/**
+ * Tool class used to convert from Avro {@link GenericRecord} to {@link RowData}.
+ *
+ *
<p>
This class is adapted in Iceberg to add support for nanosecond precision timestamps + * (FLINK-39251). Once that ticket is resolved in Flink, this custom converter may be removed. + */ +@Internal +public class AvroToRowDataConverters { + + private AvroToRowDataConverters() {} + + /** + * Runtime converter that converts Avro data structures into objects of Flink Table & SQL + * internal data structures. + */ + @FunctionalInterface + public interface AvroToRowDataConverter extends Serializable { + Object convert(Object object); + } + + // ------------------------------------------------------------------------------------- + // Runtime Converters + // ------------------------------------------------------------------------------------- + + public static AvroToRowDataConverter createRowConverter(RowType rowType) { + return createRowConverter(rowType, true); + } + + public static AvroToRowDataConverter createRowConverter( + RowType rowType, boolean legacyTimestampMapping) { + final AvroToRowDataConverter[] fieldConverters = + rowType.getFields().stream() + .map(RowType.RowField::getType) + .map(type -> createNullableConverter(type, legacyTimestampMapping)) + .toArray(AvroToRowDataConverter[]::new); + final int arity = rowType.getFieldCount(); + + return avroObject -> { + IndexedRecord record = (IndexedRecord) avroObject; + GenericRowData row = new GenericRowData(arity); + for (int i = 0; i < arity; ++i) { + // avro always deserialize successfully even though the type isn't matched + // so no need to throw exception about which field can't be deserialized + row.setField(i, fieldConverters[i].convert(record.get(i))); + } + return row; + }; + } + + /** Creates a runtime converter which is null safe. */ + private static AvroToRowDataConverter createNullableConverter( + LogicalType type, boolean legacyTimestampMapping) { + final AvroToRowDataConverter converter = createConverter(type, legacyTimestampMapping); + return avroObject -> { + if (avroObject == null) { + return null; + } + return converter.convert(avroObject); + }; + } + + /** Creates a runtime converter which assuming input object is not null. 
*/ + private static AvroToRowDataConverter createConverter( + LogicalType type, boolean legacyTimestampMapping) { + switch (type.getTypeRoot()) { + case NULL: + return avroObject -> null; + case TINYINT: + return avroObject -> ((Integer) avroObject).byteValue(); + case SMALLINT: + return avroObject -> ((Integer) avroObject).shortValue(); + case BOOLEAN: // boolean + case INTEGER: // int + case INTERVAL_YEAR_MONTH: // long + case BIGINT: // long + case INTERVAL_DAY_TIME: // long + case FLOAT: // float + case DOUBLE: // double + return avroObject -> avroObject; + case DATE: + return AvroToRowDataConverters::convertToDate; + case TIME_WITHOUT_TIME_ZONE: + return AvroToRowDataConverters::convertToTime; + case TIMESTAMP_WITHOUT_TIME_ZONE: + // Iceberg: Added support for nanoseconds precision (FLINK-39251) + return avroObject -> convertToTimestamp(avroObject, type); + case TIMESTAMP_WITH_LOCAL_TIME_ZONE: + if (legacyTimestampMapping) { + throw new UnsupportedOperationException("Unsupported type: " + type); + } else { + // Iceberg: Added support for nanoseconds precision (FLINK-39251) + return avroObject -> convertToTimestamp(avroObject, type); + } + case CHAR: + case VARCHAR: + return avroObject -> StringData.fromString(avroObject.toString()); + case BINARY: + case VARBINARY: + return AvroToRowDataConverters::convertToBytes; + case DECIMAL: + return createDecimalConverter((DecimalType) type); + case ARRAY: + return createArrayConverter((ArrayType) type, legacyTimestampMapping); + case ROW: + return createRowConverter((RowType) type); + case MAP: + case MULTISET: + return createMapConverter(type, legacyTimestampMapping); + case RAW: + default: + throw new UnsupportedOperationException("Unsupported type: " + type); + } + } + + private static AvroToRowDataConverter createDecimalConverter(DecimalType decimalType) { + final int precision = decimalType.getPrecision(); + final int scale = decimalType.getScale(); + return avroObject -> { + final byte[] bytes; + if (avroObject instanceof GenericFixed) { + bytes = ((GenericFixed) avroObject).bytes(); + } else if (avroObject instanceof ByteBuffer) { + ByteBuffer byteBuffer = (ByteBuffer) avroObject; + bytes = new byte[byteBuffer.remaining()]; + byteBuffer.get(bytes); + } else { + bytes = (byte[]) avroObject; + } + return DecimalData.fromUnscaledBytes(bytes, precision, scale); + }; + } + + private static AvroToRowDataConverter createArrayConverter( + ArrayType arrayType, boolean legacyTimestampMapping) { + final AvroToRowDataConverter elementConverter = + createNullableConverter(arrayType.getElementType(), legacyTimestampMapping); + final Class elementClass = + LogicalTypeUtils.toInternalConversionClass(arrayType.getElementType()); + + return avroObject -> { + final List list = (List) avroObject; + final int length = list.size(); + final Object[] array = (Object[]) Array.newInstance(elementClass, length); + for (int i = 0; i < length; ++i) { + array[i] = elementConverter.convert(list.get(i)); + } + return new GenericArrayData(array); + }; + } + + private static AvroToRowDataConverter createMapConverter( + LogicalType type, boolean legacyTimestampMapping) { + final AvroToRowDataConverter keyConverter = + createConverter(DataTypes.STRING().getLogicalType(), legacyTimestampMapping); + final AvroToRowDataConverter valueConverter = + createNullableConverter( + AvroSchemaConverter.extractValueTypeToAvroMap(type), legacyTimestampMapping); + + return avroObject -> { + final Map map = (Map) avroObject; + Map result = Maps.newHashMap(); + for (Map.Entry entry : 
map.entrySet()) { + Object key = keyConverter.convert(entry.getKey()); + Object value = valueConverter.convert(entry.getValue()); + result.put(key, value); + } + return new GenericMapData(result); + }; + } + + private static TimestampData convertToTimestamp(Object object, LogicalType type) { + int precision = 3; + if (type instanceof org.apache.flink.table.types.logical.TimestampType) { + precision = ((org.apache.flink.table.types.logical.TimestampType) type).getPrecision(); + } else if (type instanceof org.apache.flink.table.types.logical.LocalZonedTimestampType) { + precision = + ((org.apache.flink.table.types.logical.LocalZonedTimestampType) type).getPrecision(); + } + + if (object instanceof Long) { + long timeLong = (Long) object; + if (precision <= 3) { + return TimestampData.fromEpochMillis(timeLong); + } else if (precision <= 6) { + return TimestampData.fromEpochMillis( + Math.floorDiv(timeLong, 1000L), (int) Math.floorMod(timeLong, 1000L) * 1_000_000); + } else { + // Iceberg: Added support for nanoseconds precision (FLINK-39251) + return TimestampData.fromEpochMillis( + Math.floorDiv(timeLong, 1_000_000L), (int) Math.floorMod(timeLong, 1_000_000L)); + } + } else if (object instanceof Instant) { + return TimestampData.fromInstant((Instant) object); + } else if (object instanceof LocalDateTime) { + return TimestampData.fromLocalDateTime((LocalDateTime) object); + } else { + JodaConverter jodaConverter = JodaConverter.getConverter(); + if (jodaConverter != null) { + return TimestampData.fromEpochMillis(jodaConverter.convertTimestamp(object)); + } else { + throw new IllegalArgumentException( + "Unexpected object type for TIMESTAMP logical type. Received: " + object); + } + } + } + + private static int convertToDate(Object object) { + if (object instanceof Integer) { + return (Integer) object; + } else if (object instanceof LocalDate) { + return (int) ((LocalDate) object).toEpochDay(); + } else { + JodaConverter jodaConverter = JodaConverter.getConverter(); + if (jodaConverter != null) { + return (int) jodaConverter.convertDate(object); + } else { + throw new IllegalArgumentException( + "Unexpected object type for DATE logical type. Received: " + object); + } + } + } + + private static int convertToTime(Object object) { + final int millis; + if (object instanceof Integer) { + millis = (Integer) object; + } else if (object instanceof LocalTime) { + millis = ((LocalTime) object).get(ChronoField.MILLI_OF_DAY); + } else { + JodaConverter jodaConverter = JodaConverter.getConverter(); + if (jodaConverter != null) { + millis = jodaConverter.convertTime(object); + } else { + throw new IllegalArgumentException( + "Unexpected object type for TIME logical type. 
Received: " + object);
+      }
+    }
+    return millis;
+  }
+
+  private static byte[] convertToBytes(Object object) {
+    if (object instanceof GenericFixed) {
+      return ((GenericFixed) object).bytes();
+    } else if (object instanceof ByteBuffer) {
+      ByteBuffer byteBuffer = (ByteBuffer) object;
+      byte[] bytes = new byte[byteBuffer.remaining()];
+      byteBuffer.get(bytes);
+      return bytes;
+    } else {
+      return (byte[]) object;
+    }
+  }
+}
diff --git a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/formats/avro/JodaConverter.java b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/formats/avro/JodaConverter.java
new file mode 100644
index 000000000000..c30b78023345
--- /dev/null
+++ b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/formats/avro/JodaConverter.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.formats.avro;
+
+import org.joda.time.DateTime;
+import org.joda.time.DateTimeFieldType;
+import org.joda.time.LocalDate;
+import org.joda.time.LocalTime;
+
+/**
+ * Encapsulates the optional Joda-Time dependency. This class is instantiated only if Joda-Time is
+ * available on the classpath.
+ */
+@SuppressWarnings("JavaUtilDate")
+class JodaConverter {
+
+  private static JodaConverter instance;
+  private static boolean instantiated = false;
+
+  public static JodaConverter getConverter() {
+    if (instantiated) {
+      return instance;
+    }
+
+    try {
+      Class.forName(
+          "org.joda.time.DateTime", false, Thread.currentThread().getContextClassLoader());
+      instance = new JodaConverter();
+    } catch (ClassNotFoundException e) {
+      instance = null;
+    } finally {
+      instantiated = true;
+    }
+    return instance;
+  }
+
+  public long convertDate(Object object) {
+    final LocalDate value = (LocalDate) object;
+    return value.toDate().getTime();
+  }
+
+  public int convertTime(Object object) {
+    final LocalTime value = (LocalTime) object;
+    return value.get(DateTimeFieldType.millisOfDay());
+  }
+
+  public long convertTimestamp(Object object) {
+    final DateTime value = (DateTime) object;
+    return value.toDate().getTime();
+  }
+
+  private JodaConverter() {}
+}
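The two files above form the read path: AvroToRowDataConverters does the decoding and JodaConverter only covers the legacy Joda objects older Avro versions may hand back. A minimal sketch of how the nanosecond decode is meant to be driven (record layout, field names, and class name are illustrative, not taken from the diff; the converter dispatches on the Flink type, so the Avro logical-type attribute is informational here):

```java
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.TimestampData;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.logical.TimestampType;
import org.apache.iceberg.flink.formats.avro.AvroToRowDataConverters;

class NanoReadPathSketch {
  public static void main(String[] args) {
    Schema schema =
        new Schema.Parser()
            .parse(
                "{\"type\":\"record\",\"name\":\"r\",\"fields\":[{\"name\":\"ts\","
                    + "\"type\":{\"type\":\"long\",\"logicalType\":\"local-timestamp-nanos\"}}]}");
    GenericData.Record record = new GenericData.Record(schema);
    record.put("ts", 123_456_789L); // epoch nanoseconds

    // TIMESTAMP(9) routes the long through the precision > 6 branch of convertToTimestamp.
    RowType rowType = RowType.of(new TimestampType(9));
    AvroToRowDataConverters.AvroToRowDataConverter converter =
        AvroToRowDataConverters.createRowConverter(rowType, false);
    RowData row = (RowData) converter.convert(record);
    TimestampData ts = row.getTimestamp(0, 9); // 123 ms plus 456_789 nanos-of-millisecond
  }
}
```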
diff --git a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/formats/avro/RowDataToAvroConverters.java b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/formats/avro/RowDataToAvroConverters.java
new file mode 100644
index 000000000000..d4c7e4282d6e
--- /dev/null
+++ b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/formats/avro/RowDataToAvroConverters.java
@@ -0,0 +1,394 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.formats.avro;
+
+import java.io.Serializable;
+import java.nio.ByteBuffer;
+import java.time.ZoneOffset;
+import java.util.List;
+import java.util.Map;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.util.Utf8;
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.data.ArrayData;
+import org.apache.flink.table.data.DecimalData;
+import org.apache.flink.table.data.MapData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.types.logical.ArrayType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.util.CollectionUtil;
+import org.apache.iceberg.flink.formats.avro.typeutils.AvroSchemaConverter;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+
+/**
+ * Tool class used to convert from {@link RowData} to Avro {@link GenericRecord}.
+ *
+ *
<p>
This class is adapted in Iceberg to add support for nanosecond precision timestamps + * (FLINK-39251). Once that ticket is resolved in Flink, this custom converter may be removed. + */ +@Internal +public class RowDataToAvroConverters { + + private RowDataToAvroConverters() {} + + // -------------------------------------------------------------------------------- + // Runtime Converters + // -------------------------------------------------------------------------------- + + /** + * Runtime converter that converts objects of Flink Table & SQL internal data structures to + * corresponding Avro data structures. + */ + @FunctionalInterface + public interface RowDataToAvroConverter extends Serializable { + Object convert(Schema schema, Object object); + } + + // -------------------------------------------------------------------------------- + // IMPORTANT! We use anonymous classes instead of lambdas for a reason here. It is + // necessary because the maven shade plugin cannot relocate classes in + // SerializedLambdas (MSHADE-260). On the other hand we want to relocate Avro for + // sql-client uber jars. + // -------------------------------------------------------------------------------- + + /** + * Creates a runtime converter according to the given logical type that converts objects of Flink + * Table & SQL internal data structures to corresponding Avro data structures. + */ + public static RowDataToAvroConverter createConverter(LogicalType type) { + return createConverter(type, true); + } + + @SuppressWarnings("checkstyle:MethodLength") + public static RowDataToAvroConverter createConverter( + LogicalType type, boolean legacyTimestampMapping) { + final RowDataToAvroConverter converter; + switch (type.getTypeRoot()) { + case NULL: + converter = + new RowDataToAvroConverter() { + private static final long serialVersionUID = 1L; + + @Override + public Object convert(Schema schema, Object object) { + return null; + } + }; + break; + case TINYINT: + converter = + new RowDataToAvroConverter() { + private static final long serialVersionUID = 1L; + + @Override + public Object convert(Schema schema, Object object) { + return ((Byte) object).intValue(); + } + }; + break; + case SMALLINT: + converter = + new RowDataToAvroConverter() { + private static final long serialVersionUID = 1L; + + @Override + public Object convert(Schema schema, Object object) { + return ((Short) object).intValue(); + } + }; + break; + case BOOLEAN: // boolean + case INTEGER: // int + case INTERVAL_YEAR_MONTH: // long + case BIGINT: // long + case INTERVAL_DAY_TIME: // long + case FLOAT: // float + case DOUBLE: // double + case TIME_WITHOUT_TIME_ZONE: // int + case DATE: // int + converter = + new RowDataToAvroConverter() { + private static final long serialVersionUID = 1L; + + @Override + public Object convert(Schema schema, Object object) { + return object; + } + }; + break; + case CHAR: + case VARCHAR: + converter = + new RowDataToAvroConverter() { + private static final long serialVersionUID = 1L; + + @Override + public Object convert(Schema schema, Object object) { + return new Utf8(object.toString()); + } + }; + break; + case BINARY: + case VARBINARY: + converter = + new RowDataToAvroConverter() { + private static final long serialVersionUID = 1L; + + @Override + public Object convert(Schema schema, Object object) { + return ByteBuffer.wrap((byte[]) object); + } + }; + break; + // Iceberg: Added support for nanoseconds precision (FLINK-39251) + case TIMESTAMP_WITHOUT_TIME_ZONE: + final int tzPrecision; + if (type 
instanceof org.apache.flink.table.types.logical.TimestampType) { + tzPrecision = ((org.apache.flink.table.types.logical.TimestampType) type).getPrecision(); + } else { + tzPrecision = 3; + } + if (legacyTimestampMapping) { + converter = + new RowDataToAvroConverter() { + private static final long serialVersionUID = 1L; + + @Override + public Object convert(Schema schema, Object object) { + TimestampData timestampData = (TimestampData) object; + if (tzPrecision <= 3) { + return timestampData.getMillisecond(); + } else if (tzPrecision <= 6) { + return timestampData.getMillisecond() * 1000L + + timestampData.getNanoOfMillisecond() / 1000; + } else { + // Iceberg: Added support for nanoseconds precision (FLINK-39251) + return timestampData.getMillisecond() * 1_000_000L + + timestampData.getNanoOfMillisecond(); + } + } + }; + } else { + converter = + new RowDataToAvroConverter() { + private static final long serialVersionUID = 1L; + + @Override + public Object convert(Schema schema, Object object) { + TimestampData timestampData = (TimestampData) object; + java.time.Instant instant = + timestampData.toLocalDateTime().toInstant(ZoneOffset.UTC); + if (tzPrecision <= 3) { + return instant.toEpochMilli(); + } else if (tzPrecision <= 6) { + return instant.getEpochSecond() * 1_000_000L + instant.getNano() / 1000; + } else { + // Iceberg: Added support for nanoseconds precision (FLINK-39251) + return instant.getEpochSecond() * 1_000_000_000L + instant.getNano(); + } + } + }; + } + break; + // Iceberg: Added support for nanoseconds precision (FLINK-39251) + case TIMESTAMP_WITH_LOCAL_TIME_ZONE: + final int ltzPrecision; + if (type instanceof org.apache.flink.table.types.logical.LocalZonedTimestampType) { + ltzPrecision = + ((org.apache.flink.table.types.logical.LocalZonedTimestampType) type).getPrecision(); + } else { + ltzPrecision = 3; + } + if (legacyTimestampMapping) { + throw new UnsupportedOperationException("Unsupported type: " + type); + } else { + converter = + new RowDataToAvroConverter() { + private static final long serialVersionUID = 1L; + + @Override + public Object convert(Schema schema, Object object) { + TimestampData timestampData = (TimestampData) object; + if (ltzPrecision <= 3) { + return timestampData.getMillisecond(); + } else if (ltzPrecision <= 6) { + return timestampData.getMillisecond() * 1000L + + timestampData.getNanoOfMillisecond() / 1000; + } else { + // Iceberg: Added support for nanoseconds precision (FLINK-39251) + return timestampData.getMillisecond() * 1_000_000L + + timestampData.getNanoOfMillisecond(); + } + } + }; + } + break; + case DECIMAL: + converter = + new RowDataToAvroConverter() { + private static final long serialVersionUID = 1L; + + @Override + public Object convert(Schema schema, Object object) { + return ByteBuffer.wrap(((DecimalData) object).toUnscaledBytes()); + } + }; + break; + case ARRAY: + converter = createArrayConverter((ArrayType) type, legacyTimestampMapping); + break; + case ROW: + converter = createRowConverter((RowType) type, legacyTimestampMapping); + break; + case MAP: + case MULTISET: + converter = createMapConverter(type, legacyTimestampMapping); + break; + case RAW: + default: + throw new UnsupportedOperationException("Unsupported type: " + type); + } + + // wrap into nullable converter + return new RowDataToAvroConverter() { + private static final long serialVersionUID = 1L; + + @Override + public Object convert(Schema schema, Object object) { + if (object == null) { + return null; + } + + // get actual schema if it is a nullable 
schema + Schema actualSchema; + if (schema.getType() == Schema.Type.UNION) { + List types = schema.getTypes(); + int size = types.size(); + if (size == 2 && types.get(1).getType() == Schema.Type.NULL) { + actualSchema = types.get(0); + } else if (size == 2 && types.get(0).getType() == Schema.Type.NULL) { + actualSchema = types.get(1); + } else { + throw new IllegalArgumentException( + "The Avro schema is not a nullable type: " + schema.toString()); + } + } else { + actualSchema = schema; + } + return converter.convert(actualSchema, object); + } + }; + } + + private static RowDataToAvroConverter createRowConverter( + RowType rowType, boolean legacyTimestampMapping) { + final RowDataToAvroConverter[] fieldConverters = + rowType.getChildren().stream() + .map(legacyType -> createConverter(legacyType, legacyTimestampMapping)) + .toArray(RowDataToAvroConverter[]::new); + final LogicalType[] fieldTypes = + rowType.getFields().stream().map(RowType.RowField::getType).toArray(LogicalType[]::new); + final RowData.FieldGetter[] fieldGetters = new RowData.FieldGetter[fieldTypes.length]; + for (int i = 0; i < fieldTypes.length; i++) { + fieldGetters[i] = RowData.createFieldGetter(fieldTypes[i], i); + } + final int length = rowType.getFieldCount(); + + return new RowDataToAvroConverter() { + private static final long serialVersionUID = 1L; + + @Override + public Object convert(Schema schema, Object object) { + final RowData row = (RowData) object; + final List fields = schema.getFields(); + final GenericRecord record = new GenericData.Record(schema); + for (int i = 0; i < length; ++i) { + final Schema.Field schemaField = fields.get(i); + try { + Object avroObject = + fieldConverters[i].convert( + schemaField.schema(), fieldGetters[i].getFieldOrNull(row)); + record.put(i, avroObject); + } catch (Throwable t) { + throw new RuntimeException( + String.format("Fail to serialize at field: %s.", schemaField.name()), t); + } + } + return record; + } + }; + } + + private static RowDataToAvroConverter createArrayConverter( + ArrayType arrayType, boolean legacyTimestampMapping) { + LogicalType elementType = arrayType.getElementType(); + final ArrayData.ElementGetter elementGetter = ArrayData.createElementGetter(elementType); + final RowDataToAvroConverter elementConverter = + createConverter(arrayType.getElementType(), legacyTimestampMapping); + + return new RowDataToAvroConverter() { + private static final long serialVersionUID = 1L; + + @Override + public Object convert(Schema schema, Object object) { + final Schema elementSchema = schema.getElementType(); + ArrayData arrayData = (ArrayData) object; + List list = Lists.newArrayList(); + for (int i = 0; i < arrayData.size(); ++i) { + list.add( + elementConverter.convert( + elementSchema, elementGetter.getElementOrNull(arrayData, i))); + } + return list; + } + }; + } + + private static RowDataToAvroConverter createMapConverter( + LogicalType type, boolean legacyTimestampMapping) { + LogicalType valueType = AvroSchemaConverter.extractValueTypeToAvroMap(type); + final ArrayData.ElementGetter valueGetter = ArrayData.createElementGetter(valueType); + final RowDataToAvroConverter valueConverter = + createConverter(valueType, legacyTimestampMapping); + + return new RowDataToAvroConverter() { + private static final long serialVersionUID = 1L; + + @Override + public Object convert(Schema schema, Object object) { + final Schema valueSchema = schema.getValueType(); + final MapData mapData = (MapData) object; + final ArrayData keyArray = mapData.keyArray(); + final ArrayData 
valueArray = mapData.valueArray();
+        final Map<Object, Object> map = CollectionUtil.newHashMapWithExpectedSize(mapData.size());
+        for (int i = 0; i < mapData.size(); ++i) {
+          final String key = keyArray.getString(i).toString();
+          final Object value =
+              valueConverter.convert(valueSchema, valueGetter.getElementOrNull(valueArray, i));
+          map.put(key, value);
+        }
+        return map;
+      }
+    };
+  }
+}
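Together with the AvroSchemaConverter that follows, the converter above completes the write path for nanosecond values. A minimal sketch of how it is meant to be exercised (schema string, field names, and class name are illustrative only; `legacyTimestampMapping=false` is what enables the precision > 6 branch):

```java
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.TimestampData;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.logical.TimestampType;
import org.apache.iceberg.flink.formats.avro.RowDataToAvroConverters;

class NanoWritePathSketch {
  public static void main(String[] args) {
    // One TIMESTAMP(9) column.
    RowType rowType = RowType.of(new TimestampType(9));
    RowDataToAvroConverters.RowDataToAvroConverter converter =
        RowDataToAvroConverters.createConverter(rowType, false);

    // Illustrative target schema: Iceberg stores nanosecond timestamps as a long.
    Schema avroSchema =
        new Schema.Parser()
            .parse(
                "{\"type\":\"record\",\"name\":\"r\",\"fields\":[{\"name\":\"ts\","
                    + "\"type\":{\"type\":\"long\",\"logicalType\":\"local-timestamp-nanos\"}}]}");

    GenericRowData row = new GenericRowData(1);
    row.setField(0, TimestampData.fromEpochMillis(123L, 456_789));
    GenericRecord record = (GenericRecord) converter.convert(avroSchema, row);
    // record.get(0) is 123_456_789L: epoch nanoseconds, i.e. millis * 1_000_000 + nano-of-milli
  }
}
```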
diff --git a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/formats/avro/typeutils/AvroSchemaConverter.java b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/formats/avro/typeutils/AvroSchemaConverter.java
new file mode 100644
index 000000000000..cb6505c6ff9d
--- /dev/null
+++ b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/formats/avro/typeutils/AvroSchemaConverter.java
@@ -0,0 +1,624 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.formats.avro.typeutils;
+
+import java.util.List;
+import org.apache.avro.LogicalTypes;
+import org.apache.avro.Schema;
+import org.apache.avro.SchemaBuilder;
+import org.apache.avro.SchemaParseException;
+import org.apache.avro.specific.SpecificData;
+import org.apache.avro.specific.SpecificRecord;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.formats.avro.AvroRowDataDeserializationSchema;
+import org.apache.flink.formats.avro.AvroRowDataSerializationSchema;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.types.AtomicDataType;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.ArrayType;
+import org.apache.flink.table.types.logical.DecimalType;
+import org.apache.flink.table.types.logical.IntType;
+import org.apache.flink.table.types.logical.LocalZonedTimestampType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.LogicalTypeFamily;
+import org.apache.flink.table.types.logical.MapType;
+import org.apache.flink.table.types.logical.MultisetType;
+import org.apache.flink.table.types.logical.RawType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.TimeType;
+import org.apache.flink.table.types.logical.TimestampType;
+import org.apache.flink.types.Row;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+
+/**
+ * Converts an Avro schema into Flink's type information. It uses {@link RowTypeInfo} for
+ * representing objects and converts Avro types into types that are compatible with Flink's Table
+ * & SQL API.
+ *
+ *
<p>
Note: Changes in this class need to be kept in sync with the corresponding runtime classes + * {@link AvroRowDataDeserializationSchema} and {@link AvroRowDataSerializationSchema}. + * + *
<p>
This class is adapted in Iceberg to support custom 'timestamp-nanos' and
+ * 'local-timestamp-nanos' logical types (FLINK-39251). Once that ticket is resolved in Flink,
+ * these custom types may be removed.
+ */
+public class AvroSchemaConverter {
+
+  private AvroSchemaConverter() {
+    // private
+  }
+
+  /**
+   * Converts an Avro class into a nested row structure with deterministic field order and data
+   * types that are compatible with Flink's Table & SQL API.
+   *
+   * @param avroClass Avro specific record that contains schema information
+   * @return type information matching the schema
+   */
+  @SuppressWarnings("unchecked")
+  public static <T extends SpecificRecord> TypeInformation<Row> convertToTypeInfo(
+      Class<T> avroClass) {
+    return convertToTypeInfo(avroClass, true);
+  }
+
+  /**
+   * Converts an Avro class into a nested row structure with deterministic field order and data
+   * types that are compatible with Flink's Table & SQL API.
+   *
+   * @param avroClass Avro specific record that contains schema information
+   * @param legacyTimestampMapping legacy mapping of timestamp types
+   * @return type information matching the schema
+   */
+  @SuppressWarnings("unchecked")
+  public static <T extends SpecificRecord> TypeInformation<Row> convertToTypeInfo(
+      Class<T> avroClass, boolean legacyTimestampMapping) {
+    Preconditions.checkNotNull(avroClass, "Avro specific record class must not be null.");
+    // determine schema to retrieve deterministic field order
+    final Schema schema = SpecificData.get().getSchema(avroClass);
+    return (TypeInformation<Row>) convertToTypeInfo(schema, legacyTimestampMapping);
+  }
+
+  /**
+   * Converts an Avro schema string into a nested row structure with deterministic field order and
+   * data types that are compatible with Flink's Table & SQL API.
+   *
+   * @param avroSchemaString Avro schema definition string
+   * @return type information matching the schema
+   */
+  @SuppressWarnings("unchecked")
+  public static <T> TypeInformation<T> convertToTypeInfo(String avroSchemaString) {
+    return convertToTypeInfo(avroSchemaString, true);
+  }
+
+  /**
+   * Converts an Avro schema string into a nested row structure with deterministic field order and
+   * data types that are compatible with Flink's Table & SQL API.
+ * + * @param avroSchemaString Avro schema definition string + * @param legacyTimestampMapping legacy mapping of timestamp types + * @return type information matching the schema + */ + @SuppressWarnings("unchecked") + public static TypeInformation convertToTypeInfo( + String avroSchemaString, boolean legacyTimestampMapping) { + Preconditions.checkNotNull(avroSchemaString, "Avro schema must not be null."); + final Schema schema; + try { + schema = new Schema.Parser().parse(avroSchemaString); + } catch (SchemaParseException e) { + throw new IllegalArgumentException("Could not parse Avro schema string.", e); + } + return (TypeInformation) convertToTypeInfo(schema, legacyTimestampMapping); + } + + private static TypeInformation convertToTypeInfo( + Schema schema, boolean legacyTimestampMapping) { + switch (schema.getType()) { + case RECORD: + final List fields = schema.getFields(); + + final TypeInformation[] types = new TypeInformation[fields.size()]; + final String[] names = new String[fields.size()]; + for (int i = 0; i < fields.size(); i++) { + final Schema.Field field = fields.get(i); + types[i] = convertToTypeInfo(field.schema(), legacyTimestampMapping); + names[i] = field.name(); + } + return Types.ROW_NAMED(names, types); + case ENUM: + return Types.STRING; + case ARRAY: + // result type might either be ObjectArrayTypeInfo or BasicArrayTypeInfo for Strings + return Types.OBJECT_ARRAY( + convertToTypeInfo(schema.getElementType(), legacyTimestampMapping)); + case MAP: + return Types.MAP( + Types.STRING, convertToTypeInfo(schema.getValueType(), legacyTimestampMapping)); + case UNION: + final Schema actualSchema; + if (schema.getTypes().size() == 2 + && schema.getTypes().get(0).getType() == Schema.Type.NULL) { + actualSchema = schema.getTypes().get(1); + } else if (schema.getTypes().size() == 2 + && schema.getTypes().get(1).getType() == Schema.Type.NULL) { + actualSchema = schema.getTypes().get(0); + } else if (schema.getTypes().size() == 1) { + actualSchema = schema.getTypes().get(0); + } else { + // use Kryo for serialization + return Types.GENERIC(Object.class); + } + return convertToTypeInfo(actualSchema, legacyTimestampMapping); + case FIXED: + // logical decimal type + if (schema.getLogicalType() instanceof LogicalTypes.Decimal) { + return Types.BIG_DEC; + } + // convert fixed size binary data to primitive byte arrays + return Types.PRIMITIVE_ARRAY(Types.BYTE); + case STRING: + // convert Avro's Utf8/CharSequence to String + return Types.STRING; + case BYTES: + // logical decimal type + if (schema.getLogicalType() instanceof LogicalTypes.Decimal) { + return Types.BIG_DEC; + } + return Types.PRIMITIVE_ARRAY(Types.BYTE); + case INT: + // logical date and time type + final org.apache.avro.LogicalType logicalType = schema.getLogicalType(); + if (logicalType == LogicalTypes.date()) { + return Types.SQL_DATE; + } else if (logicalType == LogicalTypes.timeMillis()) { + return Types.SQL_TIME; + } + return Types.INT; + case LONG: + if (legacyTimestampMapping) { + if (schema.getLogicalType() == LogicalTypes.timestampMillis() + || schema.getLogicalType() == LogicalTypes.timestampMicros() + // Iceberg: Added support for custom nanosecond logical type (FLINK-39251) + || (schema.getLogicalType() != null + && schema.getLogicalType().getName().equals("timestamp-nanos"))) { + return Types.SQL_TIMESTAMP; + } else if (schema.getLogicalType() == LogicalTypes.timeMicros() + || schema.getLogicalType() == LogicalTypes.timeMillis()) { + return Types.SQL_TIME; + } + } else { + // Avro logical timestamp types 
to Flink DataStream timestamp types + if (schema.getLogicalType() == LogicalTypes.timestampMillis() + || schema.getLogicalType() == LogicalTypes.timestampMicros() + // Iceberg: Added support for custom nanosecond logical type (FLINK-39251) + || (schema.getLogicalType() != null + && schema.getLogicalType().getName().equals("timestamp-nanos"))) { + return Types.INSTANT; + } else if (schema.getLogicalType() == LogicalTypes.localTimestampMillis() + || schema.getLogicalType() == LogicalTypes.localTimestampMicros() + // Iceberg: Added support for custom nanosecond logical type (FLINK-39251) + || (schema.getLogicalType() != null + && schema.getLogicalType().getName().equals("local-timestamp-nanos"))) { + return Types.LOCAL_DATE_TIME; + } else if (schema.getLogicalType() == LogicalTypes.timeMicros() + || schema.getLogicalType() == LogicalTypes.timeMillis()) { + return Types.SQL_TIME; + } + } + return Types.LONG; + case FLOAT: + return Types.FLOAT; + case DOUBLE: + return Types.DOUBLE; + case BOOLEAN: + return Types.BOOLEAN; + case NULL: + return Types.VOID; + } + throw new IllegalArgumentException("Unsupported Avro type '" + schema.getType() + "'."); + } + + /** + * Converts an Avro schema string into a nested row structure with deterministic field order and + * data types that are compatible with Flink's Table & SQL API. + * + * @param avroSchemaString Avro schema definition string + * @return data type matching the schema + */ + public static DataType convertToDataType(String avroSchemaString) { + return convertToDataType(avroSchemaString, true); + } + + /** + * Converts an Avro schema string into a nested row structure with deterministic field order and + * data types that are compatible with Flink's Table & SQL API. + * + * @param avroSchemaString Avro schema definition string + * @param legacyTimestampMapping legacy mapping of local timestamps + * @return data type matching the schema + */ + public static DataType convertToDataType( + String avroSchemaString, boolean legacyTimestampMapping) { + Preconditions.checkNotNull(avroSchemaString, "Avro schema must not be null."); + final Schema schema; + try { + schema = new Schema.Parser().parse(avroSchemaString); + } catch (SchemaParseException e) { + throw new IllegalArgumentException("Could not parse Avro schema string.", e); + } + return convertToDataType(schema, legacyTimestampMapping); + } + + @SuppressWarnings("deprecation") + private static DataType convertToDataType(Schema schema, boolean legacyMapping) { + switch (schema.getType()) { + case RECORD: + final List schemaFields = schema.getFields(); + + final DataTypes.Field[] fields = new DataTypes.Field[schemaFields.size()]; + for (int i = 0; i < schemaFields.size(); i++) { + final Schema.Field field = schemaFields.get(i); + fields[i] = + DataTypes.FIELD(field.name(), convertToDataType(field.schema(), legacyMapping)); + } + return DataTypes.ROW(fields).notNull(); + case ENUM: + return DataTypes.STRING().notNull(); + case ARRAY: + return DataTypes.ARRAY(convertToDataType(schema.getElementType(), legacyMapping)).notNull(); + case MAP: + return DataTypes.MAP( + DataTypes.STRING().notNull(), + convertToDataType(schema.getValueType(), legacyMapping)) + .notNull(); + case UNION: + final Schema actualSchema; + final boolean nullable; + if (schema.getTypes().size() == 2 + && schema.getTypes().get(0).getType() == Schema.Type.NULL) { + actualSchema = schema.getTypes().get(1); + nullable = true; + } else if (schema.getTypes().size() == 2 + && schema.getTypes().get(1).getType() == Schema.Type.NULL) { + 
actualSchema = schema.getTypes().get(0);
+          nullable = true;
+        } else if (schema.getTypes().size() == 1) {
+          actualSchema = schema.getTypes().get(0);
+          nullable = false;
+        } else {
+          // general unions would need Kryo-style generic handling, which this backport omits
+          throw new UnsupportedOperationException(
+              "Avro unions other than a single type or a nullable single type are not supported "
+                  + "in the Flink 1.20 backport.");
+        }
+        DataType converted = convertToDataType(actualSchema, legacyMapping);
+        return nullable ? converted.nullable() : converted;
+      case FIXED:
+        // logical decimal type
+        if (schema.getLogicalType() instanceof LogicalTypes.Decimal) {
+          final LogicalTypes.Decimal decimalType = (LogicalTypes.Decimal) schema.getLogicalType();
+          return DataTypes.DECIMAL(decimalType.getPrecision(), decimalType.getScale()).notNull();
+        }
+        // convert fixed size binary data to primitive byte arrays
+        return DataTypes.VARBINARY(schema.getFixedSize()).notNull();
+      case STRING:
+        // convert Avro's Utf8/CharSequence to String
+        return DataTypes.STRING().notNull();
+      case BYTES:
+        // logical decimal type
+        if (schema.getLogicalType() instanceof LogicalTypes.Decimal) {
+          final LogicalTypes.Decimal decimalType = (LogicalTypes.Decimal) schema.getLogicalType();
+          return DataTypes.DECIMAL(decimalType.getPrecision(), decimalType.getScale()).notNull();
+        }
+        return DataTypes.BYTES().notNull();
+      case INT:
+        // logical date and time type
+        final org.apache.avro.LogicalType logicalType = schema.getLogicalType();
+        if (logicalType == LogicalTypes.date()) {
+          return DataTypes.DATE().notNull();
+        } else if (logicalType == LogicalTypes.timeMillis()) {
+          return DataTypes.TIME(3).notNull();
+        }
+        return DataTypes.INT().notNull();
+      case LONG:
+        if (legacyMapping) {
+          // Avro logical timestamp types to Flink SQL timestamp types
+          if (schema.getLogicalType() == LogicalTypes.timestampMillis()) {
+            return DataTypes.TIMESTAMP(3).notNull();
+          } else if (schema.getLogicalType() == LogicalTypes.timestampMicros()) {
+            return DataTypes.TIMESTAMP(6).notNull();
+          } else if (schema.getLogicalType() != null
+              && schema.getLogicalType().getName().equals("timestamp-nanos")) {
+            // Iceberg: Added support for custom nanosecond logical type (FLINK-39251)
+            return DataTypes.TIMESTAMP(9).notNull();
+          } else if (schema.getLogicalType() == LogicalTypes.timeMillis()) {
+            return DataTypes.TIME(3).notNull();
+          } else if (schema.getLogicalType() == LogicalTypes.timeMicros()) {
+            return DataTypes.TIME(6).notNull();
+          }
+        } else {
+          // Avro logical timestamp types to Flink SQL timestamp types
+          if (schema.getLogicalType() == LogicalTypes.timestampMillis()) {
+            return DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(3).notNull();
+          } else if (schema.getLogicalType() == LogicalTypes.timestampMicros()) {
+            return DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(6).notNull();
+          } else if (schema.getLogicalType() != null
+              && schema.getLogicalType().getName().equals("timestamp-nanos")) {
+            // Iceberg: Added support for custom nanosecond logical type (FLINK-39251)
+            return DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(9).notNull();
+          } else if (schema.getLogicalType() == LogicalTypes.timeMillis()) {
+            return DataTypes.TIME(3).notNull();
+          } else if (schema.getLogicalType() == LogicalTypes.timeMicros()) {
+            return DataTypes.TIME(6).notNull();
+          } else if (schema.getLogicalType() == LogicalTypes.localTimestampMillis()) {
+            return DataTypes.TIMESTAMP(3).notNull();
+          } else if (schema.getLogicalType() == LogicalTypes.localTimestampMicros()) {
+            return DataTypes.TIMESTAMP(6).notNull();
+          } else if (schema.getLogicalType() != null
+              &&
schema.getLogicalType().getName().equals("local-timestamp-nanos")) { + // Iceberg: Added support for custom nanosecond logical type (FLINK-39251) + return DataTypes.TIMESTAMP(9).notNull(); + } + } + + return DataTypes.BIGINT().notNull(); + case FLOAT: + return DataTypes.FLOAT().notNull(); + case DOUBLE: + return DataTypes.DOUBLE().notNull(); + case BOOLEAN: + return DataTypes.BOOLEAN().notNull(); + case NULL: + return DataTypes.NULL(); + } + throw new IllegalArgumentException("Unsupported Avro type '" + schema.getType() + "'."); + } + + /** + * Converts Flink SQL {@link LogicalType} (can be nested) into an Avro schema. + * + *
<p>
Use "org.apache.flink.avro.generated.record" as the type name. + * + * @param schema the schema type, usually it should be the top level record type, e.g. not a + * nested type + * @return Avro's {@link Schema} matching this logical type. + */ + public static Schema convertToSchema(LogicalType schema) { + return convertToSchema(schema, true); + } + + /** + * Converts Flink SQL {@link LogicalType} (can be nested) into an Avro schema. + * + *
<p>
Use "org.apache.flink.avro.generated.record" as the type name. + * + * @param schema the schema type, usually it should be the top level record type, e.g. not a + * nested type + * @param legacyTimestampMapping whether to use the legacy timestamp mapping + * @return Avro's {@link Schema} matching this logical type. + */ + public static Schema convertToSchema(LogicalType schema, boolean legacyTimestampMapping) { + return convertToSchema( + schema, "org.apache.flink.avro.generated.record", legacyTimestampMapping); + } + + /** + * Converts Flink SQL {@link LogicalType} (can be nested) into an Avro schema. + * + *
<p>
The "{rowName}_" is used as the nested row type name prefix in order to generate the right + * schema. Nested record type that only differs with type name is still compatible. + * + * @param logicalType logical type + * @param rowName the record name + * @return Avro's {@link Schema} matching this logical type. + */ + public static Schema convertToSchema(LogicalType logicalType, String rowName) { + return convertToSchema(logicalType, rowName, true); + } + + /** + * Converts Flink SQL {@link LogicalType} (can be nested) into an Avro schema. + * + *
<p>
The "{rowName}_" is used as the nested row type name prefix in order to generate the right
+   * schema. Nested record types that differ only in their type name are still compatible.
+   *
+   * @param logicalType logical type
+   * @param rowName the record name
+   * @param legacyTimestampMapping whether to use the legacy timestamp mapping
+   * @return Avro's {@link Schema} matching this logical type.
+   */
+  public static Schema convertToSchema(
+      LogicalType logicalType, String rowName, boolean legacyTimestampMapping) {
+    int precision;
+    boolean nullable = logicalType.isNullable();
+    switch (logicalType.getTypeRoot()) {
+      case NULL:
+        return SchemaBuilder.builder().nullType();
+      case BOOLEAN:
+        Schema bool = SchemaBuilder.builder().booleanType();
+        return nullable ? nullableSchema(bool) : bool;
+      case TINYINT:
+      case SMALLINT:
+      case INTEGER:
+        Schema integer = SchemaBuilder.builder().intType();
+        return nullable ? nullableSchema(integer) : integer;
+      case BIGINT:
+        Schema bigint = SchemaBuilder.builder().longType();
+        return nullable ? nullableSchema(bigint) : bigint;
+      case FLOAT:
+        Schema floatSchema = SchemaBuilder.builder().floatType();
+        return nullable ? nullableSchema(floatSchema) : floatSchema;
+      case DOUBLE:
+        Schema doubleSchema = SchemaBuilder.builder().doubleType();
+        return nullable ? nullableSchema(doubleSchema) : doubleSchema;
+      case CHAR:
+      case VARCHAR:
+        Schema str = SchemaBuilder.builder().stringType();
+        return nullable ? nullableSchema(str) : str;
+      case BINARY:
+      case VARBINARY:
+        Schema binary = SchemaBuilder.builder().bytesType();
+        return nullable ? nullableSchema(binary) : binary;
+      case TIMESTAMP_WITHOUT_TIME_ZONE:
+        // use long to represent Timestamp
+        final TimestampType timestampType = (TimestampType) logicalType;
+        precision = timestampType.getPrecision();
+        org.apache.avro.LogicalType avroLogicalType;
+        if (legacyTimestampMapping) {
+          if (precision <= 3) {
+            avroLogicalType = LogicalTypes.timestampMillis();
+          } else {
+            throw new IllegalArgumentException(
+                "Avro does not support TIMESTAMP type with precision: "
+                    + precision
+                    + ", it only supports precisions less than or equal to 3.");
+          }
+        } else {
+          if (precision <= 3) {
+            avroLogicalType = LogicalTypes.localTimestampMillis();
+          } else if (precision <= 6) {
+            avroLogicalType = LogicalTypes.localTimestampMicros();
+          } else {
+            throw new IllegalArgumentException(
+                "Avro does not support LOCAL TIMESTAMP type with precision: "
+                    + precision
+                    + ", it only supports precisions less than or equal to 6.");
+          }
+        }
+        Schema timestamp = avroLogicalType.addToSchema(SchemaBuilder.builder().longType());
+        return nullable ? nullableSchema(timestamp) : timestamp;
+      case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
+        if (legacyTimestampMapping) {
+          throw new UnsupportedOperationException(
+              "Unsupported to derive Schema for type: " + logicalType);
+        } else {
+          final LocalZonedTimestampType localZonedTimestampType =
+              (LocalZonedTimestampType) logicalType;
+          precision = localZonedTimestampType.getPrecision();
+          if (precision <= 3) {
+            avroLogicalType = LogicalTypes.timestampMillis();
+          } else if (precision <= 6) {
+            avroLogicalType = LogicalTypes.timestampMicros();
+          } else {
+            throw new IllegalArgumentException(
+                "Avro does not support TIMESTAMP type with precision: "
+                    + precision
+                    + ", it only supports precisions less than or equal to 6.");
+          }
+          timestamp = avroLogicalType.addToSchema(SchemaBuilder.builder().longType());
+          return nullable ?
nullableSchema(timestamp) : timestamp; + } + case DATE: + // use int to represent Date + Schema date = LogicalTypes.date().addToSchema(SchemaBuilder.builder().intType()); + return nullable ? nullableSchema(date) : date; + case TIME_WITHOUT_TIME_ZONE: + precision = ((TimeType) logicalType).getPrecision(); + if (precision > 3) { + throw new IllegalArgumentException( + "Avro does not support TIME type with precision: " + + precision + + ", it only supports precision less than or equal to 3."); + } + // use int to represent Time; only millisecond precision is supported during deserialization + Schema time = LogicalTypes.timeMillis().addToSchema(SchemaBuilder.builder().intType()); + return nullable ? nullableSchema(time) : time; + case DECIMAL: + DecimalType decimalType = (DecimalType) logicalType; + // store BigDecimal as byte[] + Schema decimal = + LogicalTypes.decimal(decimalType.getPrecision(), decimalType.getScale()) + .addToSchema(SchemaBuilder.builder().bytesType()); + return nullable ? nullableSchema(decimal) : decimal; + case ROW: + RowType rowType = (RowType) logicalType; + List<String> fieldNames = rowType.getFieldNames(); + // record names have to be unique within the Schema, hence the "{rowName}_" prefix + SchemaBuilder.FieldAssembler<Schema> builder = + SchemaBuilder.builder().record(rowName).fields(); + for (int i = 0; i < rowType.getFieldCount(); i++) { + String fieldName = fieldNames.get(i); + LogicalType fieldType = rowType.getTypeAt(i); + SchemaBuilder.GenericDefault<Schema> fieldBuilder = + builder + .name(fieldName) + .type( + convertToSchema( + fieldType, rowName + "_" + fieldName, legacyTimestampMapping)); + + if (fieldType.isNullable()) { + builder = fieldBuilder.withDefault(null); + } else { + builder = fieldBuilder.noDefault(); + } + } + Schema record = builder.endRecord(); + return nullable ? nullableSchema(record) : record; + case MULTISET: + case MAP: + Schema map = + SchemaBuilder.builder() + .map() + .values( + convertToSchema( + extractValueTypeToAvroMap(logicalType), rowName, legacyTimestampMapping)); + return nullable ? nullableSchema(map) : map; + case ARRAY: + ArrayType arrayType = (ArrayType) logicalType; + Schema array = + SchemaBuilder.builder() + .array() + .items( + convertToSchema(arrayType.getElementType(), rowName, legacyTimestampMapping)); + return nullable ? nullableSchema(array) : array; + case RAW: + default: + throw new UnsupportedOperationException( + "Cannot derive an Avro Schema for unsupported type: " + logicalType); + } + } + + public static LogicalType extractValueTypeToAvroMap(LogicalType type) { + LogicalType keyType; + LogicalType valueType; + if (type instanceof MapType) { + MapType mapType = (MapType) type; + keyType = mapType.getKeyType(); + valueType = mapType.getValueType(); + } else { + MultisetType multisetType = (MultisetType) type; + keyType = multisetType.getElementType(); + valueType = new IntType(); + } + if (!keyType.is(LogicalTypeFamily.CHARACTER_STRING)) { + throw new UnsupportedOperationException( + "Avro format doesn't support non-string key types for maps. " + + "The key type is: " + + keyType.asSummaryString()); + } + return valueType; + } + + /** Returns a nullable variant of the given schema. */ + private static Schema nullableSchema(Schema schema) { + return schema.isNullable() + ?
schema + : Schema.createUnion(SchemaBuilder.builder().nullType(), schema); + } +} diff --git a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/AvroGenericRecordToRowDataMapper.java b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/AvroGenericRecordToRowDataMapper.java index f7e8e0c884cf..d63493e4b16f 100644 --- a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/AvroGenericRecordToRowDataMapper.java +++ b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/AvroGenericRecordToRowDataMapper.java @@ -21,8 +21,8 @@ import org.apache.avro.Schema; import org.apache.avro.generic.GenericRecord; import org.apache.flink.api.common.functions.MapFunction; -import org.apache.flink.formats.avro.AvroToRowDataConverters; -import org.apache.flink.formats.avro.typeutils.AvroSchemaConverter; +import org.apache.iceberg.flink.formats.avro.AvroToRowDataConverters; +import org.apache.iceberg.flink.formats.avro.typeutils.AvroSchemaConverter; import org.apache.flink.table.data.RowData; import org.apache.flink.table.types.DataType; import org.apache.flink.table.types.logical.LogicalType; diff --git a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/source/RowDataToAvroGenericRecordConverter.java b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/source/RowDataToAvroGenericRecordConverter.java index 8ef1f1fbb833..62874e8a42fd 100644 --- a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/source/RowDataToAvroGenericRecordConverter.java +++ b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/source/RowDataToAvroGenericRecordConverter.java @@ -23,8 +23,8 @@ import org.apache.avro.Schema; import org.apache.avro.generic.GenericRecord; import org.apache.flink.annotation.Internal; -import org.apache.flink.formats.avro.RowDataToAvroConverters; -import org.apache.flink.formats.avro.typeutils.AvroSchemaConverter; +import org.apache.iceberg.flink.formats.avro.RowDataToAvroConverters; +import org.apache.iceberg.flink.formats.avro.typeutils.AvroSchemaConverter; import org.apache.flink.table.data.RowData; import org.apache.flink.table.types.DataType; import org.apache.flink.table.types.logical.LogicalType; diff --git a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/source/reader/AvroGenericRecordConverter.java b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/source/reader/AvroGenericRecordConverter.java index b158b0871a53..8140605767a8 100644 --- a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/source/reader/AvroGenericRecordConverter.java +++ b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/source/reader/AvroGenericRecordConverter.java @@ -21,8 +21,8 @@ import org.apache.avro.Schema; import org.apache.avro.generic.GenericRecord; import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.formats.avro.RowDataToAvroConverters; -import org.apache.flink.formats.avro.typeutils.AvroSchemaConverter; +import org.apache.iceberg.flink.formats.avro.RowDataToAvroConverters; +import org.apache.iceberg.flink.formats.avro.typeutils.AvroSchemaConverter; import org.apache.flink.formats.avro.typeutils.GenericRecordAvroTypeInfo; import org.apache.flink.table.data.RowData; import org.apache.flink.table.types.DataType;
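To make the legacy and new timestamp mappings concrete, here is a minimal sketch of how the forked converter behaves for a non-nullable TIMESTAMP(6) column (the class and method names are illustrative only, not part of this change):

import org.apache.avro.Schema;
import org.apache.flink.table.types.logical.TimestampType;
import org.apache.iceberg.flink.formats.avro.typeutils.AvroSchemaConverter;

public class TimestampMappingSketch {
  public static void main(String[] args) {
    // TIMESTAMP(6) NOT NULL: too precise for the legacy mapping, fine for the new one.
    TimestampType ts6 = new TimestampType(false, 6);
    // Legacy mapping accepts only precision <= 3 and would throw IllegalArgumentException:
    // AvroSchemaConverter.convertToSchema(ts6, "rec", true);
    // New mapping: precision 4..6 becomes local-timestamp-micros backed by a long.
    Schema schema = AvroSchemaConverter.convertToSchema(ts6, "rec", false);
    System.out.println(schema); // {"type":"long","logicalType":"local-timestamp-micros"}
  }
}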
diff --git a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/DataGenerators.java b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/DataGenerators.java index e2cd411d7069..318843a48792 100644 --- a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/DataGenerators.java +++ b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/DataGenerators.java @@ -75,6 +75,11 @@ public static class Primitives implements DataGenerator { OffsetDateTime.of(2022, 1, 10, 0, 0, 0, 0, ZoneOffset.UTC); private static final LocalDateTime JAVA_LOCAL_DATE_TIME_20220110 = LocalDateTime.of(2022, 1, 10, 0, 0, 0); + private static final OffsetDateTime JAVA_OFFSET_DATE_TIME_MAX_NANO = + OffsetDateTime.of(2262, 4, 11, 23, 47, 16, 854_775_807, ZoneOffset.UTC); + private static final LocalDateTime JAVA_LOCAL_DATE_TIME_MAX_NANO = + LocalDateTime.of(2262, 4, 11, 23, 47, 16, 854_775_807); + private static final long ICEBERG_MAX_NANOS_EPOCH = Long.MAX_VALUE; // 2262-04-11T23:47:16.854775807Z private static final BigDecimal BIG_DECIMAL_NEGATIVE = new BigDecimal("-1.50"); private static final byte[] FIXED_BYTES = "012345689012345".getBytes(StandardCharsets.UTF_8); @@ -96,7 +101,11 @@ public static class Primitives implements DataGenerator { Types.NestedField.required(12, "uuid_field", Types.UUIDType.get()), Types.NestedField.required(13, "binary_field", Types.BinaryType.get()), Types.NestedField.required(14, "decimal_field", Types.DecimalType.of(9, 2)), - Types.NestedField.required(15, "fixed_field", Types.FixedType.ofLength(16))); + Types.NestedField.required(15, "fixed_field", Types.FixedType.ofLength(16)), + Types.NestedField.required( + 16, "ts_ns_with_zone_field", Types.TimestampNanoType.withZone()), + Types.NestedField.required( + 17, "ts_ns_without_zone_field", Types.TimestampNanoType.withoutZone())); private final RowType flinkRowType = FlinkSchemaUtil.convert(icebergSchema); @@ -171,6 +180,8 @@ public GenericRecord generateIcebergGenericRecord() { genericRecord.setField("time_field", JAVA_LOCAL_TIME_HOUR8); genericRecord.setField("ts_with_zone_field", JAVA_OFFSET_DATE_TIME_20220110); genericRecord.setField("ts_without_zone_field", JAVA_LOCAL_DATE_TIME_20220110); + genericRecord.setField("ts_ns_with_zone_field", JAVA_OFFSET_DATE_TIME_MAX_NANO); + genericRecord.setField("ts_ns_without_zone_field", JAVA_LOCAL_DATE_TIME_MAX_NANO); byte[] uuidBytes = new byte[16]; for (int i = 0; i < 16; ++i) { @@ -220,7 +231,11 @@ public GenericRowData generateFlinkRowData() { uuidBytes, binaryBytes, DecimalData.fromBigDecimal(BIG_DECIMAL_NEGATIVE, 9, 2), - FIXED_BYTES); + FIXED_BYTES, + TimestampData.fromEpochMillis( + ICEBERG_MAX_NANOS_EPOCH / 1_000_000, (int) (ICEBERG_MAX_NANOS_EPOCH % 1_000_000)), + TimestampData.fromEpochMillis( + ICEBERG_MAX_NANOS_EPOCH / 1_000_000, (int) (ICEBERG_MAX_NANOS_EPOCH % 1_000_000))); } @Override @@ -238,8 +253,10 @@ public org.apache.avro.generic.GenericRecord generateAvroGenericRecord() { genericRecord.put("time_field", HOUR_8_IN_MILLI); - // Although Avro logical type for timestamp fields are in micro seconds, - // AvroToRowDataConverters only looks for long value in milliseconds.
- genericRecord.put("ts_with_zone_field", JODA_DATETIME_20220110.getMillis()); - genericRecord.put("ts_without_zone_field", JODA_DATETIME_20220110.getMillis()); + genericRecord.put("ts_with_zone_field", JODA_DATETIME_20220110.getMillis() * 1000L); + genericRecord.put("ts_without_zone_field", JODA_DATETIME_20220110.getMillis() * 1000L); + genericRecord.put("ts_ns_with_zone_field", ICEBERG_MAX_NANOS_EPOCH); + genericRecord.put("ts_ns_without_zone_field", ICEBERG_MAX_NANOS_EPOCH); byte[] uuidBytes = new byte[16]; for (int i = 0; i < 16; ++i) { @@ -554,7 +571,11 @@ public static class ArrayOfPrimitive implements DataGenerator { new Schema( Types.NestedField.required(1, "row_id", Types.StringType.get()), Types.NestedField.required( - 2, "array_of_int", Types.ListType.ofOptional(101, Types.IntegerType.get()))); + 2, "array_of_int", Types.ListType.ofOptional(101, Types.IntegerType.get())), + Types.NestedField.optional( + 3, + "array_of_ts_ns", + Types.ListType.ofRequired(102, Types.TimestampNanoType.withoutZone()))); private final RowType flinkRowType = FlinkSchemaUtil.convert(icebergSchema); @@ -581,13 +602,33 @@ public GenericRecord generateIcebergGenericRecord() { GenericRecord genericRecord = GenericRecord.create(icebergSchema); genericRecord.setField("row_id", "row_id_value"); genericRecord.setField("array_of_int", Arrays.asList(1, 2, 3)); + + LocalDateTime posNanos = LocalDateTime.of(2023, 1, 1, 12, 0, 0, 123456789); + LocalDateTime negNanos = LocalDateTime.of(1969, 12, 31, 23, 59, 59, 987654321); + genericRecord.setField("array_of_ts_ns", Arrays.asList(posNanos, negNanos)); return genericRecord; } @Override public GenericRowData generateFlinkRowData() { Integer[] arr = {1, 2, 3}; - return GenericRowData.of(StringData.fromString("row_id_value"), new GenericArrayData(arr)); + + long posNanos = + org.apache.iceberg.util.DateTimeUtil.nanosFromTimestamp( + LocalDateTime.of(2023, 1, 1, 12, 0, 0, 123456789)); + long negNanos = + org.apache.iceberg.util.DateTimeUtil.nanosFromTimestamp( + LocalDateTime.of(1969, 12, 31, 23, 59, 59, 987654321)); + TimestampData[] tsArr = { + TimestampData.fromEpochMillis( + Math.floorDiv(posNanos, 1_000_000L), (int) Math.floorMod(posNanos, 1_000_000L)), + TimestampData.fromEpochMillis( + Math.floorDiv(negNanos, 1_000_000L), (int) Math.floorMod(negNanos, 1_000_000L)) + }; + return GenericRowData.of( + StringData.fromString("row_id_value"), + new GenericArrayData(arr), + new GenericArrayData(tsArr)); } @Override @@ -595,6 +636,14 @@ public org.apache.avro.generic.GenericRecord generateAvroGenericRecord() { org.apache.avro.generic.GenericRecord genericRecord = new GenericData.Record(avroSchema); genericRecord.put("row_id", "row_id_value"); genericRecord.put("array_of_int", Arrays.asList(1, 2, 3)); + + long posNanos = + org.apache.iceberg.util.DateTimeUtil.nanosFromTimestamp( + LocalDateTime.of(2023, 1, 1, 12, 0, 0, 123456789)); + long negNanos = + org.apache.iceberg.util.DateTimeUtil.nanosFromTimestamp( + LocalDateTime.of(1969, 12, 31, 23, 59, 59, 987654321)); + genericRecord.put("array_of_ts_ns", Arrays.asList(posNanos, negNanos)); return genericRecord; } } @@ -808,7 +857,12 @@ public static class MapOfPrimitives implements DataGenerator { 2, "map_of_primitives", Types.MapType.ofRequired( - 101, 102, Types.StringType.get(), Types.IntegerType.get()))); + 101, 102, Types.StringType.get(), Types.IntegerType.get())), + Types.NestedField.optional( + 3, + "map_of_ts_ns", + Types.MapType.ofRequired( + 103, 104, Types.StringType.get(), Types.TimestampNanoType.withoutZone()))); 
private final RowType flinkRowType = FlinkSchemaUtil.convert(icebergSchema); @@ -835,15 +889,37 @@ public GenericRecord generateIcebergGenericRecord() { GenericRecord genericRecord = GenericRecord.create(icebergSchema); genericRecord.setField("row_id", "row_id_value"); genericRecord.setField("map_of_primitives", ImmutableMap.of("Jane", 1, "Joe", 2)); + + LocalDateTime posNanos = LocalDateTime.of(2023, 1, 1, 12, 0, 0, 123456789); + LocalDateTime negNanos = LocalDateTime.of(1969, 12, 31, 23, 59, 59, 987654321); + genericRecord.setField( + "map_of_ts_ns", ImmutableMap.of("positive", posNanos, "negative", negNanos)); return genericRecord; } @Override public GenericRowData generateFlinkRowData() { + long posNanos = + org.apache.iceberg.util.DateTimeUtil.nanosFromTimestamp( + LocalDateTime.of(2023, 1, 1, 12, 0, 0, 123456789)); + long negNanos = + org.apache.iceberg.util.DateTimeUtil.nanosFromTimestamp( + LocalDateTime.of(1969, 12, 31, 23, 59, 59, 987654321)); + return GenericRowData.of( StringData.fromString("row_id_value"), new GenericMapData( - ImmutableMap.of(StringData.fromString("Jane"), 1, StringData.fromString("Joe"), 2))); + ImmutableMap.of(StringData.fromString("Jane"), 1, StringData.fromString("Joe"), 2)), + new GenericMapData( + ImmutableMap.of( + StringData.fromString("positive"), + TimestampData.fromEpochMillis( + Math.floorDiv(posNanos, 1_000_000L), + (int) Math.floorMod(posNanos, 1_000_000L)), + StringData.fromString("negative"), + TimestampData.fromEpochMillis( + Math.floorDiv(negNanos, 1_000_000L), + (int) Math.floorMod(negNanos, 1_000_000L))))); } @Override @@ -851,6 +927,15 @@ public org.apache.avro.generic.GenericRecord generateAvroGenericRecord() { org.apache.avro.generic.GenericRecord genericRecord = new GenericData.Record(avroSchema); genericRecord.put("row_id", "row_id_value"); genericRecord.put("map_of_primitives", ImmutableMap.of("Jane", 1, "Joe", 2)); + + long posNanos = + org.apache.iceberg.util.DateTimeUtil.nanosFromTimestamp( + LocalDateTime.of(2023, 1, 1, 12, 0, 0, 123456789)); + long negNanos = + org.apache.iceberg.util.DateTimeUtil.nanosFromTimestamp( + LocalDateTime.of(1969, 12, 31, 23, 59, 59, 987654321)); + genericRecord.put( + "map_of_ts_ns", ImmutableMap.of("positive", posNanos, "negative", negNanos)); return genericRecord; } } diff --git a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/TestRowDataWrapper.java b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/TestRowDataWrapper.java index cd6964b5ed0f..3d9098339a5e 100644 --- a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/TestRowDataWrapper.java +++ b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/TestRowDataWrapper.java @@ -60,17 +60,7 @@ public void testTime() { }); } - @Disabled - @Override - public void testTimestampNanoWithoutZone() { - // Flink does not support nanosecond timestamp without zone. - } - @Disabled - @Override - public void testTimestampNanoWithZone() { - // Flink does not support nanosecond timestamp with zone. 
- } @Override protected void generateAndValidate( diff --git a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkOrcReaderWriter.java b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkOrcReaderWriter.java index 4a70802f2a2e..b7b0a54156cc 100644 --- a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkOrcReaderWriter.java +++ b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkOrcReaderWriter.java @@ -49,6 +49,11 @@ protected boolean allowsWritingNullValuesForRequiredFields() { return true; } + @Override + protected boolean supportsTimestampNanos() { + return true; + } + @Override protected void writeAndValidate(Schema schema) throws IOException { List expectedRecords = RandomGenericData.generate(schema, NUM_RECORDS, 1990L); diff --git a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/data/TestRowDataProjection.java b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/data/TestRowDataProjection.java index 4e5b38ffb026..a2411da1e344 100644 --- a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/data/TestRowDataProjection.java +++ b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/data/TestRowDataProjection.java @@ -271,18 +271,19 @@ public void testMapOfPrimitivesProjection() { GenericRowData.of( StringData.fromString("row_id_value"), new GenericMapData( - ImmutableMap.of(StringData.fromString("foo"), 1, StringData.fromString("bar"), 2))); + ImmutableMap.of(StringData.fromString("foo"), 1, StringData.fromString("bar"), 2)), + null); testEqualsAndHashCode(schema, idOnly, rowData, copyRowData, otherRowData, true); testEqualsAndHashCode(schema, mapOnly, rowData, copyRowData, otherRowData); testEqualsAndHashCode(schema, schema, rowData, copyRowData, otherRowData); GenericRowData rowDataNullOptionalFields = - GenericRowData.of(StringData.fromString("row_id_value"), null); + GenericRowData.of(StringData.fromString("row_id_value"), null, null); GenericRowData copyRowDataNullOptionalFields = - GenericRowData.of(StringData.fromString("row_id_value"), null); + GenericRowData.of(StringData.fromString("row_id_value"), null, null); // modify the map field value GenericRowData otherRowDataNullOptionalFields = - GenericRowData.of(StringData.fromString("other_row_id_value"), null); + GenericRowData.of(StringData.fromString("other_row_id_value"), null, null); testEqualsAndHashCode( schema, idOnly, @@ -432,7 +433,8 @@ public void testArrayOfPrimitiveProjection() { GenericRowData otherRowData = GenericRowData.of( StringData.fromString("other_row_id_value"), - new GenericArrayData(new Integer[] {4, 5, 6})); + new GenericArrayData(new Integer[] {4, 5, 6}), + null); testEqualsAndHashCode(schema, idOnly, rowData, copyRowData, otherRowData); testEqualsAndHashCode(schema, arrayOnly, rowData, copyRowData, otherRowData); testEqualsAndHashCode(schema, schema, rowData, copyRowData, otherRowData); @@ -440,16 +442,19 @@ public void testArrayOfPrimitiveProjection() { GenericRowData rowDataNullOptionalFields = GenericRowData.of( StringData.fromString("row_id_value"), - new GenericArrayData(new Integer[] {1, null, 3})); + new GenericArrayData(new Integer[] {1, null, 3}), + null); GenericRowData copyRowDataNullOptionalFields = GenericRowData.of( StringData.fromString("row_id_value"), - new GenericArrayData(new Integer[] {1, null, 3})); + new GenericArrayData(new Integer[] {1, null, 3}), + null); // modify the map field value GenericRowData otherRowDataNullOptionalFields = GenericRowData.of( 
StringData.fromString("other_row_id_value"), - new GenericArrayData(new Integer[] {4, null, 6})); + new GenericArrayData(new Integer[] {4, null, 6}), + null); testEqualsAndHashCode( schema, idOnly,