Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import com.google.common.collect.Maps;
import java.sql.Timestamp;
import java.time.LocalDate;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -59,7 +60,7 @@
/// - `BINARY` → `byte[]`
/// - `DATE` → [LocalDate] via [LocalDate#ofEpochDay] (TZ-independent, calendar-date semantics)
/// - `TIMESTAMP` / `TIMESTAMP_INSTANT` → [Timestamp] preserving full sub-second nanos from [TimestampColumnVector]
/// - `LIST<X>` → `Object[]` (null elements preserved; empty list surfaces as empty `Object[]`)
/// - `LIST<X>` → `List<Object>` (null elements preserved; empty list surfaces as an empty list)
/// - `MAP<K, V>` → `Map<Object, Object>`
/// - `STRUCT<...>` → `Map<String, Object>`
/// - any nullable column with `isNull[rowId]` set → `null`
Expand Down Expand Up @@ -140,9 +141,9 @@ private static Object extractValue(String field, ColumnVector columnVector, Type
ListColumnVector listColumnVector = (ListColumnVector) columnVector;
int offset = (int) listColumnVector.offsets[rowId];
int length = (int) listColumnVector.lengths[rowId];
Object[] values = new Object[length];
List<Object> values = new ArrayList<>(length);
for (int j = 0; j < length; j++) {
values[j] = extractValue(field, listColumnVector.child, childType, offset + j);
values.add(extractValue(field, listColumnVector.child, childType, offset + j));
}
return values;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Returning List here changes the RecordExtractor/GenericRow multi-value contract from Object[] to List. This PR only patches a few ingestion call sites, but many existing consumers still cast MV values to Object[] or test for instanceof Object[], so ORC rows can still break outside the narrow segment-build path. Please keep the extractor output as Object[] and add any user-facing List adaptation at a higher layer instead of changing the reader contract.

}
Expand Down Expand Up @@ -191,7 +192,8 @@ private static Object extractSingleValue(String field, ColumnVector columnVector
TypeDescription.Category category) {
switch (category) {
case BOOLEAN:
return ((LongColumnVector) columnVector).vector[rowId] == 1;
// ORC booleans are stored in a LongColumnVector as 0 / 1.
return Boolean.valueOf(((LongColumnVector) columnVector).vector[rowId] != 0);
case BYTE:
case SHORT:
case INT:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
import java.math.BigDecimal;
import java.sql.Timestamp;
import java.time.LocalDate;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
import org.apache.hadoop.hive.common.type.HiveDecimal;
Expand Down Expand Up @@ -167,9 +169,10 @@ public void testDecimalExtractedAsBigDecimal() {

// === Complex types ===

@Test
public void testListOfIntsExtractedAsArray() {
Object[] result = (Object[]) extract("array<int>", batch -> {
@Test
public void testListOfIntsExtractedAsList() {
@SuppressWarnings("unchecked")
List<Object> result = (List<Object>) extract("array<int>", batch -> {
ListColumnVector list = (ListColumnVector) batch.cols[0];
LongColumnVector child = (LongColumnVector) list.child;
child.ensureSize(3, false);
Expand All @@ -179,13 +182,14 @@ public void testListOfIntsExtractedAsArray() {
child.vector[1] = 20;
child.vector[2] = 30;
});
assertEquals(result, new Object[]{10, 20, 30});
assertEquals(result, List.of(10, 20, 30));
}

@Test
public void testListPreservesNullElements() {
// The contract requires multi-value shape preservation: null elements stay null in `Object[]`.
Object[] result = (Object[]) extract("array<int>", batch -> {
// Null elements stay null in the returned list (`List.of` cannot express this).
@SuppressWarnings("unchecked")
List<Object> result = (List<Object>) extract("array<int>", batch -> {
ListColumnVector list = (ListColumnVector) batch.cols[0];
LongColumnVector child = (LongColumnVector) list.child;
child.ensureSize(3, false);
Expand All @@ -196,18 +200,18 @@ public void testListPreservesNullElements() {
child.vector[0] = 10;
child.vector[2] = 30;
});
assertEquals(result, new Object[]{10, null, 30});
assertEquals(result, Arrays.asList(10, null, 30));
}

@Test
public void testEmptyListExtractedAsEmptyArray() {
// The contract requires shape preservation: an empty list surfaces as an empty `Object[]`, not `null`.
Object[] result = (Object[]) extract("array<int>", batch -> {
public void testEmptyListExtractedAsEmptyList() {
@SuppressWarnings("unchecked")
List<Object> result = (List<Object>) extract("array<int>", batch -> {
ListColumnVector list = (ListColumnVector) batch.cols[0];
list.offsets[0] = 0;
list.lengths[0] = 0;
});
assertEquals(result, new Object[]{});
assertEquals(result, List.of());
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.data.readers.GenericRow;
import org.apache.pinot.spi.data.readers.PrimaryKey;
import org.apache.pinot.spi.data.readers.RecordReaderUtils;
import org.apache.pinot.spi.stream.StreamMessageMetadata;
import org.apache.pinot.spi.utils.BooleanUtils;
import org.apache.pinot.spi.utils.ByteArray;
Expand Down Expand Up @@ -831,7 +832,7 @@ private void validateLengthOfMVColumns(GenericRow row)
continue;
}

Object[] values = (Object[]) row.getValue(entry.getKey());
Object[] values = RecordReaderUtils.toObjectArray(row.getValue(entry.getKey()));
// Note that max chunk capacity is derived from "FixedByteMVMutableForwardIndex._maxNumberOfMultiValuesPerRow"
// which is set to "1000" in "ForwardIndexType.MAX_MULTI_VALUES_PER_ROW". If the number of values in the
// multi-value entry that we are attempting to ingest is greater than the maximum accepted value, we throw an
Expand Down Expand Up @@ -859,7 +860,7 @@ private void updateDictionary(GenericRow row) {
if (indexContainer._fieldSpec.isSingleValueField()) {
indexContainer._dictId = dictionary.index(value);
} else {
indexContainer._dictIds = dictionary.index((Object[]) value);
indexContainer._dictIds = dictionary.index(RecordReaderUtils.toObjectArray(value));
}

// Update min/max value from dictionary
Expand Down Expand Up @@ -989,7 +990,7 @@ private void addNewRow(int docId, GenericRow row) {

int[] dictIds = indexContainer._dictIds;
indexContainer._valuesInfo.updateVarByteMVMaxRowLengthInBytes(value, dataType.getStoredType());
Object[] values = (Object[]) value;
Object[] values = RecordReaderUtils.toObjectArray(value);
for (Map.Entry<IndexType, MutableIndex> indexEntry : indexContainer._mutableIndexes.entrySet()) {
try {
MutableIndex mutableIndex = indexEntry.getValue();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,13 @@
package org.apache.pinot.segment.local.recordtransformer;

import java.util.HashSet;
import java.util.List;
import java.util.Set;
import org.apache.pinot.segment.local.utils.SpecialValueTransformerUtils;
import org.apache.pinot.spi.data.FieldSpec;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.data.readers.GenericRow;
import org.apache.pinot.spi.data.readers.RecordReaderUtils;
import org.apache.pinot.spi.recordtransformer.RecordTransformer;


Expand Down Expand Up @@ -78,6 +80,10 @@ public void transform(GenericRow record) {
if (transformedValues != value) {
record.putValue(column, transformedValues);
}
} else if (value instanceof List) {
Object[] values = RecordReaderUtils.toObjectArray(value);
Object[] transformedValues = SpecialValueTransformerUtils.transformValues(values);
record.putValue(column, transformedValues);
} else if (value != null) {
// Single-valued column.
Object transformedValue = SpecialValueTransformerUtils.transformValue(value);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.apache.pinot.spi.data.FieldSpec;
import org.apache.pinot.spi.data.readers.ColumnReader;
import org.apache.pinot.spi.data.readers.GenericRow;
import org.apache.pinot.spi.data.readers.RecordReaderUtils;
import org.roaringbitmap.RoaringBitmap;


Expand Down Expand Up @@ -60,7 +61,7 @@ public void indexRow(GenericRow row)
if (fieldSpec.isSingleValueField()) {
indexSingleValueRow(dictionaryCreator, columnValueToIndex, creatorsByIndex);
} else {
indexMultiValueRow(dictionaryCreator, (Object[]) columnValueToIndex, creatorsByIndex);
indexMultiValueRow(dictionaryCreator, RecordReaderUtils.toObjectArray(columnValueToIndex), creatorsByIndex);
}
} catch (JsonParseException jpe) {
throw new ColumnJsonParserException(columnName, jpe);
Expand Down Expand Up @@ -172,7 +173,8 @@ public void indexColumn(String columnName, ColumnReader columnReader)
if (fieldSpec.isSingleValueField()) {
indexSingleValueRow(dictionaryCreator, reuseColumnValueToIndex, creatorsByIndex);
} else {
indexMultiValueRow(dictionaryCreator, (Object[]) reuseColumnValueToIndex, creatorsByIndex);
indexMultiValueRow(dictionaryCreator, RecordReaderUtils.toObjectArray(reuseColumnValueToIndex),
creatorsByIndex);
}
} catch (JsonParseException jpe) {
throw new ColumnJsonParserException(columnName, jpe);
Expand All @@ -197,7 +199,7 @@ private void indexColumnValue(PinotSegmentColumnReader colReader,
if (fieldSpec.isSingleValueField()) {
indexSingleValueRow(dictionaryCreator, columnValueToIndex, creatorsByIndex);
} else {
indexMultiValueRow(dictionaryCreator, (Object[]) columnValueToIndex, creatorsByIndex);
indexMultiValueRow(dictionaryCreator, RecordReaderUtils.toObjectArray(columnValueToIndex), creatorsByIndex);
}

if (nullVec != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import com.google.common.collect.Maps;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import org.apache.pinot.segment.spi.creator.ColumnStatistics;
import org.apache.pinot.segment.spi.creator.SegmentPreIndexStatsCollector;
Expand All @@ -29,6 +30,7 @@
import org.apache.pinot.spi.data.FieldSpec;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.data.readers.GenericRow;
import org.apache.pinot.spi.data.readers.RecordReaderUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -85,10 +87,16 @@ public ColumnStatistics getColumnProfileFor(String column) {

@Override
public void collectRow(GenericRow row) {
Schema schema = _statsCollectorConfig.getSchema();
for (Map.Entry<String, ColumnStatistics> entry : _columnStatisticsMap.entrySet()) {
String column = entry.getKey();
ColumnStatistics columnStatistics = entry.getValue();
((AbstractColumnStatisticsCollector) columnStatistics).collect(row.getValue(column));
Object value = row.getValue(column);
FieldSpec fieldSpec = schema.getFieldSpecFor(column);
if (!fieldSpec.isSingleValueField() && value instanceof List) {
value = RecordReaderUtils.toObjectArray(value);
}
((AbstractColumnStatisticsCollector) columnStatistics).collect(value);
}
_totalDocCount++;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.StandardCopyOption;
import java.util.List;
import java.util.zip.GZIPInputStream;


Expand Down Expand Up @@ -88,4 +89,19 @@ public static boolean isGZippedFile(File file)
}
return magic == GZIPInputStream.GZIP_MAGIC;
}

/**
* Normalizes a multi-value column read from a {@link RecordReader} to {@code Object[]}. Most readers already
* return {@code Object[]}; some (e.g. ORC {@code LIST}) return a {@link List}.
*/
public static Object[] toObjectArray(Object multiValue) {
if (multiValue instanceof Object[]) {
return (Object[]) multiValue;
}
if (multiValue instanceof List) {
return ((List<?>) multiValue).toArray();
}
throw new IllegalArgumentException(
"Multi-value column must be Object[] or List, got: " + multiValue.getClass().getName());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -123,9 +123,11 @@ protected void checkValue(RecordReader recordReader, List<Map<String, Object>> e
if (fieldSpec.isSingleValueField()) {
assertValueEquals(actualRecord.getValue(fieldSpecName), expectedRecord.get(fieldSpecName));
} else {
Object[] actualRecords = (Object[]) actualRecord.getValue(fieldSpecName);
List expectedRecords = (List) expectedRecord.get(fieldSpecName);
List<?> expectedRecords = (List<?>) expectedRecord.get(fieldSpecName);
if (expectedRecords != null) {
Object actualMv = actualRecord.getValue(fieldSpecName);
Assert.assertNotNull(actualMv, "Multi-value column " + fieldSpecName + " is null but expected non-null");
Object[] actualRecords = RecordReaderUtils.toObjectArray(actualMv);
Assert.assertEquals(actualRecords.length, expectedRecords.size());
for (int j = 0; j < actualRecords.length; j++) {
assertValueEquals(actualRecords[j], expectedRecords.get(j));
Expand Down
Loading