Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
import java.util.Map;
import java.util.Objects;
import java.util.function.BiConsumer;
import java.util.function.IntPredicate;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

Expand Down Expand Up @@ -127,14 +128,26 @@
final List<Integer> rowIndexList = generateRowIndexList(originTimestampColumn);
this.timestampColumn = rowIndexList.stream().mapToLong(i -> originTimestampColumn[i]).toArray();

final MeasurementSchema[] originMeasurementSchemaList = insertRowNode.getMeasurementSchemas();
final String[] originColumnNameStringList = insertRowNode.getMeasurements();
final TsTableColumnCategory[] originColumnCategories = insertRowNode.getColumnCategories();
final TSDataType[] originValueDataTypes = insertRowNode.getDataTypes();
final Object[] originValues = insertRowNode.getValues();

generateColumnIndexMapper(
insertRowNode.getMeasurements(), originColumnIndex2FilteredColumnIndexMapperList);
originColumnNameStringList, originColumnIndex2FilteredColumnIndexMapperList);

final int filteredColumnSize =
Arrays.stream(originColumnIndex2FilteredColumnIndexMapperList)
.filter(Objects::nonNull)
.toArray()
.length;
compactColumnIndexMapper(
originColumnIndex2FilteredColumnIndexMapperList,
i ->
isValidOriginColumn(
originColumnNameStringList,
originMeasurementSchemaList,
originValueDataTypes,
i)
&& originValues != null
&& i < originValues.length);

this.measurementSchemaList = new MeasurementSchema[filteredColumnSize];
this.columnNameStringList = new String[filteredColumnSize];
Expand All @@ -143,19 +156,15 @@
this.valueColumns = new Object[filteredColumnSize];
this.nullValueColumnBitmaps = new BitMap[filteredColumnSize];

final MeasurementSchema[] originMeasurementSchemaList = insertRowNode.getMeasurementSchemas();
final String[] originColumnNameStringList = insertRowNode.getMeasurements();
final TsTableColumnCategory[] originColumnCategories = insertRowNode.getColumnCategories();
final TSDataType[] originValueDataTypes = insertRowNode.getDataTypes();
final Object[] originValues = insertRowNode.getValues();

for (int i = 0; i < originColumnIndex2FilteredColumnIndexMapperList.length; i++) {
if (originColumnIndex2FilteredColumnIndexMapperList[i] != null) {
final int filteredColumnIndex = originColumnIndex2FilteredColumnIndexMapperList[i];
this.measurementSchemaList[filteredColumnIndex] = originMeasurementSchemaList[i];
this.columnNameStringList[filteredColumnIndex] = originColumnNameStringList[i];
this.valueColumnTypes[filteredColumnIndex] =
originColumnCategories != null && originColumnCategories[i] != null
originColumnCategories != null
&& i < originColumnCategories.length
&& originColumnCategories[i] != null
? originColumnCategories[i].toTsFileColumnType()
: ColumnCategory.FIELD;
this.valueColumnDataTypes[filteredColumnIndex] = originValueDataTypes[i];
Expand Down Expand Up @@ -202,14 +211,29 @@
final List<Integer> rowIndexList = generateRowIndexList(originTimestampColumn);
this.timestampColumn = rowIndexList.stream().mapToLong(i -> originTimestampColumn[i]).toArray();

final MeasurementSchema[] originMeasurementSchemaList =
insertTabletNode.getMeasurementSchemas();
final String[] originColumnNameStringList = insertTabletNode.getMeasurements();
final TsTableColumnCategory[] originColumnCategories = insertTabletNode.getColumnCategories();
final TSDataType[] originValueColumnDataTypes = insertTabletNode.getDataTypes();
final Object[] originValueColumns = insertTabletNode.getColumns();
final BitMap[] originBitMapList = insertTabletNode.getBitMaps();

generateColumnIndexMapper(
insertTabletNode.getMeasurements(), originColumnIndex2FilteredColumnIndexMapperList);
originColumnNameStringList, originColumnIndex2FilteredColumnIndexMapperList);

final int filteredColumnSize =
Arrays.stream(originColumnIndex2FilteredColumnIndexMapperList)
.filter(Objects::nonNull)
.toArray()
.length;
compactColumnIndexMapper(
originColumnIndex2FilteredColumnIndexMapperList,
i ->
isValidOriginColumn(
originColumnNameStringList,
originMeasurementSchemaList,
originValueColumnDataTypes,
i)
&& originValueColumns != null
&& i < originValueColumns.length
&& originValueColumns[i] != null);

this.measurementSchemaList = new MeasurementSchema[filteredColumnSize];
this.columnNameStringList = new String[filteredColumnSize];
Expand All @@ -218,21 +242,15 @@
this.valueColumns = new Object[filteredColumnSize];
this.nullValueColumnBitmaps = new BitMap[filteredColumnSize];

final MeasurementSchema[] originMeasurementSchemaList =
insertTabletNode.getMeasurementSchemas();
final String[] originColumnNameStringList = insertTabletNode.getMeasurements();
final TsTableColumnCategory[] originColumnCategories = insertTabletNode.getColumnCategories();
final TSDataType[] originValueColumnDataTypes = insertTabletNode.getDataTypes();
final Object[] originValueColumns = insertTabletNode.getColumns();
final BitMap[] originBitMapList = insertTabletNode.getBitMaps();

for (int i = 0; i < originColumnIndex2FilteredColumnIndexMapperList.length; i++) {
if (originColumnIndex2FilteredColumnIndexMapperList[i] != null) {
final int filteredColumnIndex = originColumnIndex2FilteredColumnIndexMapperList[i];
this.measurementSchemaList[filteredColumnIndex] = originMeasurementSchemaList[i];
this.columnNameStringList[filteredColumnIndex] = originColumnNameStringList[i];
this.valueColumnTypes[filteredColumnIndex] =
originColumnCategories != null && originColumnCategories[i] != null
originColumnCategories != null
&& i < originColumnCategories.length
&& originColumnCategories[i] != null
? originColumnCategories[i].toTsFileColumnType()
: ColumnCategory.FIELD;
this.valueColumnDataTypes[filteredColumnIndex] = originValueColumnDataTypes[i];
Expand Down Expand Up @@ -298,10 +316,9 @@
originMeasurementList, originColumnIndex2FilteredColumnIndexMapperList);

final int filteredColumnSize =
Arrays.stream(originColumnIndex2FilteredColumnIndexMapperList)
.filter(Objects::nonNull)
.toArray()
.length;
compactColumnIndexMapper(
originColumnIndex2FilteredColumnIndexMapperList,
i -> isValidOriginColumn(originMeasurementSchemaList, i));

this.measurementSchemaList = new MeasurementSchema[filteredColumnSize];
this.columnNameStringList = new String[filteredColumnSize];
Expand Down Expand Up @@ -373,6 +390,46 @@
final String[] originMeasurementList,
final Integer[] originColumnIndex2FilteredColumnIndexMapperList);

private static int compactColumnIndexMapper(
final Integer[] originColumnIndex2FilteredColumnIndexMapperList,
final IntPredicate columnValidator) {
int filteredCount = 0;
for (int i = 0; i < originColumnIndex2FilteredColumnIndexMapperList.length; i++) {
if (originColumnIndex2FilteredColumnIndexMapperList[i] != null && columnValidator.test(i)) {
originColumnIndex2FilteredColumnIndexMapperList[i] = filteredCount++;
} else {
originColumnIndex2FilteredColumnIndexMapperList[i] = null;
}
}
return filteredCount;
}

private static boolean isValidOriginColumn(
final String[] originColumnNameStringList,
final MeasurementSchema[] originMeasurementSchemaList,
final TSDataType[] originValueDataTypes,
final int index) {
return originColumnNameStringList != null
&& index < originColumnNameStringList.length
&& originColumnNameStringList[index] != null
&& originMeasurementSchemaList != null
&& index < originMeasurementSchemaList.length
&& originMeasurementSchemaList[index] != null
&& originMeasurementSchemaList[index].getType() != null
&& originValueDataTypes != null
&& index < originValueDataTypes.length
&& originValueDataTypes[index] != null;
}

private static boolean isValidOriginColumn(
final List<IMeasurementSchema> originMeasurementSchemaList, final int index) {
return originMeasurementSchemaList != null
&& index < originMeasurementSchemaList.size()
&& originMeasurementSchemaList.get(index) != null
&& originMeasurementSchemaList.get(index).getMeasurementName() != null
&& originMeasurementSchemaList.get(index).getType() != null;
}

private List<Integer> generateRowIndexList(final long[] originTimestampColumn) {
final int rowCount = originTimestampColumn.length;
if (Objects.isNull(sourceEvent) || !sourceEvent.shouldParseTime()) {
Expand Down Expand Up @@ -403,7 +460,7 @@
return IntStream.range(0, rowCount).boxed().collect(Collectors.toList());
}

private static Object filterValueColumnsByRowIndexList(

Check warning on line 463 in iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/parser/TabletInsertionEventParser.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

A "Brain Method" was detected. Refactor it to reduce at least one of the following metrics: LOC from 148 to 64, Complexity from 41 to 14, Nesting Level from 4 to 2, Number of Variables from 29 to 6.

See more on https://sonarcloud.io/project/issues?id=apache_iotdb&issues=AZ6ra3Qoze_0heoHOrVq&open=AZ6ra3Qoze_0heoHOrVq&pullRequest=17879
final TSDataType type,
final Object originValueColumn,
final List<Integer> rowIndexList,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,8 +106,8 @@
}

@Override
public void transferType(ZoneId zoneId) throws QueryProcessException {

Check failure on line 109 in iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/transform/statement/PipeConvertedInsertRowStatement.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Refactor this method to reduce its Cognitive Complexity from 16 to the 15 allowed.

See more on https://sonarcloud.io/project/issues?id=apache_iotdb&issues=AZ6ra3Q1ze_0heoHOrVs&open=AZ6ra3Q1ze_0heoHOrVs&pullRequest=17879
for (int i = 0; i < measurementSchemas.length; i++) {

Check warning on line 110 in iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/transform/statement/PipeConvertedInsertRowStatement.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Reduce the total number of break and continue statements in this loop to use at most one.

See more on https://sonarcloud.io/project/issues?id=apache_iotdb&issues=AZ6ra3Q1ze_0heoHOrVr&open=AZ6ra3Q1ze_0heoHOrVr&pullRequest=17879
// null when time series doesn't exist
if (measurementSchemas[i] == null) {
if (!IoTDBDescriptor.getInstance().getConfig().isEnablePartialInsert()) {
Expand All @@ -126,6 +126,9 @@

// parse string value to specific type
dataTypes[i] = measurementSchemas[i].getType();
if (values == null || i >= values.length || values[i] == null) {
continue;
}
try {
values[i] = ValueConverter.parse(values[i].toString(), dataTypes[i]);
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -548,6 +548,9 @@ private static long sizeOfBinary(final Binary binary) {

public static long sizeOfColumns(
final Object[] columns, final MeasurementSchema[] measurementSchemas) {
if (Objects.isNull(columns)) {
return 0L;
}
// Directly calculate if measurementSchemas are absent
if (Objects.isNull(measurementSchemas)) {
return RamUsageEstimator.shallowSizeOf(columns)
Expand All @@ -559,7 +562,10 @@ public static long sizeOfColumns(
RamUsageEstimator.alignObjectSize(
NUM_BYTES_ARRAY_HEADER + NUM_BYTES_OBJECT_REF * columns.length);
for (int i = 0; i < columns.length; i++) {
if (measurementSchemas[i] == null || measurementSchemas[i].getType() == null) {
if (columns[i] == null
|| i >= measurementSchemas.length
|| measurementSchemas[i] == null
|| measurementSchemas[i].getType() == null) {
continue;
}
switch (measurementSchemas[i].getType()) {
Expand Down Expand Up @@ -611,6 +617,9 @@ private static long getNumBytesUnknownObject(final Object obj) {

public static long sizeOfValues(
final Object[] values, final MeasurementSchema[] measurementSchemas) {
if (Objects.isNull(values)) {
return 0L;
}
// Directly calculate if measurementSchemas are absent
if (Objects.isNull(measurementSchemas)) {
return RamUsageEstimator.shallowSizeOf(values)
Expand All @@ -622,7 +631,9 @@ public static long sizeOfValues(
RamUsageEstimator.alignObjectSize(
NUM_BYTES_ARRAY_HEADER + NUM_BYTES_OBJECT_REF * values.length);
for (int i = 0; i < values.length; i++) {
if (measurementSchemas[i] == null || measurementSchemas[i].getType() == null) {
if (i >= measurementSchemas.length
|| measurementSchemas[i] == null
|| measurementSchemas[i].getType() == null) {
size += NUM_BYTES_OBJECT_HEADER;
continue;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -383,7 +383,7 @@
* @return Pair of values array and memory size
*/
@SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity warning
private static Pair<Object[], Long> readValuesFromBufferWithMemory(

Check warning on line 386 in iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/util/TabletStatementConverter.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

A "Brain Method" was detected. Refactor it to reduce at least one of the following metrics: LOC from 108 to 64, Complexity from 27 to 14, Nesting Level from 5 to 2, Number of Variables from 22 to 6.

See more on https://sonarcloud.io/project/issues?id=apache_iotdb&issues=AZ6ra3Pxze_0heoHOrVp&open=AZ6ra3Pxze_0heoHOrVp&pullRequest=17879
final ByteBuffer byteBuffer, final TSDataType[] types, final int columns, final int rowSize) {
final Object[] values = new Object[columns];

Expand All @@ -395,7 +395,7 @@
for (int i = 0; i < columns; i++) {
final boolean isValueColumnsNotNull =
BytesUtils.byteToBool(ReadWriteIOUtils.readByte(byteBuffer));
if (isValueColumnsNotNull && types[i] == null) {
if (types[i] == null) {
continue;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,7 @@ public MeasurementSchema[] getMeasurementSchemas() {

public void setMeasurementSchemas(MeasurementSchema[] measurementSchemas) {
this.measurementSchemas = measurementSchemas;
measurementColumnCnt = -1;
}

public String[] getMeasurements() {
Expand All @@ -189,13 +190,16 @@ public int measureColumnCnt() {
}

public boolean isValidMeasurement(int i) {
return measurementSchemas != null
return measurements != null
Comment thread
jt2594838 marked this conversation as resolved.
&& measurements[i] != null
&& measurementSchemas != null
&& measurementSchemas[i] != null
&& (columnCategories == null || columnCategories[i] == TsTableColumnCategory.FIELD);
}

public void setMeasurements(String[] measurements) {
this.measurements = measurements;
measurementColumnCnt = -1;
}

public TSDataType[] getDataTypes() {
Expand Down Expand Up @@ -327,8 +331,12 @@ public void markFailedMeasurement(int index) {
}

public boolean hasValidMeasurements() {
for (Object o : measurements) {
if (o != null) {
if (measurements == null) {
return false;
}
for (int i = 0; i < measurements.length; i++) {
if (measurements[i] != null
&& (columnCategories == null || columnCategories[i] == TsTableColumnCategory.FIELD)) {
return true;
}
}
Expand All @@ -354,15 +362,16 @@ protected int getValidMeasurementNumber() {
}

public boolean isMeasurementFailed(int index) {
return measurements[index] == null;
return measurements == null || measurements[index] == null;
}

protected boolean isWritableFieldMeasurement(int index) {
return !isMeasurementFailed(index)
&& (columnCategories == null || columnCategories[index] == TsTableColumnCategory.FIELD);
}

public boolean allMeasurementFailed() {
if (measurements != null) {
return failedMeasurementNumber
>= measurements.length - (tagColumnIndices == null ? 0 : tagColumnIndices.size());
}
return true;
return measurements == null || !hasValidMeasurements();
}

// endregion
Expand Down Expand Up @@ -418,6 +427,8 @@ public TsTableColumnCategory[] getColumnCategories() {

public void setColumnCategories(TsTableColumnCategory[] columnCategories) {
this.columnCategories = columnCategories;
measurementColumnCnt = -1;
tagColumnIndices = null;
if (columnCategories != null) {
tagColumnIndices = new ArrayList<>();
for (int i = 0; i < columnCategories.length; i++) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,7 @@ public void markFailedMeasurement(int index) {
measurements[index] = null;
dataTypes[index] = null;
values[index] = null;
measurementColumnCnt = -1;
}

@Override
Expand Down Expand Up @@ -268,7 +269,7 @@ void subSerialize(DataOutputStream stream) throws IOException {

/** Serialize measurements and values, ignoring failed time series. */
void serializeMeasurementsAndValues(ByteBuffer buffer) {
ReadWriteIOUtils.write(measurements.length - getFailedMeasurementNumber(), buffer);
ReadWriteIOUtils.write(getValidMeasurementNumber(), buffer);
serializeMeasurementsOrSchemas(buffer);
putDataTypesAndValues(buffer);
ReadWriteIOUtils.write((byte) (isNeedInferType ? 1 : 0), buffer);
Expand All @@ -282,7 +283,7 @@ void serializeMeasurementsAndValues(ByteBuffer buffer) {
* @throws IOException - If an I/O error occurs.
*/
void serializeMeasurementsAndValues(DataOutputStream stream) throws IOException {
ReadWriteIOUtils.write(measurements.length - getFailedMeasurementNumber(), stream);
ReadWriteIOUtils.write(getValidMeasurementNumber(), stream);
serializeMeasurementsOrSchemas(stream);
putDataTypesAndValues(stream);
ReadWriteIOUtils.write((byte) (isNeedInferType ? 1 : 0), stream);
Expand Down Expand Up @@ -637,7 +638,7 @@ protected void subSerialize(IWALByteBufferView buffer) {

/** Serialize measurements and values, ignoring failed time series. */
private void serializeMeasurementsAndValues(IWALByteBufferView buffer) {
buffer.putInt(measurements.length - getFailedMeasurementNumber());
buffer.putInt(getValidMeasurementNumber());
serializeMeasurementSchemasToWAL(buffer);
putDataTypesAndValues(buffer);
buffer.put((byte) (isAligned ? 1 : 0));
Expand Down Expand Up @@ -910,15 +911,26 @@ public <R, C> R accept(IPlanVisitor<R, C> visitor, C context) {
}

public TimeValuePair composeTimeValuePair(int columnIndex) {
if (columnIndex >= values.length
|| Objects.isNull(dataTypes[columnIndex])
|| dataTypes[columnIndex] == TSDataType.OBJECT) {
if (!canComposeTimeValuePair(columnIndex)) {
return null;
}
Object value = values[columnIndex];
return Objects.nonNull(value)
? new TimeValuePair(time, TsPrimitiveType.getByType(dataTypes[columnIndex], value))
: null;
return new TimeValuePair(time, TsPrimitiveType.getByType(dataTypes[columnIndex], value));
}

private boolean canComposeTimeValuePair(final int columnIndex) {
return measurements != null
&& columnIndex >= 0
&& columnIndex < measurements.length
&& values != null
&& columnIndex < values.length
&& values[columnIndex] != null
&& dataTypes != null
&& columnIndex < dataTypes.length
&& dataTypes[columnIndex] != null
&& dataTypes[columnIndex] != TSDataType.OBJECT
&& (columnCategories == null || columnIndex < columnCategories.length)
&& isWritableFieldMeasurement(columnIndex);
}

public void updateLastCache(String databaseName) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,9 @@ private void storeMeasurementsAndDataType() {
String[] measurements = insertRowNode.getMeasurements();
TSDataType[] dataTypes = insertRowNode.getDataTypes();
for (int i = 0; i < measurements.length; i++) {
if (measurements[i] == null) {
continue;
}
if (!measurementSet.contains(measurements[i])) {
measurementList.add(measurements[i]);
dataTypeList.add(dataTypes[i]);
Expand Down
Loading
Loading