Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions flink/v1.20/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ project(":iceberg-flink:iceberg-flink-${flinkMajorVersion}") {
implementation project(':iceberg-hive-metastore')

compileOnly libs.flink120.avro
compileOnly libs.joda.time
// dropwizard histogram metrics (optional in Flink)
compileOnly libs.flink120.metrics.dropwizard
compileOnly libs.flink120.streaming.java
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,11 +137,17 @@ public Type visit(TimeType timeType) {

@Override
public Type visit(TimestampType timestampType) {
if (timestampType.getPrecision() > 6) {
return Types.TimestampNanoType.withoutZone();
}
return Types.TimestampType.withoutZone();
}

@Override
public Type visit(LocalZonedTimestampType localZonedTimestampType) {
if (localZonedTimestampType.getPrecision() > 6) {
return Types.TimestampNanoType.withZone();
}
return Types.TimestampType.withZone();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,19 +114,35 @@ private static PositionalGetter<?> buildGetter(LogicalType logicalType, Type typ

case TIMESTAMP_WITHOUT_TIME_ZONE:
TimestampType timestampType = (TimestampType) logicalType;
return (row, pos) -> {
LocalDateTime localDateTime =
row.getTimestamp(pos, timestampType.getPrecision()).toLocalDateTime();
return DateTimeUtil.microsFromTimestamp(localDateTime);
};
if (type.typeId() == Type.TypeID.TIMESTAMP_NANO) {
return (row, pos) -> {
LocalDateTime localDateTime =
row.getTimestamp(pos, timestampType.getPrecision()).toLocalDateTime();
return DateTimeUtil.nanosFromTimestamp(localDateTime);
};
} else {
return (row, pos) -> {
LocalDateTime localDateTime =
row.getTimestamp(pos, timestampType.getPrecision()).toLocalDateTime();
return DateTimeUtil.microsFromTimestamp(localDateTime);
};
}

case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
LocalZonedTimestampType lzTs = (LocalZonedTimestampType) logicalType;
return (row, pos) -> {
TimestampData timestampData = row.getTimestamp(pos, lzTs.getPrecision());
return timestampData.getMillisecond() * 1000
+ timestampData.getNanoOfMillisecond() / 1000;
};
if (type.typeId() == Type.TypeID.TIMESTAMP_NANO) {
return (row, pos) -> {
TimestampData timestampData = row.getTimestamp(pos, lzTs.getPrecision());
return timestampData.getMillisecond() * 1_000_000L
+ timestampData.getNanoOfMillisecond();
};
} else {
return (row, pos) -> {
TimestampData timestampData = row.getTimestamp(pos, lzTs.getPrecision());
return timestampData.getMillisecond() * 1000L
+ timestampData.getNanoOfMillisecond() / 1000;
};
}

case ROW:
RowType rowType = (RowType) logicalType;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,13 @@ public OrcValueReader<?> primitive(Type.PrimitiveType iPrimitive, TypeDescriptio
} else {
return FlinkOrcReaders.timestamps();
}
case TIMESTAMP_NANO:
Types.TimestampNanoType timestampNanoType = (Types.TimestampNanoType) iPrimitive;
if (timestampNanoType.shouldAdjustToUTC()) {
return FlinkOrcReaders.timestampTzs();
} else {
return FlinkOrcReaders.timestamps();
}
case STRING:
return FlinkOrcReaders.strings();
case UUID:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,13 @@ public OrcValueWriter<?> primitive(Type.PrimitiveType iPrimitive, LogicalType fl
} else {
return FlinkOrcWriters.timestamps();
}
case TIMESTAMP_NANO:
Types.TimestampNanoType timestampNanoType = (Types.TimestampNanoType) iPrimitive;
if (timestampNanoType.shouldAdjustToUTC()) {
return FlinkOrcWriters.timestampNanoTzs();
} else {
return FlinkOrcWriters.timestampNanos();
}
case STRING:
return FlinkOrcWriters.strings();
case UUID:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,14 @@ static OrcValueWriter<TimestampData> timestampTzs() {
return TimestampTzWriter.INSTANCE;
}

static OrcValueWriter<TimestampData> timestampNanos() {
return TimestampNanoWriter.INSTANCE;
}

static OrcValueWriter<TimestampData> timestampNanoTzs() {
return TimestampNanoTzWriter.INSTANCE;
}

static OrcValueWriter<DecimalData> decimals(int precision, int scale) {
if (precision <= 18) {
return new Decimal18Writer(precision, scale);
Expand Down Expand Up @@ -170,6 +178,35 @@ public void nonNullWrite(int rowId, TimestampData data, ColumnVector output) {
}
}

private static class TimestampNanoWriter implements OrcValueWriter<TimestampData> {
private static final TimestampNanoWriter INSTANCE = new TimestampNanoWriter();

@Override
public void nonNullWrite(int rowId, TimestampData data, ColumnVector output) {
TimestampColumnVector cv = (TimestampColumnVector) output;
cv.setIsUTC(true);
// millis
OffsetDateTime offsetDateTime = data.toInstant().atOffset(ZoneOffset.UTC);
cv.time[rowId] =
offsetDateTime.toEpochSecond() * 1_000 + offsetDateTime.getNano() / 1_000_000;
cv.nanos[rowId] = offsetDateTime.getNano();
}
}

private static class TimestampNanoTzWriter implements OrcValueWriter<TimestampData> {
private static final TimestampNanoTzWriter INSTANCE = new TimestampNanoTzWriter();

@SuppressWarnings("JavaInstantGetSecondsGetNano")
@Override
public void nonNullWrite(int rowId, TimestampData data, ColumnVector output) {
TimestampColumnVector cv = (TimestampColumnVector) output;
// millis
Instant instant = data.toInstant();
cv.time[rowId] = instant.toEpochMilli();
cv.nanos[rowId] = instant.getNano();
}
}

private static class Decimal18Writer implements OrcValueWriter<DecimalData> {
private final int precision;
private final int scale;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,8 @@ public static Object convertConstant(Type type, Object value) {
return (int) ((Long) value / 1000);
case TIMESTAMP: // TimestampData
return TimestampData.fromLocalDateTime(DateTimeUtil.timestampFromMicros((Long) value));
case TIMESTAMP_NANO:
return TimestampData.fromLocalDateTime(DateTimeUtil.timestampFromNanos((Long) value));
case UUID:
return UUIDUtil.convert((UUID) value);
default:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.ByteBuffers;
import org.apache.iceberg.util.DateTimeUtil;

@Internal
public class StructRowData implements RowData {
Expand Down Expand Up @@ -185,8 +186,27 @@ private BigDecimal getDecimalInternal(int pos) {

@Override
public TimestampData getTimestamp(int pos, int precision) {
if (precision > 6) {
Object timeVal = struct.get(pos, Object.class);
if (timeVal instanceof OffsetDateTime) {
OffsetDateTime odt = (OffsetDateTime) timeVal;
return TimestampData.fromEpochMillis(
odt.toInstant().toEpochMilli(), odt.getNano() % 1_000_000);
} else if (timeVal instanceof LocalDateTime) {
LocalDateTime ldt = (LocalDateTime) timeVal;
return TimestampData.fromEpochMillis(
ldt.toInstant(ZoneOffset.UTC).toEpochMilli(), ldt.getNano() % 1_000_000);
} else if (timeVal instanceof Long) {
long timeLong = (Long) timeVal;
return TimestampData.fromEpochMillis(
Math.floorDiv(timeLong, 1_000_000L), (int) Math.floorMod(timeLong, 1_000_000L));
} else {
throw new IllegalStateException("Unknown type for timestamp_ns: " + timeVal.getClass());
}
}
long timeLong = getLong(pos);
return TimestampData.fromEpochMillis(timeLong / 1000, (int) (timeLong % 1000) * 1000);
return TimestampData.fromEpochMillis(
Math.floorDiv(timeLong, 1000L), (int) Math.floorMod(timeLong, 1000L) * 1000);
}

@Override
Expand Down Expand Up @@ -257,9 +277,29 @@ private Object convertValue(Type elementType, Object value) {
case DECIMAL:
return value;
case TIMESTAMP:
long millisecond = (long) value / 1000;
int nanoOfMillisecond = (int) ((Long) value % 1000) * 1000;
return TimestampData.fromEpochMillis(millisecond, nanoOfMillisecond);
long timeMillis;
if (value instanceof LocalDateTime localDateTime) {
timeMillis = DateTimeUtil.microsFromTimestamp(localDateTime) / 1000L;
} else if (value instanceof OffsetDateTime offsetDateTime) {
timeMillis = DateTimeUtil.microsFromTimestamptz(offsetDateTime) / 1000L;
} else {
timeMillis = Math.floorDiv((Long) value, 1000L);
}
return TimestampData.fromEpochMillis(
timeMillis,
(int) Math.floorMod(value instanceof Long ? (Long) value : timeMillis * 1000L, 1000L)
* 1000);
case TIMESTAMP_NANO:
long nanoLong;
if (value instanceof LocalDateTime localDateTime) {
nanoLong = DateTimeUtil.nanosFromTimestamp(localDateTime);
} else if (value instanceof OffsetDateTime offsetDateTime) {
nanoLong = DateTimeUtil.nanosFromTimestamptz(offsetDateTime);
} else {
nanoLong = (Long) value;
}
return TimestampData.fromEpochMillis(
Math.floorDiv(nanoLong, 1_000_000L), (int) Math.floorMod(nanoLong, 1_000_000L));
case STRING:
return StringData.fromString(value.toString());
case FIXED:
Expand Down
Loading
Loading