diff --git a/sdks/java/io/iceberg/build.gradle b/sdks/java/io/iceberg/build.gradle index bbd55fee2fc8..141de355813c 100644 --- a/sdks/java/io/iceberg/build.gradle +++ b/sdks/java/io/iceberg/build.gradle @@ -46,6 +46,7 @@ dependencies { implementation library.java.vendored_guava_32_1_2_jre implementation project(path: ":sdks:java:core", configuration: "shadow") implementation project(path: ":model:pipeline", configuration: "shadow") + implementation project(path: ":sdks:java:extensions:sorter") implementation library.java.avro implementation library.java.slf4j_api implementation library.java.joda_time diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/AssignDestinationsAndPartitions.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/AssignDestinationsAndPartitions.java index 475786d3a4f6..df2d2c5433ec 100644 --- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/AssignDestinationsAndPartitions.java +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/AssignDestinationsAndPartitions.java @@ -26,12 +26,14 @@ import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.SerializableFunction; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.PaneInfo; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.Row; import org.apache.beam.sdk.values.ValueInSingleWindow; +import org.apache.iceberg.DistributionMode; import org.apache.iceberg.PartitionKey; import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.Schema; @@ -51,38 +53,65 @@ class AssignDestinationsAndPartitions private final DynamicDestinations dynamicDestinations; private final IcebergCatalogConfig catalogConfig; + private final DistributionMode distributionMode; + private final @Nullable SerializableFunction distributionFunction; + static final String DESTINATION = "destination"; static final String PARTITION = "partition"; + static final String SHARD = "shard"; static final org.apache.beam.sdk.schemas.Schema OUTPUT_SCHEMA = org.apache.beam.sdk.schemas.Schema.builder() .addStringField(DESTINATION) .addStringField(PARTITION) + .addNullableField(SHARD, org.apache.beam.sdk.schemas.Schema.FieldType.INT32) .build(); public AssignDestinationsAndPartitions( DynamicDestinations dynamicDestinations, IcebergCatalogConfig catalogConfig) { + this(dynamicDestinations, catalogConfig, DistributionMode.HASH, null); + } + + public AssignDestinationsAndPartitions( + DynamicDestinations dynamicDestinations, + IcebergCatalogConfig catalogConfig, + DistributionMode distributionMode, + @Nullable SerializableFunction distributionFunction) { this.dynamicDestinations = dynamicDestinations; this.catalogConfig = catalogConfig; + this.distributionMode = distributionMode; + this.distributionFunction = distributionFunction; } @Override public PCollection> expand(PCollection input) { return input - .apply(ParDo.of(new AssignDoFn(dynamicDestinations, catalogConfig))) + .apply( + ParDo.of( + new AssignDoFn( + dynamicDestinations, catalogConfig, distributionMode, distributionFunction))) .setCoder( KvCoder.of( RowCoder.of(OUTPUT_SCHEMA), RowCoder.of(dynamicDestinations.getDataSchema()))); } + @SuppressWarnings("nullness") static class AssignDoFn extends DoFn> { private transient @MonotonicNonNull Map partitionKeys; private transient 
@MonotonicNonNull Map wrappers; private final DynamicDestinations dynamicDestinations; private final IcebergCatalogConfig catalogConfig; + private final DistributionMode distributionMode; + private final @Nullable SerializableFunction distributionFunction; - AssignDoFn(DynamicDestinations dynamicDestinations, IcebergCatalogConfig catalogConfig) { + AssignDoFn( + DynamicDestinations dynamicDestinations, + IcebergCatalogConfig catalogConfig, + DistributionMode distributionMode, + @Nullable SerializableFunction distributionFunction) { this.dynamicDestinations = dynamicDestinations; this.catalogConfig = catalogConfig; + this.distributionMode = distributionMode; + this.distributionFunction = distributionFunction; } @Setup @@ -132,8 +161,17 @@ public void processElement( partitionKey.partition(wrapper.wrap(data)); String partitionPath = partitionKey.toPath(); + Integer shardId = null; + if (distributionMode == DistributionMode.RANGE && distributionFunction != null) { + shardId = distributionFunction.apply(data); + } + Row destAndPartition = - Row.withSchema(OUTPUT_SCHEMA).addValues(tableIdentifier, partitionPath).build(); + Row.withSchema(OUTPUT_SCHEMA) + .withFieldValue(DESTINATION, tableIdentifier) + .withFieldValue(PARTITION, partitionPath) + .withFieldValue(SHARD, shardId) + .build(); out.output(KV.of(destAndPartition, data)); } } diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergIO.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergIO.java index a5a3beef8f51..9b18a77326e0 100644 --- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergIO.java +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergIO.java @@ -26,6 +26,7 @@ import org.apache.beam.sdk.io.Read; import org.apache.beam.sdk.schemas.Schema; import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.SerializableFunction; import org.apache.beam.sdk.values.PBegin; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.Row; @@ -384,7 +385,7 @@ public class IcebergIO { public static WriteRows writeRows(IcebergCatalogConfig catalog) { return new AutoValue_IcebergIO_WriteRows.Builder() .setCatalogConfig(catalog) - .setDistributionMode(DistributionMode.NONE) + .setDistributionMode(DistributionMode.HASH) .setAutoSharding(false) .build(); } @@ -404,6 +405,8 @@ public abstract static class WriteRows extends PTransform, Iceb abstract DistributionMode getDistributionMode(); + abstract @Nullable SerializableFunction getDistributionFunction(); + abstract boolean getAutoSharding(); abstract Builder toBuilder(); @@ -422,6 +425,8 @@ abstract static class Builder { abstract Builder setDistributionMode(DistributionMode mode); + abstract Builder setDistributionFunction(SerializableFunction shardFn); + abstract Builder setAutoSharding(boolean autoSharding); abstract WriteRows build(); @@ -457,19 +462,193 @@ public WriteRows withDirectWriteByteLimit(Integer directWriteByteLimit) { } /** - * Defines distribution of write data. Supported distributions: + * The default distribution mode is {@link DistributionMode#HASH}. + * + *

+     * <p><b>Warning on HASH mode:</b> {@code HASH} distribution (with or without auto-sharding)
+     * can suffer from large unpartitioned or skewed writes when key spaces are not uniformly
+     * distributed. This can bottleneck workers and produce a fragmented file layout.
+     *

+     * <p><b>Note on RANGE mode:</b> When using {@code RANGE} distribution, the custom
+     * distribution function should produce adequately sized and strictly non-overlapping ranges
+     * of the sorting column to optimize downstream read performance (see the sketch after the
+     * recommendation matrix below).
+     *

+     * <p><b>Comparison of Distribution Modes:</b>

+     * <table border="1">
+     *   <caption>Comparison of Distribution Modes</caption>
+     *   <tr><th>Mode</th><th>Description</th><th>Pros</th><th>Cons</th></tr>
+     *   <tr><td>{@link DistributionMode#NONE}</td><td>No network shuffle is performed. Records are sorted locally on workers prior to writing.</td><td>Highly lightweight with zero shuffle/network overhead. Best for smaller data volumes.</td><td>Writers on different workers can write to overlapping min/max key ranges across multiple files. Relies heavily on post-hoc compaction or query-time merges.</td></tr>
+     *   <tr><td>{@link DistributionMode#HASH}</td><td>Data is shuffled and consolidated by partition key. All records for a partition are routed to a single worker.</td><td>Consolidates partition files, eliminating cross-worker file overlap for partition keys. Excellent worker stability.</td><td>Can suffer from severe data skew if a single partition contains significantly more data than others (hot partitions).</td></tr>
+     *   <tr><td>{@link DistributionMode#RANGE}</td><td>Data is shuffled based on a user-provided shard/bucket function (e.g., hashing/binning continuous keys).</td><td>Distributes writes for hot partitions across multiple workers. Eliminates skew while keeping file min/max key ranges tight and non-overlapping.</td><td>Requires providing a custom {@link SerializableFunction} mapping rows to integer shard/bucket IDs.</td></tr>
+     * </table>
+     *

+     * <p><b>Recommendation Matrix (Sorting &amp; Partitioning vs. Scale):</b>

+     * <table border="1">
+     *   <caption>Recommendation Matrix</caption>
+     *   <tr><th>Partitioning</th><th>Sorting</th><th>Scale / Volume</th><th>Latency Priority</th><th>Recommended Mode</th><th>Operational Impact</th></tr>
+     *   <tr><td>Partitioned</td><td>Sorted</td><td>Small</td><td>Any</td><td>{@link DistributionMode#HASH}</td><td>Consolidates partition files and sorts them locally. Avoids file overlaps for small volumes.</td></tr>
+     *   <tr><td>Partitioned</td><td>Sorted</td><td>Medium / Large</td><td>Low Write Latency</td><td>{@link DistributionMode#NONE}</td><td>Eliminates shuffle overhead for maximum write speed. Results in overlapping key ranges across files, which requires downstream compaction.</td></tr>
+     *   <tr><td>Partitioned</td><td>Sorted</td><td>Medium / Large</td><td>Low Read Latency</td><td>{@link DistributionMode#HASH} with auto-sharding OR {@link DistributionMode#RANGE}</td><td>HASH with auto-sharding scales writes for hot partitions but can result in overlapping file ranges requiring query-time sort merges. RANGE sharding distributes hot partitions into sequential, non-overlapping files to optimize reads.</td></tr>
+     *   <tr><td>Partitioned</td><td>Unsorted</td><td>Small</td><td>Any</td><td>{@link DistributionMode#HASH}</td><td>Consolidates data files into single partition directories to prevent file fragmentation.</td></tr>
+     *   <tr><td>Partitioned</td><td>Unsorted</td><td>Medium / Large</td><td>Any</td><td>{@link DistributionMode#HASH} with auto-sharding</td><td>Consolidates partition files while dynamically balancing hot partition writes across parallel workers.</td></tr>
+     *   <tr><td>Unpartitioned</td><td>Sorted</td><td>Small</td><td>Any</td><td>{@link DistributionMode#NONE}</td><td>Bypasses network shuffle for fast, low-volume local sorting.</td></tr>
+     *   <tr><td>Unpartitioned</td><td>Sorted</td><td>Medium / Large</td><td>Low Write Latency</td><td>{@link DistributionMode#NONE}</td><td>Bypasses network shuffle for parallel worker writes. Requires downstream compaction to resolve overlapping file ranges.</td></tr>
+     *   <tr><td>Unpartitioned</td><td>Sorted</td><td>Medium / Large</td><td>Low Read Latency</td><td>{@link DistributionMode#RANGE} (with custom sharding function)</td><td>Shards continuous keys into non-overlapping worker ranges. Eliminates single-worker bottlenecks and guarantees zero file overlap for fast queries.</td></tr>
+     *   <tr><td>Unpartitioned</td><td>Unsorted</td><td>Any</td><td>Any</td><td>{@link DistributionMode#NONE}</td><td>Direct, parallel worker writes with maximum throughput and zero network shuffle overhead.</td></tr>
+     * </table>
+     *
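As a concrete illustration of the RANGE guidance above, a minimal sketch of a distribution function that bins a continuous sort column into fixed-width, non-overlapping shards; this is not part of the patch, and the event_ts field name and the hour-wide bucket size are assumptions:

import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.values.Row;

// Illustrative sketch only: bucket a continuous "event_ts" column (epoch millis, assumed
// field name) into hour-wide shards so each shard covers a contiguous, non-overlapping
// range of the sort key and stays roughly uniform in size.
class HourlyRangeShardFn {
  static SerializableFunction<Row, Integer> create() {
    return row -> {
      long eventMillis = row.getInt64("event_ts"); // assumed non-null for this sketch
      return (int) (eventMillis / (60L * 60L * 1000L)); // one shard per hour of event time
    };
  }
}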

+     * <p><b>Code Samples:</b>

      *
-     * <pre>
-     *   1. {@link DistributionMode.NONE}: don't shuffle rows (default)
-     *   2. {@link DistributionMode.HASH}: shuffle rows by partition key before writing data
-     * </pre>
+     *
+     * <pre>{@code
+     * // 1. Using default HASH distribution mode (Consolidates by partition key)
+     * pipeline
+     *     .apply(Create.of(BEAM_ROWS))
+     *     .apply(IcebergIO.writeRows(catalogConfig)
+     *         .to(tableId));
      *
-     * {@link DistributionMode.RANGE} is not supported yet
+     * // 2. Using NONE distribution mode (No shuffle, local sorting only)
+     * pipeline
+     *     .apply(Create.of(BEAM_ROWS))
+     *     .apply(IcebergIO.writeRows(catalogConfig)
+     *         .to(tableId)
+     *         .withDistributionMode(DistributionMode.NONE));
+     *
+     * // 3. Using RANGE distribution mode with a custom shard/bucket function to avoid data skew
+     * pipeline
+     *     .apply(Create.of(BEAM_ROWS))
+     *     .apply(IcebergIO.writeRows(catalogConfig)
+     *         .to(tableId)
+     *         .withDistributionMode(DistributionMode.RANGE)
+     *         .withDistributionFunction(row -> {
+     *             // Group contiguous IDs into fixed-width, non-overlapping shards of 10,000 IDs each
+     *             long id = row.getInt64("id");
+     *             return (int) (id / 10000);
+     *         }));
+     * }</pre>
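The samples above do not cover auto-sharding; a minimal usage sketch (not part of the patch, reusing the same placeholder pipeline, catalogConfig, tableId, and BEAM_ROWS names as the samples) combining the default HASH mode with withAutosharding() would look like:

// 4. (Illustrative sketch) Default HASH distribution combined with runner auto-sharding,
// which lets the runner split hot partition keys across workers (see withAutosharding() below).
pipeline
    .apply(Create.of(BEAM_ROWS))
    .apply(IcebergIO.writeRows(catalogConfig)
        .to(tableId)
        .withAutosharding());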
*/ public WriteRows withDistributionMode(DistributionMode mode) { return toBuilder().setDistributionMode(mode).build(); } + /** + * Sets the custom range-distribution function. + * + *

Only applicable when the distribution mode is set to {@link DistributionMode#RANGE}. The + * function maps a Beam {@link Row} to an Integer representing a shard/bucket ID. + */ + public WriteRows withDistributionFunction(SerializableFunction<Row, Integer> shardFn) { + return toBuilder().setDistributionFunction(shardFn).build(); + } + + /** + * Enables Beam's dynamic auto-sharding when using {@link DistributionMode#HASH}. + * + *

When enabled, the pipeline uses {@link + * org.apache.beam.sdk.transforms.GroupIntoBatches#withShardedKey()} under the hood. The runner + * (such as Dataflow) dynamically monitors throughput per partition key. If a partition is + * extremely hot, the runner automatically splits it into parallel sub-shards distributed across + * multiple workers to prevent single-worker bottlenecks and out-of-memory (OOM) errors, while + * keeping the number of data files for cold partitions minimal. + * + *

Note that because auto-sharding distributes hot-partition data randomly across worker + * shards, the written data files cannot guarantee non-overlapping key ranges. Downstream + * queries may require read-time sort merges for overlapping file segments until an Iceberg + * compaction job (e.g., {@code rewriteDataFiles}) is executed. + * + *

Only applicable when using {@link DistributionMode#HASH}. + */ public WriteRows withAutosharding() { return toBuilder().setAutoSharding(true).build(); } @@ -514,7 +693,30 @@ public IcebergWriteResult expand(PCollection input) { return input .apply( "AssignDestinationAndPartition", - new AssignDestinationsAndPartitions(destinations, getCatalogConfig())) + new AssignDestinationsAndPartitions( + destinations, + getCatalogConfig(), + getDistributionMode(), + getDistributionFunction())) + .apply( + "Write Rows to Partitions", + new WriteToPartitions( + getCatalogConfig(), + destinations, + getTriggeringFrequency(), + getAutoSharding())); + case RANGE: + Preconditions.checkArgument( + getDistributionFunction() != null, + "Must provide a distribution function when using RANGE distribution mode."); + return input + .apply( + "AssignDestinationAndPartitionWithRange", + new AssignDestinationsAndPartitions( + destinations, + getCatalogConfig(), + getDistributionMode(), + getDistributionFunction())) .apply( "Write Rows to Partitions", new WriteToPartitions( diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergRowSorter.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergRowSorter.java new file mode 100644 index 000000000000..6efc1bbe2eec --- /dev/null +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergRowSorter.java @@ -0,0 +1,269 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.iceberg; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.Serializable; +import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; +import java.time.Instant; +import java.util.Date; +import java.util.Iterator; +import java.util.List; +import org.apache.beam.sdk.coders.RowCoder; +import org.apache.beam.sdk.extensions.sorter.BufferedExternalSorter; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.Row; +import org.apache.iceberg.NullOrder; +import org.apache.iceberg.Schema; +import org.apache.iceberg.SortDirection; +import org.apache.iceberg.SortField; +import org.apache.iceberg.SortOrder; +import org.joda.time.ReadableInstant; + +/** + * A utility class to sort Beam {@link Row}s based on an Iceberg {@link SortOrder}. Leverages {@link + * BufferedExternalSorter} to spill to local disk when elements exceed memory limit. 
+ */ +class IcebergRowSorter implements Serializable { + + public static Iterable sortRows( + Iterable rows, + SortOrder sortOrder, + Schema icebergSchema, + org.apache.beam.sdk.schemas.Schema beamSchema) { + + if (sortOrder == null || !sortOrder.isSorted()) { + return rows; + } + + BufferedExternalSorter.Options sorterOptions = BufferedExternalSorter.options(); + BufferedExternalSorter sorter = BufferedExternalSorter.create(sorterOptions); + RowCoder rowCoder = RowCoder.of(beamSchema); + + List fields = sortOrder.fields(); + String[] columnNames = new String[fields.size()]; + for (int i = 0; i < fields.size(); i++) { + columnNames[i] = icebergSchema.findColumnName(fields.get(i).sourceId()); + } + + // Create reusable ByteArrayOutputStreams for key and value encoding + ByteArrayOutputStream keyBaos = new ByteArrayOutputStream(); + ByteArrayOutputStream valBaos = new ByteArrayOutputStream(); + + try { + for (Row row : rows) { + keyBaos.reset(); + valBaos.reset(); + encodeSortKey(row, sortOrder, columnNames, keyBaos, icebergSchema, beamSchema); + byte[] keyBytes = keyBaos.toByteArray(); + + rowCoder.encode(row, valBaos); + byte[] valBytes = valBaos.toByteArray(); + sorter.add(KV.of(keyBytes, valBytes)); + } + + Iterable> sortedKVs = sorter.sort(); + return new Iterable() { + @Override + public Iterator iterator() { + final Iterator> it = sortedKVs.iterator(); + return new Iterator() { + @Override + public boolean hasNext() { + return it.hasNext(); + } + + @Override + public Row next() { + KV next = it.next(); + try { + ByteArrayInputStream bais = new ByteArrayInputStream(next.getValue()); + return rowCoder.decode(bais); + } catch (IOException e) { + throw new RuntimeException("Failed to decode Row during sorting", e); + } + } + }; + } + }; + + } catch (IOException e) { + throw new RuntimeException("Failed to sort rows with external sorter", e); + } + } + + @SuppressWarnings("nullness") + public static void encodeSortKey( + Row row, + SortOrder sortOrder, + String[] columnNames, + ByteArrayOutputStream baos, + Schema icebergSchema, + org.apache.beam.sdk.schemas.Schema beamSchema) + throws IOException { + + List fields = sortOrder.fields(); + + for (int i = 0; i < fields.size(); i++) { + SortField field = fields.get(i); + String colName = columnNames[i]; + Object val = row.getValue(colName); + + if (!field.transform().isIdentity()) { + Object icebergVal = + IcebergUtils.beamValueToIcebergValue(icebergSchema.findType(field.sourceId()), val); + if (icebergVal != null) { + val = field.transform().apply(icebergVal); + } else { + val = null; + } + } + + boolean isNull = (val == null); + boolean isDesc = (field.direction() == SortDirection.DESC); + boolean nullsFirst = (field.nullOrder() == NullOrder.NULLS_FIRST); + + // Determine correct header prefix to fulfill the NullOrder contracts + byte prefixByte; + if (isNull) { + prefixByte = nullsFirst ? (byte) 0x00 : (byte) 0xFF; + } else { + prefixByte = nullsFirst ? 
(byte) 0x01 : (byte) 0x00; + } + + baos.write(prefixByte); + + if (!isNull) { + writeValue(val, baos, isDesc); + } + } + } + + private static void writeInt(int v, ByteArrayOutputStream baos, boolean invert) { + byte b3 = (byte) (v >>> 24); + byte b2 = (byte) (v >>> 16); + byte b1 = (byte) (v >>> 8); + byte b0 = (byte) v; + if (invert) { + baos.write(~b3); + baos.write(~b2); + baos.write(~b1); + baos.write(~b0); + } else { + baos.write(b3); + baos.write(b2); + baos.write(b1); + baos.write(b0); + } + } + + private static void writeLong(long v, ByteArrayOutputStream baos, boolean invert) { + byte b7 = (byte) (v >>> 56); + byte b6 = (byte) (v >>> 48); + byte b5 = (byte) (v >>> 40); + byte b4 = (byte) (v >>> 32); + byte b3 = (byte) (v >>> 24); + byte b2 = (byte) (v >>> 16); + byte b1 = (byte) (v >>> 8); + byte b0 = (byte) v; + if (invert) { + baos.write(~b7); + baos.write(~b6); + baos.write(~b5); + baos.write(~b4); + baos.write(~b3); + baos.write(~b2); + baos.write(~b1); + baos.write(~b0); + } else { + baos.write(b7); + baos.write(b6); + baos.write(b5); + baos.write(b4); + baos.write(b3); + baos.write(b2); + baos.write(b1); + baos.write(b0); + } + } + + @SuppressWarnings("JavaUtilDate") + private static void writeValue(Object val, ByteArrayOutputStream baos, boolean invert) + throws IOException { + if (val instanceof String) { + writeString((String) val, baos, invert); + } else if (val instanceof Integer) { + int v = (Integer) val; + writeInt(v ^ Integer.MIN_VALUE, baos, invert); + } else if (val instanceof Long) { + long v = (Long) val; + writeLong(v ^ Long.MIN_VALUE, baos, invert); + } else if (val instanceof Float) { + int bits = Float.floatToIntBits((Float) val); + bits = (bits >= 0) ? (bits ^ Integer.MIN_VALUE) : ~bits; + writeInt(bits, baos, invert); + } else if (val instanceof Double) { + long bits = Double.doubleToLongBits((Double) val); + bits = (bits >= 0) ? (bits ^ Long.MIN_VALUE) : ~bits; + writeLong(bits, baos, invert); + } else if (val instanceof Boolean) { + byte b = ((Boolean) val) ? (byte) 0x01 : (byte) 0x00; + baos.write(invert ? ~b : b); + } else if (val instanceof byte[]) { + writeByteArray((byte[]) val, baos, invert); + } else if (val instanceof ByteBuffer) { + writeByteArray(((ByteBuffer) val).array(), baos, invert); + } else if (val instanceof ReadableInstant) { + long enc = ((ReadableInstant) val).getMillis() ^ Long.MIN_VALUE; + writeLong(enc, baos, invert); + } else if (val instanceof Instant) { + long enc = ((Instant) val).toEpochMilli() ^ Long.MIN_VALUE; + writeLong(enc, baos, invert); + } else if (val instanceof Date) { + long enc = ((Date) val).getTime() ^ Long.MIN_VALUE; + writeLong(enc, baos, invert); + } else { + throw new UnsupportedOperationException( + "Unsupported type for sorting: " + val.getClass().getName()); + } + } + + private static void writeString(String s, ByteArrayOutputStream baos, boolean invert) + throws IOException { + byte[] bytes = s.getBytes(StandardCharsets.UTF_8); + writeByteArray(bytes, baos, invert); + } + + private static void writeByteArray(byte[] bytes, ByteArrayOutputStream baos, boolean invert) { + for (byte b : bytes) { + if (b == 0x00) { + baos.write(invert ? ~(byte) 0x01 : (byte) 0x01); + baos.write(invert ? ~(byte) 0x01 : (byte) 0x01); + } else if (b == 0x01) { + baos.write(invert ? ~(byte) 0x01 : (byte) 0x01); + baos.write(invert ? ~(byte) 0x02 : (byte) 0x02); + } else { + baos.write(invert ? ~b : b); + } + } + baos.write(invert ? 
~(byte) 0x00 : (byte) 0x00); + } +} diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergUtils.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergUtils.java index d0d24532ff39..7f48a0d0128c 100644 --- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergUtils.java +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergUtils.java @@ -296,6 +296,45 @@ public static org.apache.iceberg.Schema beamSchemaToIcebergSchema(final Schema s return new org.apache.iceberg.Schema(fields.toArray(new Types.NestedField[fields.size()])); } + /** + * Converts a Beam field value to its Iceberg-compatible equivalent based on the Iceberg {@link + * Type}. + */ + public static @Nullable Object beamValueToIcebergValue(Type type, @Nullable Object value) { + if (value == null) { + return null; + } + switch (type.typeId()) { + case BOOLEAN: + case INTEGER: + case LONG: + case FLOAT: + case DOUBLE: + case DATE: + case TIME: + case DECIMAL: + case STRING: + return value; + case TIMESTAMP: + Types.TimestampType ts = (Types.TimestampType) type.asPrimitiveType(); + return getIcebergTimestampValue(value, ts.shouldAdjustToUTC()); + case UUID: + if (value instanceof byte[]) { + return UUID.nameUUIDFromBytes((byte[]) value); + } + return value; + case BINARY: + if (value instanceof byte[]) { + return ByteBuffer.wrap((byte[]) value); + } + return value; + case FIXED: + throw new UnsupportedOperationException("Fixed-precision fields are not yet supported."); + default: + return value; + } + } + /** Converts a Beam {@link Row} to an Iceberg {@link Record}. */ public static Record beamRowToIcebergRecord(org.apache.iceberg.Schema schema, Row row) { if (row.getSchema().getFieldCount() != schema.columns().size()) { @@ -351,7 +390,10 @@ private static void copyFieldIntoRecord(Record rec, Types.NestedField field, Row rec.setField(name, getIcebergTimestampValue(val, ts.shouldAdjustToUTC())); break; case STRING: - Optional.ofNullable(value.getString(name)).ifPresent(v -> rec.setField(name, v)); + Object strVal = value.getValue(name); + if (strVal != null) { + rec.setField(name, strVal.toString()); + } break; case UUID: Optional.ofNullable(value.getBytes(name)) diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/WriteGroupedRowsToFiles.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/WriteGroupedRowsToFiles.java index 12d9570d4a38..95384dea1887 100644 --- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/WriteGroupedRowsToFiles.java +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/WriteGroupedRowsToFiles.java @@ -18,6 +18,10 @@ package org.apache.beam.sdk.io.iceberg; import java.util.List; +import org.apache.beam.sdk.coders.IterableCoder; +import org.apache.beam.sdk.coders.KvCoder; +import org.apache.beam.sdk.coders.RowCoder; +import org.apache.beam.sdk.schemas.Schema; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; @@ -30,6 +34,7 @@ import org.apache.beam.sdk.values.WindowedValue; import org.apache.beam.sdk.values.WindowedValues; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions; +import org.apache.iceberg.Table; import org.apache.iceberg.catalog.Catalog; import org.checkerframework.checker.nullness.qual.MonotonicNonNull; @@ -56,10 +61,17 @@ class WriteGroupedRowsToFiles @Override public PCollection expand( 
PCollection, Iterable>> input) { + Schema dataSchema = + ((RowCoder) + ((IterableCoder) + ((KvCoder, Iterable>) input.getCoder()) + .getValueCoder()) + .getElemCoder()) + .getSchema(); return input.apply( ParDo.of( new WriteGroupedRowsToFilesDoFn( - catalogConfig, dynamicDestinations, maxBytesPerFile, filePrefix))); + catalogConfig, dynamicDestinations, maxBytesPerFile, filePrefix, dataSchema))); } private static class WriteGroupedRowsToFilesDoFn @@ -70,16 +82,19 @@ private static class WriteGroupedRowsToFilesDoFn private transient @MonotonicNonNull Catalog catalog; private final String filePrefix; private final long maxFileSize; + private final Schema dataSchema; WriteGroupedRowsToFilesDoFn( IcebergCatalogConfig catalogConfig, DynamicDestinations dynamicDestinations, long maxFileSize, - String filePrefix) { + String filePrefix, + Schema dataSchema) { this.catalogConfig = catalogConfig; this.dynamicDestinations = dynamicDestinations; this.filePrefix = filePrefix; this.maxFileSize = maxFileSize; + this.dataSchema = dataSchema; } private org.apache.iceberg.catalog.Catalog getCatalog() { @@ -101,11 +116,16 @@ public void processElement( IcebergDestination destination = dynamicDestinations.instantiateDestination(tableIdentifier); WindowedValue windowedDestination = WindowedValues.of(destination, window.maxTimestamp(), window, paneInfo); + RecordWriterManager writer; try (RecordWriterManager openWriter = new RecordWriterManager(getCatalog(), filePrefix, maxFileSize, Integer.MAX_VALUE)) { writer = openWriter; - for (Row e : element.getValue()) { + Table table = openWriter.getOrCreateTable(destination, dataSchema); + Iterable sortedOrUnsortedRows = + IcebergRowSorter.sortRows( + element.getValue(), table.sortOrder(), table.schema(), dataSchema); + for (Row e : sortedOrUnsortedRows) { writer.write(windowedDestination, e); } } diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/WritePartitionedRowsToFiles.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/WritePartitionedRowsToFiles.java index 54ad120f1aca..0da0d4c5968c 100644 --- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/WritePartitionedRowsToFiles.java +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/WritePartitionedRowsToFiles.java @@ -44,6 +44,7 @@ import org.apache.iceberg.PartitionField; import org.apache.iceberg.PartitionKey; import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.SortOrder; import org.apache.iceberg.StructLike; import org.apache.iceberg.Table; import org.apache.iceberg.catalog.Catalog; @@ -130,7 +131,10 @@ public void processElement( RecordWriter writer = new RecordWriter(table, destination.getFileFormat(), fileName, partitionData); try { - for (Row row : element.getValue()) { + Iterable sortedOrUnsortedRows = + IcebergRowSorter.sortRows( + element.getValue(), table.sortOrder(), table.schema(), dataSchema); + for (Row row : sortedOrUnsortedRows) { Record record = IcebergUtils.beamRowToIcebergRecord(table.schema(), row); writer.write(record); } @@ -240,14 +244,23 @@ LastRefreshedTable getOrCreateTable(IcebergDestination destination, Schema dataS } catch (NoSuchTableException e) { // Otherwise, create the table org.apache.iceberg.Schema tableSchema = IcebergUtils.beamSchemaToIcebergSchema(dataSchema); + SortOrder sortOrder = + createConfig != null ? 
createConfig.getSortOrder() : SortOrder.unsorted(); try { - table = catalog.createTable(identifier, tableSchema, partitionSpec, tableProperties); + table = + catalog + .buildTable(identifier, tableSchema) + .withPartitionSpec(partitionSpec) + .withSortOrder(sortOrder) + .withProperties(tableProperties) + .create(); LOG.info( "Created Iceberg table '{}' with schema: {}\n" - + ", partition spec: {}, table properties: {}", + + ", partition spec: {}, sort order: {}, table properties: {}", identifier, tableSchema, partitionSpec, + sortOrder, tableProperties); } catch (AlreadyExistsException ignored) { // race condition: another worker already created this table diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/WriteUngroupedRowsToFiles.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/WriteUngroupedRowsToFiles.java index 1db6ede30165..4d2dd1fe3d0d 100644 --- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/WriteUngroupedRowsToFiles.java +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/WriteUngroupedRowsToFiles.java @@ -47,6 +47,7 @@ import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables; +import org.apache.iceberg.Table; import org.apache.iceberg.catalog.Catalog; import org.checkerframework.checker.nullness.qual.MonotonicNonNull; import org.checkerframework.checker.nullness.qual.Nullable; @@ -238,11 +239,16 @@ public void processElement( WindowedValues.of(destination, window.maxTimestamp(), window, paneInfo); // Attempt to write record. If the writer is saturated and cannot accept - // the record, spill it over to WriteGroupedRowsToFiles - boolean writeSuccess; + // the record, or if the target table is sorted, spill it over to WriteGroupedRowsToFiles + boolean writeSuccess = false; try { - writeSuccess = - Preconditions.checkNotNull(recordWriterManager).write(windowedDestination, data); + Table table = + Preconditions.checkNotNull(recordWriterManager) + .getOrCreateTable(destination, data.getSchema()); + if (!table.sortOrder().isSorted()) { + writeSuccess = + Preconditions.checkNotNull(recordWriterManager).write(windowedDestination, data); + } } catch (Exception e) { try { Preconditions.checkNotNull(recordWriterManager).close(); diff --git a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOWriteTest.java b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOWriteTest.java index 52d92911f4e4..294e66be6956 100644 --- a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOWriteTest.java +++ b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOWriteTest.java @@ -77,6 +77,7 @@ import org.apache.iceberg.DistributionMode; import org.apache.iceberg.FileFormat; import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.SortOrder; import org.apache.iceberg.Table; import org.apache.iceberg.catalog.Namespace; import org.apache.iceberg.catalog.SupportsNamespaces; @@ -774,4 +775,136 @@ public void process(@Element KV>> sums) { .getCommitted(); assertEquals(5L, numWaves); } + + @Test + public void testRangeDistribution() { + assumeTrue(distributionMode.equals(HASH_WITH_AUTOSHARDING)); + + Schema schema = Schema.builder().addInt64Field("id").addStringField("name").build(); + + TableIdentifier tableId = + 
TableIdentifier.of("default", "range_" + Long.toString(UUID.randomUUID().hashCode(), 16)); + Map catalogProps = + ImmutableMap.builder() + .put("type", CatalogUtil.ICEBERG_CATALOG_TYPE_HADOOP) + .put("warehouse", warehouse.location) + .build(); + IcebergCatalogConfig catalog = + IcebergCatalogConfig.builder() + .setCatalogName("name") + .setCatalogProperties(catalogProps) + .build(); + + org.apache.iceberg.Schema icebergSchema = IcebergUtils.beamSchemaToIcebergSchema(schema); + catalog.catalog().createTable(tableId, icebergSchema, PartitionSpec.unpartitioned()); + + PCollection rows = + testPipeline + .apply(GenerateSequence.from(0).to(100)) + .apply( + "Make rows", + MapElements.into(TypeDescriptors.rows()) + .via(i -> Row.withSchema(schema).addValues(i, "name_" + i).build())) + .setRowSchema(schema); + + rows.apply( + "range distribution write", + IcebergIO.writeRows(catalog) + .to(tableId) + .withDistributionMode(DistributionMode.RANGE) + .withDistributionFunction(row -> (int) (row.getInt64("id") % 5))); + + testPipeline.run().waitUntilFinish(); + + Table table = warehouse.loadTable(tableId); + List writtenRecords = ImmutableList.copyOf(IcebergGenerics.read(table).build()); + assertEquals(100, writtenRecords.size()); + } + + @Test + public void testSortedWrite() { + TableIdentifier tableId = + TableIdentifier.of("default", "sorted_" + Long.toString(UUID.randomUUID().hashCode(), 16)); + + Map catalogProps = + ImmutableMap.builder() + .put("type", CatalogUtil.ICEBERG_CATALOG_TYPE_HADOOP) + .put("warehouse", warehouse.location) + .build(); + + IcebergCatalogConfig catalog = + IcebergCatalogConfig.builder() + .setCatalogName("name") + .setCatalogProperties(catalogProps) + .build(); + + Schema schema = Schema.builder().addInt64Field("id").addStringField("name").build(); + org.apache.iceberg.Schema icebergSchema = IcebergUtils.beamSchemaToIcebergSchema(schema); + + catalog + .catalog() + .buildTable(tableId, icebergSchema) + .withPartitionSpec(PartitionSpec.unpartitioned()) + .withSortOrder(SortOrder.builderFor(icebergSchema).asc("name").desc("id").build()) + .create(); + + List inputRows = + Arrays.asList( + Row.withSchema(schema).addValues(2L, "banana").build(), + Row.withSchema(schema).addValues(1L, "banana").build(), + Row.withSchema(schema).addValues(5L, "apple").build(), + Row.withSchema(schema).addValues(10L, "cherry").build()); + + testPipeline + .apply("Scrambled Input", Create.of(inputRows)) + .setRowSchema(schema) + .apply("Append Sorted To Table", writeTransform(catalog, tableId)); + + testPipeline.run().waitUntilFinish(); + + Table table = warehouse.loadTable(tableId); + List writtenRecords = ImmutableList.copyOf(IcebergGenerics.read(table).build()); + + assertEquals(4, writtenRecords.size()); + + try { + assertFilesAreInternallySorted(table); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + private void assertFilesAreInternallySorted(Table table) throws Exception { + for (org.apache.iceberg.FileScanTask task : table.newScan().planFiles()) { + String path = task.file().path().toString(); + try (org.apache.iceberg.io.CloseableIterable reader = + org.apache.iceberg.parquet.Parquet.read(table.io().newInputFile(path)) + .project(table.schema()) + .createReaderFunc(org.apache.iceberg.data.parquet.GenericParquetReaders::buildReader) + .build()) { + List records = + org.apache.commons.compress.utils.Lists.newArrayList(reader.iterator()); + assertTrue("File must have at least one record", records.size() > 0); + + for (int i = 1; i < records.size(); i++) { + Record 
prev = records.get(i - 1); + Record curr = records.get(i); + + String prevName = (String) prev.getField("name"); + String currName = (String) curr.getField("name"); + + int cmpName = prevName.compareTo(currName); + if (cmpName > 0) { + throw new AssertionError("File not sorted by name ASC: " + prevName + " > " + currName); + } else if (cmpName == 0) { + long prevId = (Long) prev.getField("id"); + long currId = (Long) curr.getField("id"); + if (prevId < currId) { + throw new AssertionError("File not sorted by id DESC: " + prevId + " < " + currId); + } + } + } + } + } + } } diff --git a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergRowSorterTest.java b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergRowSorterTest.java new file mode 100644 index 000000000000..6eb26e6de4b5 --- /dev/null +++ b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergRowSorterTest.java @@ -0,0 +1,236 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.iceberg; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertThrows; +import static org.junit.Assert.assertTrue; + +import java.io.ByteArrayOutputStream; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Comparator; +import java.util.List; +import java.util.Random; +import java.util.stream.Collectors; +import java.util.stream.StreamSupport; +import org.apache.beam.sdk.schemas.Schema; +import org.apache.beam.sdk.values.Row; +import org.apache.iceberg.NullOrder; +import org.apache.iceberg.SortOrder; +import org.apache.iceberg.types.Types; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +@RunWith(JUnit4.class) +public class IcebergRowSorterTest { + + private static final Schema BEAM_SCHEMA = + Schema.builder() + .addInt32Field("id") + .addNullableField("name", Schema.FieldType.STRING) + .addNullableField("value", Schema.FieldType.DOUBLE) + .addNullableField("active", Schema.FieldType.BOOLEAN) + .build(); + + private static final org.apache.iceberg.Schema ICEBERG_SCHEMA = + new org.apache.iceberg.Schema( + Types.NestedField.required(1, "id", Types.IntegerType.get()), + Types.NestedField.optional(2, "name", Types.StringType.get()), + Types.NestedField.optional(3, "value", Types.DoubleType.get()), + Types.NestedField.optional(4, "active", Types.BooleanType.get())); + + private static final Comparator BYTE_ARR_COMPARATOR = + org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.primitives.UnsignedBytes + .lexicographicalComparator(); + + private static byte[] encodeSortKeyHelper(Row row, SortOrder sortOrder) throws Exception { + java.util.List fields = sortOrder.fields(); + String[] columnNames = new 
String[fields.size()]; + for (int i = 0; i < fields.size(); i++) { + columnNames[i] = ICEBERG_SCHEMA.findColumnName(fields.get(i).sourceId()); + } + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + IcebergRowSorter.encodeSortKey(row, sortOrder, columnNames, baos, ICEBERG_SCHEMA, BEAM_SCHEMA); + return baos.toByteArray(); + } + + @Test + public void testStringKeyEncodingOrder() throws Exception { + SortOrder sortOrder = SortOrder.builderFor(ICEBERG_SCHEMA).asc("name").build(); + + Row r1 = Row.withSchema(BEAM_SCHEMA).addValues(1, "apple", 1.5, true).build(); + Row r2 = Row.withSchema(BEAM_SCHEMA).addValues(2, "banana", 2.0, true).build(); + Row r3 = Row.withSchema(BEAM_SCHEMA).addValues(3, "apricot", 3.0, false).build(); + + byte[] k1 = encodeSortKeyHelper(r1, sortOrder); + byte[] k2 = encodeSortKeyHelper(r2, sortOrder); + byte[] k3 = encodeSortKeyHelper(r3, sortOrder); + + assertTrue(BYTE_ARR_COMPARATOR.compare(k1, k2) < 0); // apple < banana + assertTrue(BYTE_ARR_COMPARATOR.compare(k1, k3) < 0); // apple < apricot + assertTrue(BYTE_ARR_COMPARATOR.compare(k3, k2) < 0); // apricot < banana + } + + @Test + public void testStringCollisionProofing() throws Exception { + SortOrder sortOrder = SortOrder.builderFor(ICEBERG_SCHEMA).asc("name").asc("value").build(); + + Row r1 = Row.withSchema(BEAM_SCHEMA).addValues(1, "abc", 1.0, true).build(); + Row r2 = Row.withSchema(BEAM_SCHEMA).addValues(2, "abcdef", null, true).build(); + + byte[] k1 = encodeSortKeyHelper(r1, sortOrder); + byte[] k2 = encodeSortKeyHelper(r2, sortOrder); + + assertTrue(BYTE_ARR_COMPARATOR.compare(k1, k2) < 0); + } + + @Test + public void testDescInversion() throws Exception { + SortOrder sortOrderAsc = SortOrder.builderFor(ICEBERG_SCHEMA).asc("id").build(); + SortOrder sortOrderDesc = SortOrder.builderFor(ICEBERG_SCHEMA).desc("id").build(); + + Row r1 = Row.withSchema(BEAM_SCHEMA).addValues(10, "test", 1.5, true).build(); + Row r2 = Row.withSchema(BEAM_SCHEMA).addValues(20, "test", 2.0, true).build(); + + byte[] k1Asc = encodeSortKeyHelper(r1, sortOrderAsc); + byte[] k2Asc = encodeSortKeyHelper(r2, sortOrderAsc); + + byte[] k1Desc = encodeSortKeyHelper(r1, sortOrderDesc); + byte[] k2Desc = encodeSortKeyHelper(r2, sortOrderDesc); + + assertTrue(BYTE_ARR_COMPARATOR.compare(k1Asc, k2Asc) < 0); + assertTrue(BYTE_ARR_COMPARATOR.compare(k1Desc, k2Desc) > 0); + } + + @Test + public void testNullOrderingMatrix() throws Exception { + Row rNonNull = Row.withSchema(BEAM_SCHEMA).addValues(1, "apple", 1.5, true).build(); + Row rNull = Row.withSchema(BEAM_SCHEMA).addValues(2, null, 2.0, true).build(); + + // 1. ASC, NULLS_FIRST + SortOrder ascFirst = + SortOrder.builderFor(ICEBERG_SCHEMA).asc("name", NullOrder.NULLS_FIRST).build(); + byte[] kNonNullAscFirst = encodeSortKeyHelper(rNonNull, ascFirst); + byte[] kNullAscFirst = encodeSortKeyHelper(rNull, ascFirst); + assertTrue( + "ASC NULLS_FIRST failed: null should sort before non-null", + BYTE_ARR_COMPARATOR.compare(kNullAscFirst, kNonNullAscFirst) < 0); + + // 2. ASC, NULLS_LAST + SortOrder ascLast = + SortOrder.builderFor(ICEBERG_SCHEMA).asc("name", NullOrder.NULLS_LAST).build(); + byte[] kNonNullAscLast = encodeSortKeyHelper(rNonNull, ascLast); + byte[] kNullAscLast = encodeSortKeyHelper(rNull, ascLast); + assertTrue( + "ASC NULLS_LAST failed: null should sort after non-null", + BYTE_ARR_COMPARATOR.compare(kNullAscLast, kNonNullAscLast) > 0); + + // 3. 
DESC, NULLS_FIRST + SortOrder descFirst = + SortOrder.builderFor(ICEBERG_SCHEMA).desc("name", NullOrder.NULLS_FIRST).build(); + byte[] kNonNullDescFirst = encodeSortKeyHelper(rNonNull, descFirst); + byte[] kNullDescFirst = encodeSortKeyHelper(rNull, descFirst); + assertTrue( + "DESC NULLS_FIRST failed: null should sort before non-null", + BYTE_ARR_COMPARATOR.compare(kNullDescFirst, kNonNullDescFirst) < 0); + + // 4. DESC, NULLS_LAST + SortOrder descLast = + SortOrder.builderFor(ICEBERG_SCHEMA).desc("name", NullOrder.NULLS_LAST).build(); + byte[] kNonNullDescLast = encodeSortKeyHelper(rNonNull, descLast); + byte[] kNullDescLast = encodeSortKeyHelper(rNull, descLast); + assertTrue( + "DESC NULLS_LAST failed: null should sort after non-null", + BYTE_ARR_COMPARATOR.compare(kNullDescLast, kNonNullDescLast) > 0); + } + + @Test + public void testEndToEndSorting() { + SortOrder sortOrder = SortOrder.builderFor(ICEBERG_SCHEMA).asc("name").desc("id").build(); + + List input = + Arrays.asList( + Row.withSchema(BEAM_SCHEMA).addValues(2, "banana", 2.0, true).build(), + Row.withSchema(BEAM_SCHEMA).addValues(1, "banana", 1.0, true).build(), + Row.withSchema(BEAM_SCHEMA).addValues(5, "apple", 1.5, true).build(), + Row.withSchema(BEAM_SCHEMA).addValues(10, "cherry", 3.0, false).build()); + + Iterable sorted = IcebergRowSorter.sortRows(input, sortOrder, ICEBERG_SCHEMA, BEAM_SCHEMA); + List sortedList = + StreamSupport.stream(sorted.spliterator(), false).collect(Collectors.toList()); + + assertEquals(4, sortedList.size()); + + assertEquals("apple", sortedList.get(0).getString("name")); + assertEquals(Integer.valueOf(5), sortedList.get(0).getInt32("id")); + + assertEquals("banana", sortedList.get(1).getString("name")); + assertEquals(Integer.valueOf(2), sortedList.get(1).getInt32("id")); + + assertEquals("banana", sortedList.get(2).getString("name")); + assertEquals(Integer.valueOf(1), sortedList.get(2).getInt32("id")); + + assertEquals("cherry", sortedList.get(3).getString("name")); + assertEquals(Integer.valueOf(10), sortedList.get(3).getInt32("id")); + } + + @Test + public void testScaleAndExternalDiskSpill() { + SortOrder sortOrder = SortOrder.builderFor(ICEBERG_SCHEMA).asc("id").build(); + + int count = 5000; + List input = new ArrayList<>(count); + Random rand = new Random(42); + + for (int i = 0; i < count; i++) { + int randomId = rand.nextInt(100_000); + input.add(Row.withSchema(BEAM_SCHEMA).addValues(randomId, "item" + i, 1.0, true).build()); + } + + Iterable sorted = IcebergRowSorter.sortRows(input, sortOrder, ICEBERG_SCHEMA, BEAM_SCHEMA); + List sortedList = + StreamSupport.stream(sorted.spliterator(), false).collect(Collectors.toList()); + + assertEquals(count, sortedList.size()); + + for (int i = 0; i < sortedList.size() - 1; i++) { + int idCurrent = sortedList.get(i).getInt32("id"); + int idNext = sortedList.get(i + 1).getInt32("id"); + assertTrue( + String.format("Sort violation at index %d: %d > %d", i, idCurrent, idNext), + idCurrent <= idNext); + } + } + + @Test + public void testUnsupportedComplexTypeSorting() { + org.apache.iceberg.Schema mapSchema = + new org.apache.iceberg.Schema( + Types.NestedField.required(1, "id", Types.IntegerType.get()), + Types.NestedField.optional( + 2, + "attributes", + Types.MapType.ofOptional(3, 4, Types.StringType.get(), Types.StringType.get()))); + + assertThrows( + org.apache.iceberg.exceptions.ValidationException.class, + () -> SortOrder.builderFor(mapSchema).asc("attributes").build()); + } +} diff --git 
a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergUtilsTest.java b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergUtilsTest.java index c9026522dba3..d6b2bf11370e 100644 --- a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergUtilsTest.java +++ b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergUtilsTest.java @@ -269,6 +269,42 @@ public void testMapOfRecords() { IcebergUtils.beamRowToIcebergRecord(RECORD_MAP_ICEBERG_SCHEMA, ROW_MAP_OF_ROWS); assertEquals(RECORD_MAP_OF_RECORDS, actual); } + + @Test + public void testBigDecimalToStringConversion() { + BigDecimal num = new BigDecimal("987654321.123456789"); + checkRowValueToRecordValue( + Schema.FieldType.DECIMAL, num, Types.StringType.get(), "987654321.123456789"); + } + + @Test + public void testIntegerToStringConversion() { + checkRowValueToRecordValue(Schema.FieldType.INT32, 42, Types.StringType.get(), "42"); + } + + @Test + public void testDoubleToStringConversion() { + checkRowValueToRecordValue( + Schema.FieldType.DOUBLE, 3.14159, Types.StringType.get(), "3.14159"); + } + + @Test + public void testBooleanToStringConversion() { + checkRowValueToRecordValue(Schema.FieldType.BOOLEAN, true, Types.StringType.get(), "true"); + } + + @Test + public void testNullStringConversion() { + Schema beamSchema = + Schema.of(Schema.Field.of("v", Schema.FieldType.STRING).withNullable(true)); + Row row = Row.withSchema(beamSchema).addValue(null).build(); + + org.apache.iceberg.Schema icebergSchema = + new org.apache.iceberg.Schema(optional(0, "v", Types.StringType.get())); + Record record = IcebergUtils.beamRowToIcebergRecord(icebergSchema, row); + + assertEquals(null, record.getField("v")); + } } @RunWith(JUnit4.class)
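An illustration of the order-preserving key encoding that IcebergRowSorter relies on; this sketch is not part of the patch and the class and method names in it are made up. Flipping the sign bit before emitting big-endian bytes maps signed numeric order onto unsigned lexicographic byte order, which is the order the external sorter compares:

import java.nio.ByteBuffer;

// Illustrative sketch (not part of this patch): why IcebergRowSorter XORs long values with
// Long.MIN_VALUE before writing big-endian bytes. The sign-bit flip maps signed ordering onto
// unsigned ordering, so the encoded byte arrays sort correctly under unsigned lexicographic
// comparison.
public class OrderPreservingLongKeySketch {

  static byte[] encode(long v) {
    // Same transformation IcebergRowSorter.writeValue applies to Long values.
    return ByteBuffer.allocate(8).putLong(v ^ Long.MIN_VALUE).array();
  }

  static int compareUnsignedLexicographically(byte[] a, byte[] b) {
    for (int i = 0; i < Math.min(a.length, b.length); i++) {
      int cmp = Integer.compare(a[i] & 0xFF, b[i] & 0xFF);
      if (cmp != 0) {
        return cmp;
      }
    }
    return Integer.compare(a.length, b.length);
  }

  public static void main(String[] args) {
    long[] ordered = {Long.MIN_VALUE, -42L, -1L, 0L, 1L, 42L, Long.MAX_VALUE};
    for (int i = 1; i < ordered.length; i++) {
      // Byte-wise order of the encoded keys matches the numeric order of the inputs.
      if (compareUnsignedLexicographically(encode(ordered[i - 1]), encode(ordered[i])) >= 0) {
        throw new AssertionError("Encoding did not preserve ordering at index " + i);
      }
    }
    System.out.println("Sign-bit-flipped big-endian keys preserve numeric ordering.");
  }
}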