Skip to content
Draft
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
1a6c642
Implement high-performance sorted writing support in IcebergIO
atognolag May 7, 2026
094c721
Fix multiple-iteration anti-pattern in WriteGroupedRowsToFiles by ext…
atognolag May 7, 2026
f9dd27d
Refactor row sorting calls to be explicitly conditional on table.sort…
atognolag May 7, 2026
ca4ff74
Simplify sorting logic to keep call DRY and use straightforward 'rows…
atognolag May 7, 2026
df8dc69
Rename variable rows to sortedOrUnsortedRows to explicitly communicat…
atognolag May 7, 2026
9120fda
Fix sorted write PR comments: resolve null-ordering bugs and optimize…
atognolag May 7, 2026
75dcc4f
Fix Checkstyle check MissingDeprecated warning in IcebergRowSorter.java
atognolag May 7, 2026
e4c97bf
Remove unused deprecated encodeSortKey method in IcebergRowSorter.java
atognolag May 7, 2026
ecdaf7f
Finalize Hybrid Partitioning and Sorting Architecture in IcebergIO: e…
atognolag May 8, 2026
890fe76
Set HASH as the default distribution mode in IcebergIO and comprehens…
atognolag May 8, 2026
0cecd01
Add testRangeDistribution integration test case for RANGE distributio…
atognolag May 8, 2026
94fa738
Add Javadoc documentation for withAutosharding method in IcebergIO.java
atognolag May 8, 2026
f24183a
Update RANGE sharding code sample to use ID partitioning and document…
atognolag May 8, 2026
670aab1
Document the multi-dimensional distribution mode decision matrix in I…
atognolag May 8, 2026
acf0e56
Add Operational Impact column to distribution modes Javadoc table in …
atognolag May 8, 2026
e22d4dc
Fix ClassCastException in IcebergUtils string field copying by using …
atognolag May 8, 2026
1ab4fcf
Add comprehensive test scenarios for dynamic BigDecimal and Integer t…
atognolag May 8, 2026
1e19180
Add thorough test scenarios for Double, Boolean, and Null value to St…
atognolag May 8, 2026
4d20b11
Use catalog.buildTable to set SortOrder during dynamic table creation…
atognolag May 8, 2026
fca10f8
Adding a note and a warning
atognolag May 9, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions sdks/java/io/iceberg/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ dependencies {
implementation library.java.vendored_guava_32_1_2_jre
implementation project(path: ":sdks:java:core", configuration: "shadow")
implementation project(path: ":model:pipeline", configuration: "shadow")
implementation project(path: ":sdks:java:extensions:sorter")
implementation library.java.avro
implementation library.java.slf4j_api
implementation library.java.joda_time
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,234 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.io.iceberg;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.Serializable;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.time.Instant;
import java.util.Date;
import java.util.Iterator;
import org.apache.beam.sdk.coders.RowCoder;
import org.apache.beam.sdk.extensions.sorter.BufferedExternalSorter;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.Row;
import org.apache.iceberg.NullOrder;
import org.apache.iceberg.Schema;
import org.apache.iceberg.SortDirection;
import org.apache.iceberg.SortField;
import org.apache.iceberg.SortOrder;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.joda.time.ReadableInstant;

/**
* A utility class to sort Beam {@link Row}s based on an Iceberg {@link SortOrder}. Leverages {@link
* BufferedExternalSorter} to spill to local disk when elements exceed memory limit.
*/
class IcebergRowSorter implements Serializable {

public static Iterable<Row> sortRows(
Iterable<Row> rows,
SortOrder sortOrder,
Schema icebergSchema,
org.apache.beam.sdk.schemas.Schema beamSchema) {

if (sortOrder == null || !sortOrder.isSorted()) {
return rows;
}

BufferedExternalSorter.Options sorterOptions = BufferedExternalSorter.options();
BufferedExternalSorter sorter = BufferedExternalSorter.create(sorterOptions);
RowCoder rowCoder = RowCoder.of(beamSchema);

try {
for (Row row : rows) {
byte[] keyBytes = encodeSortKey(row, sortOrder, icebergSchema, beamSchema);
ByteArrayOutputStream baos = new ByteArrayOutputStream();
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's to clear and reuse the output stream object instead of creating a new one each iteration

rowCoder.encode(row, baos);
byte[] valBytes = baos.toByteArray();
sorter.add(KV.of(keyBytes, valBytes));
}

Iterable<KV<byte[], byte[]>> sortedKVs = sorter.sort();
return new Iterable<Row>() {
@Override
public Iterator<Row> iterator() {
final Iterator<KV<byte[], byte[]>> it = sortedKVs.iterator();
return new Iterator<Row>() {
@Override
public boolean hasNext() {
return it.hasNext();
}

@Override
public Row next() {
KV<byte[], byte[]> next = it.next();
try {
ByteArrayInputStream bais = new ByteArrayInputStream(next.getValue());
return rowCoder.decode(bais);
} catch (IOException e) {
throw new RuntimeException("Failed to decode Row during sorting", e);
}
}
};
}
};

} catch (IOException e) {
throw new RuntimeException("Failed to sort rows with external sorter", e);
}
}

@SuppressWarnings("nullness")
public static byte[] encodeSortKey(
Row row,
SortOrder sortOrder,
Schema icebergSchema,
org.apache.beam.sdk.schemas.Schema beamSchema)
throws IOException {

ByteArrayOutputStream baos = new ByteArrayOutputStream();
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same thing here, let's create ByteArrayOutputStream it in the caller function and pass it in here. Each call should reset and use the same one for efficiency


for (SortField field : sortOrder.fields()) {
String colName = icebergSchema.findColumnName(field.sourceId());
Comment thread
atognolag marked this conversation as resolved.
Outdated
Object val = row.getValue(colName);

if (!field.transform().isIdentity()) {
Object icebergVal =
IcebergUtils.beamRowToIcebergRecord(icebergSchema, row).getField(colName);
Comment thread
atognolag marked this conversation as resolved.
Outdated
if (icebergVal != null) {
val = field.transform().apply(icebergVal);
} else {
val = null;
}
}

boolean isNull = (val == null);
boolean isDesc = (field.direction() == SortDirection.DESC);
boolean nullsFirst = (field.nullOrder() == NullOrder.NULLS_FIRST);

// Determine correct header prefix to fulfill the NullOrder contracts
byte prefixByte;
if (isNull) {
if (isDesc) {
// Descending: High byte keys sort first.
// If Nulls First -> Null gets highest byte (0xFF)
// If Nulls Last -> Null gets lowest byte (0x00)
prefixByte = nullsFirst ? (byte) 0xFF : (byte) 0x00;
} else {
// Ascending: Low byte keys sort first.
// If Nulls First -> Null gets lowest byte (0x00)
// If Nulls Last -> Null gets highest byte (0xFF)
prefixByte = nullsFirst ? (byte) 0x00 : (byte) 0xFF;
}
} else {
if (isDesc) {
// If non-null and Descending, use a neutral value that sits opposite to the null byte
prefixByte = nullsFirst ? (byte) 0xFE : (byte) 0x01;
} else {
prefixByte = nullsFirst ? (byte) 0x01 : (byte) 0x00;
}
}
Comment thread
atognolag marked this conversation as resolved.
Outdated

baos.write(prefixByte);

if (!isNull) {
byte[] valBytes = encodeValue(val);
// Bitwise invert non-null bytes to sort descending lexicographically
if (isDesc) {
for (int i = 0; i < valBytes.length; i++) {
valBytes[i] = (byte) ~valBytes[i];
}
}
baos.write(valBytes);
}
}

return baos.toByteArray();
}

@SuppressWarnings("JavaUtilDate")
private static byte[] encodeValue(@Nullable Object val) throws IOException {
if (val == null) {
return new byte[0];
}
if (val instanceof String) {
return encodeString((String) val);
} else if (val instanceof Integer) {
int v = (Integer) val;
return ByteBuffer.allocate(4).putInt(v ^ Integer.MIN_VALUE).array();
} else if (val instanceof Long) {
long v = (Long) val;
return ByteBuffer.allocate(8).putLong(v ^ Long.MIN_VALUE).array();
} else if (val instanceof Float) {
int bits = Float.floatToIntBits((Float) val);
bits = (bits >= 0) ? (bits ^ Integer.MIN_VALUE) : ~bits;
return ByteBuffer.allocate(4).putInt(bits).array();
} else if (val instanceof Double) {
long bits = Double.doubleToLongBits((Double) val);
bits = (bits >= 0) ? (bits ^ Long.MIN_VALUE) : ~bits;
return ByteBuffer.allocate(8).putLong(bits).array();
Comment thread
atognolag marked this conversation as resolved.
Outdated
} else if (val instanceof Boolean) {
return new byte[] {((Boolean) val) ? (byte) 0x01 : (byte) 0x00};
} else if (val instanceof byte[]) {
return encodeByteArray((byte[]) val);
} else if (val instanceof ByteBuffer) {
return encodeByteArray(((ByteBuffer) val).array());
} else if (val instanceof ReadableInstant) {
long enc = ((ReadableInstant) val).getMillis() ^ Long.MIN_VALUE;
return ByteBuffer.allocate(8).putLong(enc).array();
} else if (val instanceof Instant) {
long enc = ((Instant) val).toEpochMilli() ^ Long.MIN_VALUE;
return ByteBuffer.allocate(8).putLong(enc).array();
} else if (val instanceof Date) {
long enc = ((Date) val).getTime() ^ Long.MIN_VALUE;
return ByteBuffer.allocate(8).putLong(enc).array();
}
Comment on lines +243 to +246
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does Iceberg support sorting on complex types (lists, maps, structs) ?

We should either 1) add support for that or 2) throw an Unsupported Exception early on.

I'd rather throw an error than fall back on String because it may lead to unexpected sorting behavior for some types.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is not supported. It now raises an UnsupportedOperationException


return encodeString(val.toString());
}

private static byte[] encodeString(String s) throws IOException {
byte[] bytes = s.getBytes(StandardCharsets.UTF_8);
return encodeByteArray(bytes);
}

/**
* Escape protocol to cleanly prevent collisions. Maps 0x00 -> [0x01, 0x01] Maps 0x01 -> [0x01,
* 0x02] Safely terminates sequence with 0x00.
*/
private static byte[] encodeByteArray(byte[] bytes) throws IOException {
ByteArrayOutputStream baos = new ByteArrayOutputStream(bytes.length + 2);
for (byte b : bytes) {
if (b == 0x00) {
baos.write(0x01);
baos.write(0x01);
} else if (b == 0x01) {
baos.write(0x01);
baos.write(0x02);
} else {
baos.write(b);
}
}
baos.write(0x00); // Safe boundary delimiter
return baos.toByteArray();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@
*/
package org.apache.beam.sdk.io.iceberg;

import java.util.Iterator;
import java.util.List;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
Expand All @@ -30,6 +32,7 @@
import org.apache.beam.sdk.values.WindowedValue;
import org.apache.beam.sdk.values.WindowedValues;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.Catalog;
import org.checkerframework.checker.nullness.qual.MonotonicNonNull;

Expand Down Expand Up @@ -101,11 +104,22 @@ public void processElement(
IcebergDestination destination = dynamicDestinations.instantiateDestination(tableIdentifier);
WindowedValue<IcebergDestination> windowedDestination =
WindowedValues.of(destination, window.maxTimestamp(), window, paneInfo);
Iterator<Row> rowIt = element.getValue().iterator();
if (!rowIt.hasNext()) {
return;
}
Row firstRow = rowIt.next();
Schema dataSchema = firstRow.getSchema();
Comment thread
atognolag marked this conversation as resolved.
Outdated

RecordWriterManager writer;
try (RecordWriterManager openWriter =
new RecordWriterManager(getCatalog(), filePrefix, maxFileSize, Integer.MAX_VALUE)) {
writer = openWriter;
for (Row e : element.getValue()) {
Table table = openWriter.getOrCreateTable(destination, dataSchema);
Iterable<Row> sortedRows =
IcebergRowSorter.sortRows(
element.getValue(), table.sortOrder(), table.schema(), dataSchema);
for (Row e : sortedRows) {
writer.write(windowedDestination, e);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,10 @@ public void processElement(
RecordWriter writer =
new RecordWriter(table, destination.getFileFormat(), fileName, partitionData);
try {
for (Row row : element.getValue()) {
Iterable<Row> sortedRows =
IcebergRowSorter.sortRows(
element.getValue(), table.sortOrder(), table.schema(), dataSchema);
for (Row row : sortedRows) {
Record record = IcebergUtils.beamRowToIcebergRecord(table.schema(), row);
writer.write(record);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.Catalog;
import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
import org.checkerframework.checker.nullness.qual.Nullable;
Expand Down Expand Up @@ -238,11 +239,16 @@ public void processElement(
WindowedValues.of(destination, window.maxTimestamp(), window, paneInfo);

// Attempt to write record. If the writer is saturated and cannot accept
// the record, spill it over to WriteGroupedRowsToFiles
boolean writeSuccess;
// the record, or if the target table is sorted, spill it over to WriteGroupedRowsToFiles
boolean writeSuccess = false;
try {
writeSuccess =
Preconditions.checkNotNull(recordWriterManager).write(windowedDestination, data);
Table table =
Preconditions.checkNotNull(recordWriterManager)
.getOrCreateTable(destination, data.getSchema());
if (!table.sortOrder().isSorted()) {
writeSuccess =
Preconditions.checkNotNull(recordWriterManager).write(windowedDestination, data);
}
} catch (Exception e) {
try {
Preconditions.checkNotNull(recordWriterManager).close();
Expand Down
Loading
Loading