Draft PR, 20 commits:
1a6c642 - Implement high-performance sorted writing support in IcebergIO (atognolag, May 7, 2026)
094c721 - Fix multiple-iteration anti-pattern in WriteGroupedRowsToFiles by ext… (atognolag, May 7, 2026)
f9dd27d - Refactor row sorting calls to be explicitly conditional on table.sort… (atognolag, May 7, 2026)
ca4ff74 - Simplify sorting logic to keep call DRY and use straightforward 'rows… (atognolag, May 7, 2026)
df8dc69 - Rename variable rows to sortedOrUnsortedRows to explicitly communicat… (atognolag, May 7, 2026)
9120fda - Fix sorted write PR comments: resolve null-ordering bugs and optimize… (atognolag, May 7, 2026)
75dcc4f - Fix Checkstyle MissingDeprecated warning in IcebergRowSorter.java (atognolag, May 7, 2026)
e4c97bf - Remove unused deprecated encodeSortKey method in IcebergRowSorter.java (atognolag, May 7, 2026)
ecdaf7f - Finalize Hybrid Partitioning and Sorting Architecture in IcebergIO: e… (atognolag, May 8, 2026)
890fe76 - Set HASH as the default distribution mode in IcebergIO and comprehens… (atognolag, May 8, 2026)
0cecd01 - Add testRangeDistribution integration test case for RANGE distributio… (atognolag, May 8, 2026)
94fa738 - Add Javadoc documentation for withAutosharding method in IcebergIO.java (atognolag, May 8, 2026)
f24183a - Update RANGE sharding code sample to use ID partitioning and document… (atognolag, May 8, 2026)
670aab1 - Document the multi-dimensional distribution mode decision matrix in I… (atognolag, May 8, 2026)
acf0e56 - Add Operational Impact column to distribution modes Javadoc table in … (atognolag, May 8, 2026)
e22d4dc - Fix ClassCastException in IcebergUtils string field copying by using … (atognolag, May 8, 2026)
1ab4fcf - Add comprehensive test scenarios for dynamic BigDecimal and Integer t… (atognolag, May 8, 2026)
1e19180 - Add thorough test scenarios for Double, Boolean, and Null value to St… (atognolag, May 8, 2026)
4d20b11 - Use catalog.buildTable to set SortOrder during dynamic table creation… (atognolag, May 8, 2026)
fca10f8 - Adding a note and a warning (atognolag, May 9, 2026)
1 change: 1 addition & 0 deletions sdks/java/io/iceberg/build.gradle
@@ -46,6 +46,7 @@ dependencies {
implementation library.java.vendored_guava_32_1_2_jre
implementation project(path: ":sdks:java:core", configuration: "shadow")
implementation project(path: ":model:pipeline", configuration: "shadow")
implementation project(path: ":sdks:java:extensions:sorter")
implementation library.java.avro
implementation library.java.slf4j_api
implementation library.java.joda_time
266 additions: IcebergRowSorter.java (new file)
@@ -0,0 +1,266 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.io.iceberg;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.Serializable;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.time.Instant;
import java.util.Date;
import java.util.Iterator;
import java.util.List;
import org.apache.beam.sdk.coders.RowCoder;
import org.apache.beam.sdk.extensions.sorter.BufferedExternalSorter;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.Row;
import org.apache.iceberg.NullOrder;
import org.apache.iceberg.Schema;
import org.apache.iceberg.SortDirection;
import org.apache.iceberg.SortField;
import org.apache.iceberg.SortOrder;
import org.joda.time.ReadableInstant;

/**
* A utility class to sort Beam {@link Row}s based on an Iceberg {@link SortOrder}. Leverages
* {@link BufferedExternalSorter} to spill to local disk when elements exceed the memory limit.
*/
class IcebergRowSorter implements Serializable {

public static Iterable<Row> sortRows(
Iterable<Row> rows,
SortOrder sortOrder,
Schema icebergSchema,
org.apache.beam.sdk.schemas.Schema beamSchema) {

if (sortOrder == null || !sortOrder.isSorted()) {
return rows;
}

BufferedExternalSorter.Options sorterOptions = BufferedExternalSorter.options();
BufferedExternalSorter sorter = BufferedExternalSorter.create(sorterOptions);
RowCoder rowCoder = RowCoder.of(beamSchema);

List<SortField> fields = sortOrder.fields();
String[] columnNames = new String[fields.size()];
for (int i = 0; i < fields.size(); i++) {
columnNames[i] = icebergSchema.findColumnName(fields.get(i).sourceId());
}

try {
for (Row row : rows) {
byte[] keyBytes = encodeSortKey(row, sortOrder, columnNames, icebergSchema, beamSchema);
ByteArrayOutputStream baos = new ByteArrayOutputStream();
[Contributor review comment] Let's clear and reuse the output stream object instead of creating a new one each iteration.
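A minimal sketch of the suggested reuse (hypothetical; it keeps the loop's existing names):

// Hypothetical: allocate the stream once and reset() it per element instead of reallocating.
ByteArrayOutputStream reusableBaos = new ByteArrayOutputStream();
for (Row row : rows) {
  byte[] keyBytes = encodeSortKey(row, sortOrder, columnNames, icebergSchema, beamSchema);
  reusableBaos.reset(); // discards previous contents but keeps the grown backing buffer
  rowCoder.encode(row, reusableBaos);
  sorter.add(KV.of(keyBytes, reusableBaos.toByteArray()));
}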

rowCoder.encode(row, baos);
byte[] valBytes = baos.toByteArray();
sorter.add(KV.of(keyBytes, valBytes));
}

Iterable<KV<byte[], byte[]>> sortedKVs = sorter.sort();
return new Iterable<Row>() {
@Override
public Iterator<Row> iterator() {
final Iterator<KV<byte[], byte[]>> it = sortedKVs.iterator();
return new Iterator<Row>() {
@Override
public boolean hasNext() {
return it.hasNext();
}

@Override
public Row next() {
KV<byte[], byte[]> next = it.next();
try {
ByteArrayInputStream bais = new ByteArrayInputStream(next.getValue());
return rowCoder.decode(bais);
} catch (IOException e) {
throw new RuntimeException("Failed to decode Row during sorting", e);
}
}
};
}
};

} catch (IOException e) {
throw new RuntimeException("Failed to sort rows with external sorter", e);
}
}

@SuppressWarnings("nullness")
public static byte[] encodeSortKey(
Row row,
SortOrder sortOrder,
String[] columnNames,
Schema icebergSchema,
org.apache.beam.sdk.schemas.Schema beamSchema)
throws IOException {

ByteArrayOutputStream baos = new ByteArrayOutputStream();
[Contributor review comment] Same thing here: let's create the ByteArrayOutputStream in the caller and pass it in. Each call should reset and reuse the same one for efficiency.

List<SortField> fields = sortOrder.fields();
org.apache.iceberg.data.Record icebergRecord = null;

for (int i = 0; i < fields.size(); i++) {
SortField field = fields.get(i);
String colName = columnNames[i];
Object val = row.getValue(colName);

if (!field.transform().isIdentity()) {
if (icebergRecord == null) {
icebergRecord = IcebergUtils.beamRowToIcebergRecord(icebergSchema, row);
}
Object icebergVal = icebergRecord.getField(colName);
if (icebergVal != null) {
val = field.transform().apply(icebergVal);
[Contributor review comment] Converting the whole row just to extract one transformed value is pretty heavy and will make writes slow. Can we apply the transform to the Beam field value directly? If not, maybe we need a "Beam object to Iceberg object" conversion. IcebergUtils.copyFieldIntoRecord does something similar, but we may need to refactor it a little to fit this use case.
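One possible shape for that single-value conversion (a hedged sketch; beamValueToIcebergValue is hypothetical, not an existing IcebergUtils method, and the type coverage here is illustrative):

// Hypothetical: convert just the sort-key value so we avoid materializing a full Record.
private static Object beamValueToIcebergValue(org.apache.iceberg.types.Type type, Object beamValue) {
  if (beamValue == null) {
    return null;
  }
  switch (type.typeId()) {
    case TIMESTAMP:
      // Beam DATETIME values surface as Joda instants; Iceberg timestamps are micros since epoch.
      return ((org.joda.time.ReadableInstant) beamValue).getMillis() * 1000L;
    case BINARY:
      return java.nio.ByteBuffer.wrap((byte[]) beamValue);
    default:
      return beamValue; // ints, longs, strings, etc. already line up
  }
}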

} else {
val = null;
}
}

boolean isNull = (val == null);
boolean isDesc = (field.direction() == SortDirection.DESC);
boolean nullsFirst = (field.nullOrder() == NullOrder.NULLS_FIRST);

// Determine correct header prefix to fulfill the NullOrder contracts
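// NULLS_FIRST: null -> 0x00 sorts before present -> 0x01; NULLS_LAST: present -> 0x00 sorts before null -> 0xFF.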
byte prefixByte;
if (isNull) {
prefixByte = nullsFirst ? (byte) 0x00 : (byte) 0xFF;
} else {
prefixByte = nullsFirst ? (byte) 0x01 : (byte) 0x00;
}

baos.write(prefixByte);

if (!isNull) {
writeValue(val, baos, isDesc);
}
}

return baos.toByteArray();
}

private static void writeInt(int v, ByteArrayOutputStream baos, boolean invert) {
byte b3 = (byte) (v >>> 24);
byte b2 = (byte) (v >>> 16);
byte b1 = (byte) (v >>> 8);
byte b0 = (byte) v;
if (invert) {
baos.write(~b3);
baos.write(~b2);
baos.write(~b1);
baos.write(~b0);
} else {
baos.write(b3);
baos.write(b2);
baos.write(b1);
baos.write(b0);
}
}

private static void writeLong(long v, ByteArrayOutputStream baos, boolean invert) {
byte b7 = (byte) (v >>> 56);
byte b6 = (byte) (v >>> 48);
byte b5 = (byte) (v >>> 40);
byte b4 = (byte) (v >>> 32);
byte b3 = (byte) (v >>> 24);
byte b2 = (byte) (v >>> 16);
byte b1 = (byte) (v >>> 8);
byte b0 = (byte) v;
if (invert) {
baos.write(~b7);
baos.write(~b6);
baos.write(~b5);
baos.write(~b4);
baos.write(~b3);
baos.write(~b2);
baos.write(~b1);
baos.write(~b0);
} else {
baos.write(b7);
baos.write(b6);
baos.write(b5);
baos.write(b4);
baos.write(b3);
baos.write(b2);
baos.write(b1);
baos.write(b0);
}
}

@SuppressWarnings("JavaUtilDate")
private static void writeValue(Object val, ByteArrayOutputStream baos, boolean invert)
throws IOException {
if (val instanceof String) {
writeString((String) val, baos, invert);
} else if (val instanceof Integer) {
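// XOR with MIN_VALUE flips the sign bit so two's-complement values order correctly as unsigned bytes.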
int v = (Integer) val;
writeInt(v ^ Integer.MIN_VALUE, baos, invert);
} else if (val instanceof Long) {
long v = (Long) val;
writeLong(v ^ Long.MIN_VALUE, baos, invert);
} else if (val instanceof Float) {
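// Order-preserving IEEE-754 encoding: flip only the sign bit for non-negatives, all bits for negatives.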
int bits = Float.floatToIntBits((Float) val);
bits = (bits >= 0) ? (bits ^ Integer.MIN_VALUE) : ~bits;
writeInt(bits, baos, invert);
} else if (val instanceof Double) {
long bits = Double.doubleToLongBits((Double) val);
bits = (bits >= 0) ? (bits ^ Long.MIN_VALUE) : ~bits;
writeLong(bits, baos, invert);
} else if (val instanceof Boolean) {
byte b = ((Boolean) val) ? (byte) 0x01 : (byte) 0x00;
baos.write(invert ? ~b : b);
} else if (val instanceof byte[]) {
writeByteArray((byte[]) val, baos, invert);
} else if (val instanceof ByteBuffer) {
writeByteArray(((ByteBuffer) val).array(), baos, invert);
} else if (val instanceof ReadableInstant) {
long enc = ((ReadableInstant) val).getMillis() ^ Long.MIN_VALUE;
writeLong(enc, baos, invert);
} else if (val instanceof Instant) {
long enc = ((Instant) val).toEpochMilli() ^ Long.MIN_VALUE;
writeLong(enc, baos, invert);
} else if (val instanceof Date) {
long enc = ((Date) val).getTime() ^ Long.MIN_VALUE;
writeLong(enc, baos, invert);
} else {
writeString(val.toString(), baos, invert);
}
[Contributor review comment on lines +243 to +246] Does Iceberg support sorting on complex types (lists, maps, structs)? We should either 1) add support for that or 2) throw an UnsupportedOperationException early on. I'd rather throw an error than fall back on String, because that may lead to unexpected sorting behavior for some types.

[Author reply] It is not supported. It now raises an UnsupportedOperationException.

}
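A sketch of the early rejection described in the thread above (assumed wording; the idea is to replace writeValue's final toString() fallback):

// Assumed replacement for the final else branch: fail fast instead of
// silently falling back to toString() ordering.
throw new UnsupportedOperationException(
    "Cannot sort on type " + val.getClass().getName()
        + "; complex types (lists, maps, structs) are not supported");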

private static void writeString(String s, ByteArrayOutputStream baos, boolean invert)
throws IOException {
byte[] bytes = s.getBytes(StandardCharsets.UTF_8);
writeByteArray(bytes, baos, invert);
}

private static void writeByteArray(byte[] bytes, ByteArrayOutputStream baos, boolean invert) {
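// Escape 0x00 -> 0x01 0x01 and 0x01 -> 0x01 0x02, then append a 0x00 terminator, so
// encoded bytes never contain 0x00 and no encoded field is a prefix of another.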
for (byte b : bytes) {
if (b == 0x00) {
baos.write(invert ? ~(byte) 0x01 : (byte) 0x01);
baos.write(invert ? ~(byte) 0x01 : (byte) 0x01);
} else if (b == 0x01) {
baos.write(invert ? ~(byte) 0x01 : (byte) 0x01);
baos.write(invert ? ~(byte) 0x02 : (byte) 0x02);
} else {
baos.write(invert ? ~b : b);
}
}
baos.write(invert ? ~(byte) 0x00 : (byte) 0x00);
}
}
WriteGroupedRowsToFiles.java
@@ -18,6 +18,10 @@
package org.apache.beam.sdk.io.iceberg;

import java.util.List;
import org.apache.beam.sdk.coders.IterableCoder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.RowCoder;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
@@ -30,6 +34,7 @@
import org.apache.beam.sdk.values.WindowedValue;
import org.apache.beam.sdk.values.WindowedValues;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.Catalog;
import org.checkerframework.checker.nullness.qual.MonotonicNonNull;

@@ -56,10 +61,17 @@ class WriteGroupedRowsToFiles
@Override
public PCollection<FileWriteResult> expand(
PCollection<KV<ShardedKey<String>, Iterable<Row>>> input) {
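// Recover the element Row schema by unwrapping the input coder:
// the KvCoder's value coder is an IterableCoder whose element coder is the RowCoder carrying the schema.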
Schema dataSchema =
((RowCoder)
((IterableCoder<Row>)
((KvCoder<ShardedKey<String>, Iterable<Row>>) input.getCoder())
.getValueCoder())
.getElemCoder())
.getSchema();
return input.apply(
ParDo.of(
new WriteGroupedRowsToFilesDoFn(
catalogConfig, dynamicDestinations, maxBytesPerFile, filePrefix)));
catalogConfig, dynamicDestinations, maxBytesPerFile, filePrefix, dataSchema)));
}

private static class WriteGroupedRowsToFilesDoFn
@@ -70,16 +82,19 @@ private static class WriteGroupedRowsToFilesDoFn
private transient @MonotonicNonNull Catalog catalog;
private final String filePrefix;
private final long maxFileSize;
private final Schema dataSchema;

WriteGroupedRowsToFilesDoFn(
IcebergCatalogConfig catalogConfig,
DynamicDestinations dynamicDestinations,
long maxFileSize,
String filePrefix) {
String filePrefix,
Schema dataSchema) {
this.catalogConfig = catalogConfig;
this.dynamicDestinations = dynamicDestinations;
this.filePrefix = filePrefix;
this.maxFileSize = maxFileSize;
this.dataSchema = dataSchema;
}

private org.apache.iceberg.catalog.Catalog getCatalog() {
@@ -101,11 +116,16 @@ public void processElement(
IcebergDestination destination = dynamicDestinations.instantiateDestination(tableIdentifier);
WindowedValue<IcebergDestination> windowedDestination =
WindowedValues.of(destination, window.maxTimestamp(), window, paneInfo);

RecordWriterManager writer;
try (RecordWriterManager openWriter =
new RecordWriterManager(getCatalog(), filePrefix, maxFileSize, Integer.MAX_VALUE)) {
writer = openWriter;
for (Row e : element.getValue()) {
Table table = openWriter.getOrCreateTable(destination, dataSchema);
Iterable<Row> sortedOrUnsortedRows =
IcebergRowSorter.sortRows(
element.getValue(), table.sortOrder(), table.schema(), dataSchema);
for (Row e : sortedOrUnsortedRows) {
writer.write(windowedDestination, e);
}
}
@@ -130,7 +130,10 @@ public void processElement(
RecordWriter writer =
new RecordWriter(table, destination.getFileFormat(), fileName, partitionData);
try {
for (Row row : element.getValue()) {
Iterable<Row> sortedOrUnsortedRows =
IcebergRowSorter.sortRows(
element.getValue(), table.sortOrder(), table.schema(), dataSchema);
for (Row row : sortedOrUnsortedRows) {
[Contributor review comment on lines +134 to +137] We should only be sorting if the user asked us to.

[Author reply] IcebergRowSorter.sortRows() only sorts if the table defines a sort order; otherwise it does nothing.
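For context, sorting is only requested through the table's declared sort order, e.g. (illustrative column names):

// Illustrative: tables created with a declared sort order take the sorted write path.
SortOrder order = SortOrder.builderFor(icebergSchema)
    .asc("id")
    .desc("event_time", NullOrder.NULLS_LAST)
    .build();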

Record record = IcebergUtils.beamRowToIcebergRecord(table.schema(), row);
writer.write(record);
}
@@ -47,6 +47,7 @@
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.Catalog;
import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
import org.checkerframework.checker.nullness.qual.Nullable;
@@ -238,11 +239,16 @@ public void processElement(
WindowedValues.of(destination, window.maxTimestamp(), window, paneInfo);

// Attempt to write record. If the writer is saturated and cannot accept
// the record, spill it over to WriteGroupedRowsToFiles
boolean writeSuccess;
// the record, or if the target table is sorted, spill it over to WriteGroupedRowsToFiles
boolean writeSuccess = false;
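// Leaving writeSuccess false routes the record to the spill path, where the grouped (sorted) write happens.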
try {
writeSuccess =
Preconditions.checkNotNull(recordWriterManager).write(windowedDestination, data);
Table table =
Preconditions.checkNotNull(recordWriterManager)
.getOrCreateTable(destination, data.getSchema());
if (!table.sortOrder().isSorted()) {
writeSuccess =
Preconditions.checkNotNull(recordWriterManager).write(windowedDestination, data);
}
} catch (Exception e) {
try {
Preconditions.checkNotNull(recordWriterManager).close();