diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/receiver/protocol/IoTDBConfigNodeReceiver.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/receiver/protocol/IoTDBConfigNodeReceiver.java index 7de8e8bf78d9d..5f414f2cd69fd 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/receiver/protocol/IoTDBConfigNodeReceiver.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/receiver/protocol/IoTDBConfigNodeReceiver.java @@ -1277,7 +1277,7 @@ protected TSStatus loadFileV2( PipeConfigRegionSnapshotEvent.getConfigPhysicalPlanTypeSet( parameters.get(ColumnHeaderConstant.TYPE)); final boolean isTreeModelDataAllowedToBeCaptured = - parameters.containsKey(PipeTransferFileSealReqV2.TREE); + PipeTransferFileSealReqV2.isTreeModelDataAllowedToBeCaptured(parameters); final TreePattern treePattern = TreePattern.parsePatternFromString( parameters.get(ColumnHeaderConstant.PATH_PATTERN), @@ -1285,7 +1285,7 @@ protected TSStatus loadFileV2( p -> new IoTDBTreePattern(isTreeModelDataAllowedToBeCaptured, p)); final TablePattern tablePattern = new TablePattern( - parameters.containsKey(PipeTransferFileSealReqV2.TABLE), + PipeTransferFileSealReqV2.isTableModelDataAllowedToBeCaptured(parameters), parameters.get(PipeTransferFileSealReqV2.DATABASE_PATTERN), parameters.get(ColumnHeaderConstant.TABLE_NAME)); final List results = new ArrayList<>(); diff --git a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/pipe/sink/PipeConfigNodeThriftRequestTest.java b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/pipe/sink/PipeConfigNodeThriftRequestTest.java index 5c578ebead8c5..dd948a451cf90 100644 --- a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/pipe/sink/PipeConfigNodeThriftRequestTest.java +++ b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/pipe/sink/PipeConfigNodeThriftRequestTest.java @@ -19,17 +19,33 @@ package org.apache.iotdb.confignode.manager.pipe.sink; +import org.apache.iotdb.commons.pipe.sink.payload.thrift.common.PipeTransferHandshakeConstant; +import org.apache.iotdb.commons.pipe.sink.payload.thrift.request.IoTDBSinkRequestVersion; +import org.apache.iotdb.commons.pipe.sink.payload.thrift.request.PipeRequestType; +import org.apache.iotdb.commons.pipe.sink.payload.thrift.request.PipeTransferFileSealReqV2; +import org.apache.iotdb.commons.schema.column.ColumnHeaderConstant; import org.apache.iotdb.confignode.consensus.request.write.cq.ActiveCQPlan; import org.apache.iotdb.confignode.manager.pipe.sink.payload.PipeTransferConfigNodeHandshakeV1Req; +import org.apache.iotdb.confignode.manager.pipe.sink.payload.PipeTransferConfigNodeHandshakeV2Req; import org.apache.iotdb.confignode.manager.pipe.sink.payload.PipeTransferConfigPlanReq; import org.apache.iotdb.confignode.manager.pipe.sink.payload.PipeTransferConfigSnapshotPieceReq; import org.apache.iotdb.confignode.manager.pipe.sink.payload.PipeTransferConfigSnapshotSealReq; import org.apache.iotdb.confignode.persistence.schema.CNSnapshotFileType; +import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq; +import org.apache.tsfile.utils.Binary; +import org.apache.tsfile.utils.PublicBAOS; +import org.apache.tsfile.utils.ReadWriteIOUtils; import org.junit.Assert; import org.junit.Test; +import java.io.DataOutputStream; import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; public class PipeConfigNodeThriftRequestTest { @@ -48,6 +64,59 @@ public void testPipeTransferConfigHandshakeReq() throws IOException { Assert.assertEquals(req.getTimestampPrecision(), deserializeReq.getTimestampPrecision()); } + @Test + public void testPipeTransferConfigHandshakeReqFromLegacyV13Body() throws IOException { + final TPipeTransferReq req = + legacyTransferReq( + PipeRequestType.HANDSHAKE_CONFIGNODE_V1, + serializeLegacyHandshakeV1Body(TIME_PRECISION)); + + final PipeTransferConfigNodeHandshakeV1Req deserializeReq = + PipeTransferConfigNodeHandshakeV1Req.fromTPipeTransferReq(req); + + Assert.assertEquals(req.getVersion(), deserializeReq.getVersion()); + Assert.assertEquals(req.getType(), deserializeReq.getType()); + Assert.assertEquals(TIME_PRECISION, deserializeReq.getTimestampPrecision()); + } + + @Test + public void testPipeTransferConfigHandshakeV2Req() throws IOException { + final Map params = new HashMap<>(); + params.put(PipeTransferHandshakeConstant.HANDSHAKE_KEY_TIME_PRECISION, TIME_PRECISION); + params.put(PipeTransferHandshakeConstant.HANDSHAKE_KEY_CLUSTER_ID, "cluster"); + params.put(PipeTransferHandshakeConstant.HANDSHAKE_KEY_USERNAME, "root"); + params.put(PipeTransferHandshakeConstant.HANDSHAKE_KEY_PASSWORD, "root"); + + final PipeTransferConfigNodeHandshakeV2Req req = + PipeTransferConfigNodeHandshakeV2Req.toTPipeTransferReq(params); + final PipeTransferConfigNodeHandshakeV2Req deserializeReq = + PipeTransferConfigNodeHandshakeV2Req.fromTPipeTransferReq(req); + + Assert.assertEquals(req.getVersion(), deserializeReq.getVersion()); + Assert.assertEquals(req.getType(), deserializeReq.getType()); + Assert.assertEquals(params, deserializeReq.getParams()); + } + + @Test + public void testPipeTransferConfigHandshakeV2ReqFromLegacyV13Body() throws IOException { + final Map params = new HashMap<>(); + params.put(PipeTransferHandshakeConstant.HANDSHAKE_KEY_TIME_PRECISION, TIME_PRECISION); + params.put(PipeTransferHandshakeConstant.HANDSHAKE_KEY_CLUSTER_ID, "cluster"); + params.put(PipeTransferHandshakeConstant.HANDSHAKE_KEY_USERNAME, "root"); + params.put(PipeTransferHandshakeConstant.HANDSHAKE_KEY_PASSWORD, "root"); + + final TPipeTransferReq req = + legacyTransferReq( + PipeRequestType.HANDSHAKE_CONFIGNODE_V2, serializeLegacyHandshakeV2Body(params)); + + final PipeTransferConfigNodeHandshakeV2Req deserializeReq = + PipeTransferConfigNodeHandshakeV2Req.fromTPipeTransferReq(req); + + Assert.assertEquals(req.getVersion(), deserializeReq.getVersion()); + Assert.assertEquals(req.getType(), deserializeReq.getType()); + Assert.assertEquals(params, deserializeReq.getParams()); + } + @Test public void testPipeTransferConfigPlanReq() { PipeTransferConfigPlanReq req = @@ -56,6 +125,22 @@ public void testPipeTransferConfigPlanReq() { Assert.assertEquals(req.getVersion(), deserializeReq.getVersion()); Assert.assertEquals(req.getType(), deserializeReq.getType()); + Assert.assertArrayEquals(req.getBody(), deserializeReq.getBody()); + } + + @Test + public void testPipeTransferConfigPlanReqFromLegacyV13Body() { + final ActiveCQPlan plan = new ActiveCQPlan("cqId", "md5"); + final ByteBuffer legacyBody = plan.serializeToByteBuffer(); + final TPipeTransferReq req = + legacyTransferReq(PipeRequestType.TRANSFER_CONFIG_PLAN, legacyBody); + + final PipeTransferConfigPlanReq deserializeReq = + PipeTransferConfigPlanReq.fromTPipeTransferReq(req); + + Assert.assertEquals(req.getVersion(), deserializeReq.getVersion()); + Assert.assertEquals(req.getType(), deserializeReq.getType()); + Assert.assertArrayEquals(byteBufferToByteArray(legacyBody), deserializeReq.getBody()); } @Test @@ -76,6 +161,24 @@ public void testPipeTransferConfigSnapshotPieceReq() throws IOException { Assert.assertArrayEquals(req.getFilePiece(), deserializeReq.getFilePiece()); } + @Test + public void testPipeTransferConfigSnapshotPieceReqFromLegacyV13Body() throws IOException { + final byte[] body = "legacyConfigSnapshotPiece".getBytes(); + final TPipeTransferReq req = + legacyTransferReq( + PipeRequestType.TRANSFER_CONFIG_SNAPSHOT_PIECE, + serializeLegacyFilePieceBody("cluster_schema.bin", 10L, body)); + + final PipeTransferConfigSnapshotPieceReq deserializeReq = + PipeTransferConfigSnapshotPieceReq.fromTPipeTransferReq(req); + + Assert.assertEquals(req.getVersion(), deserializeReq.getVersion()); + Assert.assertEquals(req.getType(), deserializeReq.getType()); + Assert.assertEquals("cluster_schema.bin", deserializeReq.getFileName()); + Assert.assertEquals(10L, deserializeReq.getStartWritingOffset()); + Assert.assertArrayEquals(body, deserializeReq.getFilePiece()); + } + @Test public void testPipeTransferConfigSnapshotSealReq() throws IOException { String snapshotName = "cluster_schema.bin"; @@ -108,4 +211,113 @@ public void testPipeTransferConfigSnapshotSealReq() throws IOException { Assert.assertEquals(req.getFileLengths(), deserializeReq.getFileLengths()); Assert.assertEquals(req.getParameters(), deserializeReq.getParameters()); } + + @Test + public void testPipeTransferConfigSnapshotSealReqFromLegacyV13Body() throws IOException { + final String snapshotName = "cluster_schema.bin"; + final String templateInfoName = "template_info.bin"; + final Map parameters = new HashMap<>(); + parameters.put(ColumnHeaderConstant.PATH_PATTERN, "root.**"); + parameters.put( + PipeTransferConfigSnapshotSealReq.FILE_TYPE, + Byte.toString(CNSnapshotFileType.SCHEMA.getType())); + parameters.put(ColumnHeaderConstant.TYPE, "200"); + + final TPipeTransferReq req = + legacyTransferReq( + PipeRequestType.TRANSFER_CONFIG_SNAPSHOT_SEAL, + serializeLegacyFileSealV2Body( + Arrays.asList(snapshotName, templateInfoName), + Arrays.asList(100L, 10L), + parameters)); + final PipeTransferConfigSnapshotSealReq deserializeReq = + PipeTransferConfigSnapshotSealReq.fromTPipeTransferReq(req); + + Assert.assertEquals(req.getVersion(), deserializeReq.getVersion()); + Assert.assertEquals(req.getType(), deserializeReq.getType()); + Assert.assertEquals( + Arrays.asList(snapshotName, templateInfoName), deserializeReq.getFileNames()); + Assert.assertEquals(Arrays.asList(100L, 10L), deserializeReq.getFileLengths()); + Assert.assertEquals(parameters, deserializeReq.getParameters()); + Assert.assertTrue( + PipeTransferFileSealReqV2.isTreeModelDataAllowedToBeCaptured( + deserializeReq.getParameters())); + Assert.assertFalse( + PipeTransferFileSealReqV2.isTableModelDataAllowedToBeCaptured( + deserializeReq.getParameters())); + } + + private static TPipeTransferReq legacyTransferReq( + final PipeRequestType requestType, final ByteBuffer body) { + final TPipeTransferReq req = new TPipeTransferReq(); + req.version = IoTDBSinkRequestVersion.VERSION_1.getVersion(); + req.type = requestType.getType(); + req.body = body; + return req; + } + + private static byte[] byteBufferToByteArray(final ByteBuffer byteBuffer) { + final ByteBuffer duplicatedBuffer = byteBuffer.duplicate(); + final byte[] bytes = new byte[duplicatedBuffer.remaining()]; + duplicatedBuffer.get(bytes); + return bytes; + } + + private static ByteBuffer serializeLegacyFileSealV2Body( + final List fileNames, + final List fileLengths, + final Map parameters) + throws IOException { + try (final PublicBAOS byteArrayOutputStream = new PublicBAOS(); + final DataOutputStream outputStream = new DataOutputStream(byteArrayOutputStream)) { + ReadWriteIOUtils.write(fileNames.size(), outputStream); + for (final String fileName : fileNames) { + ReadWriteIOUtils.write(fileName, outputStream); + } + ReadWriteIOUtils.write(fileLengths.size(), outputStream); + for (final Long fileLength : fileLengths) { + ReadWriteIOUtils.write(fileLength, outputStream); + } + ReadWriteIOUtils.write(parameters.size(), outputStream); + for (final Map.Entry entry : parameters.entrySet()) { + ReadWriteIOUtils.write(entry.getKey(), outputStream); + ReadWriteIOUtils.write(entry.getValue(), outputStream); + } + return ByteBuffer.wrap(byteArrayOutputStream.getBuf(), 0, byteArrayOutputStream.size()); + } + } + + private static ByteBuffer serializeLegacyHandshakeV1Body(final String timestampPrecision) + throws IOException { + try (final PublicBAOS byteArrayOutputStream = new PublicBAOS(); + final DataOutputStream outputStream = new DataOutputStream(byteArrayOutputStream)) { + ReadWriteIOUtils.write(timestampPrecision, outputStream); + return ByteBuffer.wrap(byteArrayOutputStream.getBuf(), 0, byteArrayOutputStream.size()); + } + } + + private static ByteBuffer serializeLegacyHandshakeV2Body(final Map params) + throws IOException { + try (final PublicBAOS byteArrayOutputStream = new PublicBAOS(); + final DataOutputStream outputStream = new DataOutputStream(byteArrayOutputStream)) { + ReadWriteIOUtils.write(params.size(), outputStream); + for (final Map.Entry entry : params.entrySet()) { + ReadWriteIOUtils.write(entry.getKey(), outputStream); + ReadWriteIOUtils.write(entry.getValue(), outputStream); + } + return ByteBuffer.wrap(byteArrayOutputStream.getBuf(), 0, byteArrayOutputStream.size()); + } + } + + private static ByteBuffer serializeLegacyFilePieceBody( + final String fileName, final long startWritingOffset, final byte[] filePiece) + throws IOException { + try (final PublicBAOS byteArrayOutputStream = new PublicBAOS(); + final DataOutputStream outputStream = new DataOutputStream(byteArrayOutputStream)) { + ReadWriteIOUtils.write(fileName, outputStream); + ReadWriteIOUtils.write(startWritingOffset, outputStream); + ReadWriteIOUtils.write(new Binary(filePiece), outputStream); + return ByteBuffer.wrap(byteArrayOutputStream.getBuf(), 0, byteArrayOutputStream.size()); + } + } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java index 55f46be8f99d5..60ea19246f0f6 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java @@ -614,7 +614,7 @@ private TSStatus loadSchemaSnapShot( PipeSchemaRegionSnapshotEvent.getStatementTypeSet( parameters.get(ColumnHeaderConstant.TYPE)); final boolean isTreeModelDataAllowedToBeCaptured = - parameters.containsKey(PipeTransferFileSealReqV2.TREE); + PipeTransferFileSealReqV2.isTreeModelDataAllowedToBeCaptured(parameters); final TreePattern treePattern = TreePattern.parsePatternFromString( parameters.get(ColumnHeaderConstant.PATH_PATTERN), @@ -622,7 +622,7 @@ private TSStatus loadSchemaSnapShot( p -> new IoTDBTreePattern(isTreeModelDataAllowedToBeCaptured, p)); final TablePattern tablePattern = new TablePattern( - parameters.containsKey(PipeTransferFileSealReqV2.TABLE), + PipeTransferFileSealReqV2.isTableModelDataAllowedToBeCaptured(parameters), parameters.get(PipeTransferFileSealReqV2.DATABASE_PATTERN), parameters.get(ColumnHeaderConstant.TABLE_NAME)); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/request/PipeTransferTabletRawReq.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/request/PipeTransferTabletRawReq.java index 98ea83b6d7631..63db24a37ba78 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/request/PipeTransferTabletRawReq.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/request/PipeTransferTabletRawReq.java @@ -30,6 +30,7 @@ import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertTabletStatement; import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq; +import org.apache.tsfile.enums.TSDataType; import org.apache.tsfile.utils.PublicBAOS; import org.apache.tsfile.utils.ReadWriteIOUtils; import org.apache.tsfile.write.record.Tablet; @@ -118,19 +119,7 @@ public static PipeTransferTabletRawReq toTPipeTransferRawReq( final ByteBuffer buffer, final TabletStringInternPool tabletStringInternPool) { final PipeTransferTabletRawReq tabletReq = new PipeTransferTabletRawReq(); - final int startPosition = buffer.position(); - try { - final InsertTabletStatement insertTabletStatement = - TabletStatementConverter.deserializeStatementFromTabletFormat( - buffer, false, tabletStringInternPool); - tabletReq.isAligned = insertTabletStatement.isAligned(); - tabletReq.statement = insertTabletStatement; - } catch (final Exception e) { - buffer.position(startPosition); - tabletReq.tablet = - PipeTabletUtils.internTablet(Tablet.deserialize(buffer), tabletStringInternPool); - tabletReq.isAligned = ReadWriteIOUtils.readBool(buffer); - } + tabletReq.deserializeTPipeTransferRawReq(buffer, tabletStringInternPool); return tabletReq; } @@ -167,6 +156,80 @@ public static PipeTransferTabletRawReq fromTPipeTransferReq(final TPipeTransferR return tabletReq; } + private void deserializeTPipeTransferRawReq( + final ByteBuffer buffer, final TabletStringInternPool tabletStringInternPool) { + final int startPosition = buffer.position(); + try { + // Current V1 raw tablet requests can be converted to InsertTabletStatement directly. Keep + // this as the first attempt to avoid the overhead of constructing an intermediate Tablet. + final InsertTabletStatement insertTabletStatement = + TabletStatementConverter.deserializeStatementFromTabletFormat( + buffer, false, tabletStringInternPool); + // Legacy tablets do not serialize column categories. Since hasSchema=1 can be + // misread as FIELD, the current reader may return a corrupt statement instead of failing. + ensureStatementDeserializedFromCurrentTabletFormat(insertTabletStatement); + isAligned = insertTabletStatement.isAligned(); + statement = insertTabletStatement; + return; + } catch (final Exception e) { + buffer.position(startPosition); + } + + try { + // Some old senders serialize Tablet without column categories. Retry with the legacy reader + // before falling back to the full Tablet deserialization path. + final InsertTabletStatement insertTabletStatement = + TabletStatementConverter.deserializeLegacyStatementFromTabletFormat( + buffer, tabletStringInternPool); + isAligned = insertTabletStatement.isAligned(); + statement = insertTabletStatement; + return; + } catch (final Exception e) { + buffer.position(startPosition); + } + + tablet = PipeTabletUtils.internTablet(Tablet.deserialize(buffer), tabletStringInternPool); + isAligned = ReadWriteIOUtils.readBool(buffer); + } + + private static void ensureStatementDeserializedFromCurrentTabletFormat( + final InsertTabletStatement statement) { + final String[] measurements = statement.getMeasurements(); + final TSDataType[] dataTypes = statement.getDataTypes(); + + if (Objects.isNull(measurements) + || Objects.isNull(dataTypes) + || measurements.length != dataTypes.length) { + throw new IllegalArgumentException( + "Incomplete schema in current tablet format deserialization."); + } + + final Object[] columns = statement.getColumns(); + if (Objects.nonNull(columns) && columns.length != measurements.length) { + throw new IllegalArgumentException( + "Column count is inconsistent with schema count in current tablet format deserialization."); + } + + for (int i = 0; i < measurements.length; ++i) { + if (Objects.isNull(measurements[i]) || Objects.isNull(dataTypes[i])) { + throw new IllegalArgumentException( + "Incomplete measurement schema in current tablet format deserialization."); + } + if (statement.getRowCount() > 0 && (Objects.isNull(columns) || Objects.isNull(columns[i]))) { + throw new IllegalArgumentException( + "Incomplete column values in current tablet format deserialization."); + } + } + + final long[] times = statement.getTimes(); + if (statement.getRowCount() > 0 + && measurements.length > 0 + && (Objects.isNull(times) || times.length < statement.getRowCount())) { + throw new IllegalArgumentException( + "Incomplete timestamps in current tablet format deserialization."); + } + } + /////////////////////////////// Air Gap /////////////////////////////// /** diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/util/TabletStatementConverter.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/util/TabletStatementConverter.java index 773d40e99d1f7..6656110f2c0bc 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/util/TabletStatementConverter.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/util/TabletStatementConverter.java @@ -86,6 +86,27 @@ public static InsertTabletStatement deserializeStatementFromTabletFormat( final boolean readDatabaseName, final TabletStringInternPool tabletStringInternPool) throws IllegalPathException { + return deserializeStatementFromTabletFormat( + byteBuffer, readDatabaseName, tabletStringInternPool, true); + } + + public static InsertTabletStatement deserializeLegacyStatementFromTabletFormat( + final ByteBuffer byteBuffer) throws IllegalPathException { + return deserializeLegacyStatementFromTabletFormat(byteBuffer, null); + } + + public static InsertTabletStatement deserializeLegacyStatementFromTabletFormat( + final ByteBuffer byteBuffer, final TabletStringInternPool tabletStringInternPool) + throws IllegalPathException { + return deserializeStatementFromTabletFormat(byteBuffer, false, tabletStringInternPool, false); + } + + private static InsertTabletStatement deserializeStatementFromTabletFormat( + final ByteBuffer byteBuffer, + final boolean readDatabaseName, + final TabletStringInternPool tabletStringInternPool, + final boolean readColumnCategory) + throws IllegalPathException { final InsertTabletStatement statement = new InsertTabletStatement(); // Calculate memory size during deserialization, use INSTANCE_SIZE constant @@ -132,9 +153,11 @@ public static InsertTabletStatement deserializeStatementFromTabletFormat( final Pair pair = readMeasurement(byteBuffer, tabletStringInternPool); measurement[i] = pair.getLeft(); dataTypes[i] = pair.getRight(); - columnCategories[i] = - TsTableColumnCategory.fromTsFileColumnCategory( - ColumnCategory.values()[byteBuffer.get()]); + if (readColumnCategory) { + columnCategories[i] = + TsTableColumnCategory.fromTsFileColumnCategory( + ColumnCategory.values()[byteBuffer.get()]); + } // Calculate memory for each measurement string if (measurement[i] != null) { diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/PipeDataNodeThriftRequestTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/PipeDataNodeThriftRequestTest.java index 10573e5609dba..88f7353f51925 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/PipeDataNodeThriftRequestTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/PipeDataNodeThriftRequestTest.java @@ -20,12 +20,18 @@ package org.apache.iotdb.db.pipe.sink; import org.apache.iotdb.commons.path.PartialPath; +import org.apache.iotdb.commons.pipe.sink.payload.thrift.common.PipeTransferHandshakeConstant; +import org.apache.iotdb.commons.pipe.sink.payload.thrift.request.IoTDBSinkRequestVersion; +import org.apache.iotdb.commons.pipe.sink.payload.thrift.request.PipeRequestType; +import org.apache.iotdb.commons.pipe.sink.payload.thrift.request.PipeTransferFileSealReqV2; import org.apache.iotdb.commons.pipe.sink.payload.thrift.response.PipeTransferFilePieceResp; import org.apache.iotdb.commons.queryengine.plan.planner.plan.node.PlanNodeId; import org.apache.iotdb.commons.schema.SchemaConstant; +import org.apache.iotdb.commons.schema.column.ColumnHeaderConstant; import org.apache.iotdb.db.pipe.processor.twostage.exchange.payload.CombineRequest; import org.apache.iotdb.db.pipe.processor.twostage.state.CountState; import org.apache.iotdb.db.pipe.sink.payload.evolvable.request.PipeTransferDataNodeHandshakeV1Req; +import org.apache.iotdb.db.pipe.sink.payload.evolvable.request.PipeTransferDataNodeHandshakeV2Req; import org.apache.iotdb.db.pipe.sink.payload.evolvable.request.PipeTransferPlanNodeReq; import org.apache.iotdb.db.pipe.sink.payload.evolvable.request.PipeTransferSchemaSnapshotPieceReq; import org.apache.iotdb.db.pipe.sink.payload.evolvable.request.PipeTransferSchemaSnapshotSealReq; @@ -40,6 +46,7 @@ import org.apache.iotdb.db.pipe.sink.payload.evolvable.request.PipeTransferTsFilePieceReq; import org.apache.iotdb.db.pipe.sink.payload.evolvable.request.PipeTransferTsFilePieceWithModReq; import org.apache.iotdb.db.pipe.sink.payload.evolvable.request.PipeTransferTsFileSealReq; +import org.apache.iotdb.db.pipe.sink.payload.evolvable.request.PipeTransferTsFileSealWithModReq; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.metadata.write.CreateAlignedTimeSeriesNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowNode; import org.apache.iotdb.db.queryengine.plan.statement.Statement; @@ -69,9 +76,12 @@ import java.nio.ByteBuffer; import java.time.LocalDate; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collections; +import java.util.HashMap; import java.util.HashSet; import java.util.List; +import java.util.Map; import java.util.Set; public class PipeDataNodeThriftRequestTest { @@ -146,6 +156,61 @@ public void testPipeTransferDataNodeHandshakeReq() throws IOException { Assert.assertEquals(req.getTimestampPrecision(), deserializeReq.getTimestampPrecision()); } + @Test + public void testPipeTransferDataNodeHandshakeReqFromLegacyV13Body() throws IOException { + final TPipeTransferReq req = + legacyTransferReq( + PipeRequestType.HANDSHAKE_DATANODE_V1, serializeLegacyHandshakeV1Body(TIME_PRECISION)); + + final PipeTransferDataNodeHandshakeV1Req deserializeReq = + PipeTransferDataNodeHandshakeV1Req.fromTPipeTransferReq(req); + + Assert.assertEquals(req.getVersion(), deserializeReq.getVersion()); + Assert.assertEquals(req.getType(), deserializeReq.getType()); + Assert.assertEquals(TIME_PRECISION, deserializeReq.getTimestampPrecision()); + } + + @Test + public void testPipeTransferDataNodeHandshakeV2Req() throws IOException { + final Map params = new HashMap<>(); + params.put(PipeTransferHandshakeConstant.HANDSHAKE_KEY_TIME_PRECISION, TIME_PRECISION); + params.put(PipeTransferHandshakeConstant.HANDSHAKE_KEY_CLUSTER_ID, "cluster"); + params.put(PipeTransferHandshakeConstant.HANDSHAKE_KEY_USERNAME, "root"); + params.put(PipeTransferHandshakeConstant.HANDSHAKE_KEY_PASSWORD, "root"); + params.put(PipeTransferHandshakeConstant.HANDSHAKE_KEY_LOAD_TSFILE_STRATEGY, "async"); + params.put( + PipeTransferHandshakeConstant.HANDSHAKE_KEY_VALIDATE_TSFILE, Boolean.TRUE.toString()); + + final PipeTransferDataNodeHandshakeV2Req req = + PipeTransferDataNodeHandshakeV2Req.toTPipeTransferReq(params); + final PipeTransferDataNodeHandshakeV2Req deserializeReq = + PipeTransferDataNodeHandshakeV2Req.fromTPipeTransferReq(req); + + Assert.assertEquals(req.getVersion(), deserializeReq.getVersion()); + Assert.assertEquals(req.getType(), deserializeReq.getType()); + Assert.assertEquals(params, deserializeReq.getParams()); + } + + @Test + public void testPipeTransferDataNodeHandshakeV2ReqFromLegacyV13Body() throws IOException { + final Map params = new HashMap<>(); + params.put(PipeTransferHandshakeConstant.HANDSHAKE_KEY_TIME_PRECISION, TIME_PRECISION); + params.put(PipeTransferHandshakeConstant.HANDSHAKE_KEY_CLUSTER_ID, "cluster"); + params.put(PipeTransferHandshakeConstant.HANDSHAKE_KEY_USERNAME, "root"); + params.put(PipeTransferHandshakeConstant.HANDSHAKE_KEY_PASSWORD, "root"); + + final TPipeTransferReq req = + legacyTransferReq( + PipeRequestType.HANDSHAKE_DATANODE_V2, serializeLegacyHandshakeV2Body(params)); + + final PipeTransferDataNodeHandshakeV2Req deserializeReq = + PipeTransferDataNodeHandshakeV2Req.fromTPipeTransferReq(req); + + Assert.assertEquals(req.getVersion(), deserializeReq.getVersion()); + Assert.assertEquals(req.getType(), deserializeReq.getType()); + Assert.assertEquals(params, deserializeReq.getParams()); + } + @Test public void testPipeTransferInsertNodeReq() { final PipeTransferTabletInsertNodeReq req = @@ -173,6 +238,34 @@ public void testPipeTransferInsertNodeReq() { Assert.assertEquals(statement.getPaths(), paths); } + @Test + public void testPipeTransferInsertNodeReqFromLegacyV13Body() { + final InsertRowNode node = + new InsertRowNode( + new PlanNodeId(""), + new PartialPath(new String[] {"root", "sg", "d"}), + false, + new String[] {"s"}, + new TSDataType[] {TSDataType.INT32}, + 1, + new Object[] {1}, + false); + final TPipeTransferReq req = + legacyTransferReq( + PipeRequestType.TRANSFER_TABLET_INSERT_NODE, node.serializeToByteBuffer()); + + final PipeTransferTabletInsertNodeReq deserializeReq = + PipeTransferTabletInsertNodeReq.fromTPipeTransferReq(req); + + Assert.assertEquals(req.getVersion(), deserializeReq.getVersion()); + Assert.assertEquals(req.getType(), deserializeReq.getType()); + Assert.assertEquals(node, deserializeReq.getInsertNode()); + + final List paths = new ArrayList<>(); + paths.add(new PartialPath(new String[] {"root", "sg", "d", "s"})); + Assert.assertEquals(paths, deserializeReq.constructStatement().getPaths()); + } + @Test public void testPipeTransferInsertNodeReqV2() { final PipeTransferTabletInsertNodeReqV2 req = @@ -244,6 +337,23 @@ public void testPipeTransferTabletBinaryReq() { Assert.assertEquals(req.getVersion(), deserializeReq.getVersion()); Assert.assertEquals(req.getType(), deserializeReq.getType()); + Assert.assertArrayEquals( + new byte[] {'a', 'b'}, byteBufferToByteArray(deserializeReq.getByteBuffer())); + } + + @Test + public void testPipeTransferTabletBinaryReqFromLegacyV13Body() { + // Not do real test here since "serializeToWal" needs private inner class of walBuffer + final TPipeTransferReq req = + legacyTransferReq( + PipeRequestType.TRANSFER_TABLET_BINARY, ByteBuffer.wrap(new byte[] {'a', 'b'})); + final PipeTransferTabletBinaryReq deserializeReq = + PipeTransferTabletBinaryReq.fromTPipeTransferReq(req); + + Assert.assertEquals(req.getVersion(), deserializeReq.getVersion()); + Assert.assertEquals(req.getType(), deserializeReq.getType()); + Assert.assertArrayEquals( + new byte[] {'a', 'b'}, byteBufferToByteArray(deserializeReq.getByteBuffer())); } @Test @@ -285,6 +395,30 @@ public void testPipeTransferPlanNodeReq() { Assert.assertEquals(req.getPlanNode(), deserializeReq.getPlanNode()); } + @Test + public void testPipeTransferPlanNodeReqFromLegacyV13SchemaPlanBody() { + final CreateAlignedTimeSeriesNode node = + new CreateAlignedTimeSeriesNode( + new PlanNodeId(""), + new PartialPath(new String[] {"root", "sg", "d"}), + Collections.singletonList("s"), + Collections.singletonList(TSDataType.INT32), + Collections.singletonList(TSEncoding.PLAIN), + Collections.singletonList(CompressionType.UNCOMPRESSED), + null, + null, + null); + final TPipeTransferReq req = + legacyTransferReq(PipeRequestType.TRANSFER_PLAN_NODE, node.serializeToByteBuffer()); + + final PipeTransferPlanNodeReq deserializeReq = + PipeTransferPlanNodeReq.fromTPipeTransferReq(req); + + Assert.assertEquals(req.getVersion(), deserializeReq.getVersion()); + Assert.assertEquals(req.getType(), deserializeReq.getType()); + Assert.assertEquals(node, deserializeReq.getPlanNode()); + } + @Test public void testPipeTransferTabletReq() { try { @@ -518,6 +652,72 @@ public void testPipeTransferTabletBatchReqInternsRepeatedMeasurementNames() thro insertTabletStatements.get(1).getMeasurements()[0]); } + @Test + public void testPipeTransferTabletBatchReqWithLegacyTabletFormat() throws IOException { + final List tabletBuffers = new ArrayList<>(); + tabletBuffers.add(serializeLegacyTabletRawBuffer(false)); + tabletBuffers.add(serializeLegacyTabletRawBuffer(true)); + + final PipeTransferTabletBatchReq req = + PipeTransferTabletBatchReq.toTPipeTransferReq(Collections.emptyList(), tabletBuffers); + + final PipeTransferTabletBatchReq deserializedReq = + PipeTransferTabletBatchReq.fromTPipeTransferReq(req); + + Assert.assertEquals(2, deserializedReq.getTabletReqs().size()); + Assert.assertFalse(deserializedReq.getTabletReqs().get(0).getIsAligned()); + Assert.assertTrue(deserializedReq.getTabletReqs().get(1).getIsAligned()); + + assertLegacyTabletStatement(deserializedReq.getTabletReqs().get(0).constructStatement()); + assertLegacyTabletStatement(deserializedReq.getTabletReqs().get(1).constructStatement()); + } + + @Test + public void testPipeTransferTabletBatchReqFromLegacyV13Body() throws IOException { + final InsertRowNode node = + new InsertRowNode( + new PlanNodeId(""), + new PartialPath(new String[] {"root", "sg", "d"}), + false, + new String[] {"s"}, + new TSDataType[] {TSDataType.INT32}, + 1, + new Object[] {1}, + false); + final TPipeTransferReq req = + legacyTransferReq( + PipeRequestType.TRANSFER_TABLET_BATCH, + serializeLegacyTabletBatchBody( + Collections.singletonList(node.serializeToByteBuffer()), + Collections.singletonList(serializeLegacyTabletRawBuffer(false)))); + + final PipeTransferTabletBatchReq deserializedReq = + PipeTransferTabletBatchReq.fromTPipeTransferReq(req); + + Assert.assertEquals(req.getVersion(), deserializedReq.getVersion()); + Assert.assertEquals(req.getType(), deserializedReq.getType()); + Assert.assertEquals(1, deserializedReq.getInsertNodeReqs().size()); + Assert.assertEquals(1, deserializedReq.getTabletReqs().size()); + Assert.assertEquals(node, deserializedReq.getInsertNodeReqs().get(0).getInsertNode()); + assertLegacyTabletStatement(deserializedReq.getTabletReqs().get(0).constructStatement()); + } + + @Test + public void testPipeTransferTabletRawReqWithLegacyTabletFormat() throws IOException { + final TPipeTransferReq req = new TPipeTransferReq(); + req.version = IoTDBSinkRequestVersion.VERSION_1.getVersion(); + req.type = PipeRequestType.TRANSFER_TABLET_RAW.getType(); + req.body = serializeLegacyTabletRawBuffer(true); + + final PipeTransferTabletRawReq deserializedReq = + PipeTransferTabletRawReq.fromTPipeTransferReq(req); + + Assert.assertEquals(req.getVersion(), deserializedReq.getVersion()); + Assert.assertEquals(req.getType(), deserializedReq.getType()); + Assert.assertTrue(deserializedReq.getIsAligned()); + assertLegacyTabletStatement(deserializedReq.constructStatement()); + } + @Test public void testPipeTransferTabletBatchReqV2() throws IOException { final List insertNodeBuffers = new ArrayList<>(); @@ -733,6 +933,38 @@ public void testPipeTransferSchemaSnapshotPieceReq() throws IOException { Assert.assertArrayEquals(req.getFilePiece(), deserializeReq.getFilePiece()); } + @Test + public void testPipeTransferFilePieceReqsFromLegacyV13Bodies() throws IOException { + final byte[] body = "legacyPiece".getBytes(); + + final PipeTransferTsFilePieceReq tsFilePieceReq = + PipeTransferTsFilePieceReq.fromTPipeTransferReq( + legacyTransferReq( + PipeRequestType.TRANSFER_TS_FILE_PIECE, + serializeLegacyFilePieceBody("1.tsfile", 1L, body))); + Assert.assertEquals("1.tsfile", tsFilePieceReq.getFileName()); + Assert.assertEquals(1L, tsFilePieceReq.getStartWritingOffset()); + Assert.assertArrayEquals(body, tsFilePieceReq.getFilePiece()); + + final PipeTransferTsFilePieceWithModReq tsFilePieceWithModReq = + PipeTransferTsFilePieceWithModReq.fromTPipeTransferReq( + legacyTransferReq( + PipeRequestType.TRANSFER_TS_FILE_PIECE_WITH_MOD, + serializeLegacyFilePieceBody("1.tsfile.mod", 2L, body))); + Assert.assertEquals("1.tsfile.mod", tsFilePieceWithModReq.getFileName()); + Assert.assertEquals(2L, tsFilePieceWithModReq.getStartWritingOffset()); + Assert.assertArrayEquals(body, tsFilePieceWithModReq.getFilePiece()); + + final PipeTransferSchemaSnapshotPieceReq schemaSnapshotPieceReq = + PipeTransferSchemaSnapshotPieceReq.fromTPipeTransferReq( + legacyTransferReq( + PipeRequestType.TRANSFER_SCHEMA_SNAPSHOT_PIECE, + serializeLegacyFilePieceBody("schema.snapshot", 3L, body))); + Assert.assertEquals("schema.snapshot", schemaSnapshotPieceReq.getFileName()); + Assert.assertEquals(3L, schemaSnapshotPieceReq.getStartWritingOffset()); + Assert.assertArrayEquals(body, schemaSnapshotPieceReq.getFilePiece()); + } + @Test public void testPipeTransferTsFileSealReq() throws IOException { final String fileName = "1.tsfile"; @@ -749,6 +981,87 @@ public void testPipeTransferTsFileSealReq() throws IOException { Assert.assertEquals(req.getFileLength(), deserializeReq.getFileLength()); } + @Test + public void testPipeTransferTsFileSealReqFromLegacyV13Body() throws IOException { + final TPipeTransferReq req = + legacyTransferReq( + PipeRequestType.TRANSFER_TS_FILE_SEAL, serializeLegacyFileSealV1Body("1.tsfile", 100L)); + + final PipeTransferTsFileSealReq deserializeReq = + PipeTransferTsFileSealReq.fromTPipeTransferReq(req); + + Assert.assertEquals(req.getVersion(), deserializeReq.getVersion()); + Assert.assertEquals(req.getType(), deserializeReq.getType()); + Assert.assertEquals("1.tsfile", deserializeReq.getFileName()); + Assert.assertEquals(100L, deserializeReq.getFileLength()); + } + + @Test + public void testPipeTransferTsFileSealWithModReq() throws IOException { + final String modFileName = "1.tsfile.mod"; + final String tsFileName = "1.tsfile"; + + final PipeTransferTsFileSealWithModReq req = + PipeTransferTsFileSealWithModReq.toTPipeTransferReq( + modFileName, 10, tsFileName, 100, "root.db"); + final PipeTransferTsFileSealWithModReq deserializeReq = + PipeTransferTsFileSealWithModReq.fromTPipeTransferReq(req); + + Assert.assertEquals(req.getVersion(), deserializeReq.getVersion()); + Assert.assertEquals(req.getType(), deserializeReq.getType()); + Assert.assertEquals(Arrays.asList(modFileName, tsFileName), deserializeReq.getFileNames()); + Assert.assertEquals(Arrays.asList(10L, 100L), deserializeReq.getFileLengths()); + Assert.assertEquals("root.db", deserializeReq.getDatabaseNameByTsFileName()); + } + + @Test + public void testPipeTransferTsFileSealWithModReqFromLegacyV13BodyWithoutDatabaseName() + throws IOException { + final String modFileName = "1.tsfile.mod"; + final String tsFileName = "1.tsfile"; + final TPipeTransferReq req = + legacyTransferReq( + PipeRequestType.TRANSFER_TS_FILE_SEAL_WITH_MOD, + serializeLegacyFileSealV2Body( + Arrays.asList(modFileName, tsFileName), + Arrays.asList(10L, 100L), + Collections.emptyMap())); + + final PipeTransferTsFileSealWithModReq deserializeReq = + PipeTransferTsFileSealWithModReq.fromTPipeTransferReq(req); + + Assert.assertEquals(req.getVersion(), deserializeReq.getVersion()); + Assert.assertEquals(req.getType(), deserializeReq.getType()); + Assert.assertEquals(Arrays.asList(modFileName, tsFileName), deserializeReq.getFileNames()); + Assert.assertEquals(Arrays.asList(10L, 100L), deserializeReq.getFileLengths()); + Assert.assertTrue(deserializeReq.getParameters().isEmpty()); + Assert.assertNull(deserializeReq.getDatabaseNameByTsFileName()); + } + + @Test + public void testPipeTransferTsFileSealWithModReqFromLegacyV13BodyWithNullDatabaseName() + throws IOException { + final String modFileName = "1.tsfile.mod"; + final String tsFileName = "1.tsfile"; + final Map parameters = new HashMap<>(); + parameters.put("DATABASE_NAME_" + tsFileName, null); + final TPipeTransferReq req = + legacyTransferReq( + PipeRequestType.TRANSFER_TS_FILE_SEAL_WITH_MOD, + serializeLegacyFileSealV2Body( + Arrays.asList(modFileName, tsFileName), Arrays.asList(10L, 100L), parameters)); + + final PipeTransferTsFileSealWithModReq deserializeReq = + PipeTransferTsFileSealWithModReq.fromTPipeTransferReq(req); + + Assert.assertEquals(req.getVersion(), deserializeReq.getVersion()); + Assert.assertEquals(req.getType(), deserializeReq.getType()); + Assert.assertEquals(Arrays.asList(modFileName, tsFileName), deserializeReq.getFileNames()); + Assert.assertEquals(Arrays.asList(10L, 100L), deserializeReq.getFileLengths()); + Assert.assertEquals(parameters, deserializeReq.getParameters()); + Assert.assertNull(deserializeReq.getDatabaseNameByTsFileName()); + } + @Test public void testPipeTransferSchemaSnapshotSealReq() throws IOException { final String mTreeSnapshotName = SchemaConstant.MTREE_SNAPSHOT; @@ -784,6 +1097,36 @@ public void testPipeTransferSchemaSnapshotSealReq() throws IOException { Assert.assertEquals(req.getParameters(), deserializeReq.getParameters()); } + @Test + public void testPipeTransferSchemaSnapshotSealReqFromLegacyV13Body() throws IOException { + final String mTreeSnapshotName = SchemaConstant.MTREE_SNAPSHOT; + final String tLogName = SchemaConstant.TAG_LOG; + final Map parameters = new HashMap<>(); + parameters.put(ColumnHeaderConstant.PATH_PATTERN, "root.**"); + parameters.put(ColumnHeaderConstant.DATABASE, "root.db"); + parameters.put(ColumnHeaderConstant.TYPE, "19"); + + final TPipeTransferReq req = + legacyTransferReq( + PipeRequestType.TRANSFER_SCHEMA_SNAPSHOT_SEAL, + serializeLegacyFileSealV2Body( + Arrays.asList(mTreeSnapshotName, tLogName), Arrays.asList(100L, 10L), parameters)); + final PipeTransferSchemaSnapshotSealReq deserializeReq = + PipeTransferSchemaSnapshotSealReq.fromTPipeTransferReq(req); + + Assert.assertEquals(req.getVersion(), deserializeReq.getVersion()); + Assert.assertEquals(req.getType(), deserializeReq.getType()); + Assert.assertEquals(Arrays.asList(mTreeSnapshotName, tLogName), deserializeReq.getFileNames()); + Assert.assertEquals(Arrays.asList(100L, 10L), deserializeReq.getFileLengths()); + Assert.assertEquals(parameters, deserializeReq.getParameters()); + Assert.assertTrue( + PipeTransferFileSealReqV2.isTreeModelDataAllowedToBeCaptured( + deserializeReq.getParameters())); + Assert.assertFalse( + PipeTransferFileSealReqV2.isTableModelDataAllowedToBeCaptured( + deserializeReq.getParameters())); + } + @Test public void testPipeTransferFilePieceResp() throws IOException { final PipeTransferFilePieceResp resp = @@ -795,6 +1138,83 @@ public void testPipeTransferFilePieceResp() throws IOException { Assert.assertEquals(resp.getEndWritingOffset(), deserializeResp.getEndWritingOffset()); } + private static TPipeTransferReq legacyTransferReq( + final PipeRequestType requestType, final ByteBuffer body) { + final TPipeTransferReq req = new TPipeTransferReq(); + req.version = IoTDBSinkRequestVersion.VERSION_1.getVersion(); + req.type = requestType.getType(); + req.body = body; + return req; + } + + private static ByteBuffer serializeLegacyFileSealV2Body( + final List fileNames, + final List fileLengths, + final Map parameters) + throws IOException { + try (final PublicBAOS byteArrayOutputStream = new PublicBAOS(); + final DataOutputStream outputStream = new DataOutputStream(byteArrayOutputStream)) { + ReadWriteIOUtils.write(fileNames.size(), outputStream); + for (final String fileName : fileNames) { + ReadWriteIOUtils.write(fileName, outputStream); + } + ReadWriteIOUtils.write(fileLengths.size(), outputStream); + for (final Long fileLength : fileLengths) { + ReadWriteIOUtils.write(fileLength, outputStream); + } + ReadWriteIOUtils.write(parameters.size(), outputStream); + for (final Map.Entry entry : parameters.entrySet()) { + ReadWriteIOUtils.write(entry.getKey(), outputStream); + ReadWriteIOUtils.write(entry.getValue(), outputStream); + } + return ByteBuffer.wrap(byteArrayOutputStream.getBuf(), 0, byteArrayOutputStream.size()); + } + } + + private static ByteBuffer serializeLegacyHandshakeV1Body(final String timestampPrecision) + throws IOException { + try (final PublicBAOS byteArrayOutputStream = new PublicBAOS(); + final DataOutputStream outputStream = new DataOutputStream(byteArrayOutputStream)) { + ReadWriteIOUtils.write(timestampPrecision, outputStream); + return ByteBuffer.wrap(byteArrayOutputStream.getBuf(), 0, byteArrayOutputStream.size()); + } + } + + private static ByteBuffer serializeLegacyHandshakeV2Body(final Map params) + throws IOException { + try (final PublicBAOS byteArrayOutputStream = new PublicBAOS(); + final DataOutputStream outputStream = new DataOutputStream(byteArrayOutputStream)) { + ReadWriteIOUtils.write(params.size(), outputStream); + for (final Map.Entry entry : params.entrySet()) { + ReadWriteIOUtils.write(entry.getKey(), outputStream); + ReadWriteIOUtils.write(entry.getValue(), outputStream); + } + return ByteBuffer.wrap(byteArrayOutputStream.getBuf(), 0, byteArrayOutputStream.size()); + } + } + + private static ByteBuffer serializeLegacyFilePieceBody( + final String fileName, final long startWritingOffset, final byte[] filePiece) + throws IOException { + try (final PublicBAOS byteArrayOutputStream = new PublicBAOS(); + final DataOutputStream outputStream = new DataOutputStream(byteArrayOutputStream)) { + ReadWriteIOUtils.write(fileName, outputStream); + ReadWriteIOUtils.write(startWritingOffset, outputStream); + ReadWriteIOUtils.write(new Binary(filePiece), outputStream); + return ByteBuffer.wrap(byteArrayOutputStream.getBuf(), 0, byteArrayOutputStream.size()); + } + } + + private static ByteBuffer serializeLegacyFileSealV1Body( + final String fileName, final long fileLength) throws IOException { + try (final PublicBAOS byteArrayOutputStream = new PublicBAOS(); + final DataOutputStream outputStream = new DataOutputStream(byteArrayOutputStream)) { + ReadWriteIOUtils.write(fileName, outputStream); + ReadWriteIOUtils.write(fileLength, outputStream); + return ByteBuffer.wrap(byteArrayOutputStream.getBuf(), 0, byteArrayOutputStream.size()); + } + } + private static Tablet createSingleValueTablet(final String deviceId, final String measurement) { final List schemaList = new ArrayList<>(); schemaList.add(new MeasurementSchema(measurement, TSDataType.INT32)); @@ -814,4 +1234,89 @@ private static ByteBuffer serializeTablet(final Tablet tablet, final boolean isA return ByteBuffer.wrap(byteArrayOutputStream.getBuf(), 0, byteArrayOutputStream.size()); } } + + private static ByteBuffer serializeLegacyTabletRawBuffer(final boolean isAligned) + throws IOException { + try (final PublicBAOS byteArrayOutputStream = new PublicBAOS(); + final DataOutputStream outputStream = new DataOutputStream(byteArrayOutputStream)) { + ReadWriteIOUtils.write("root.sg.d", outputStream); + ReadWriteIOUtils.write(2, outputStream); + + ReadWriteIOUtils.write((byte) 1, outputStream); + ReadWriteIOUtils.write(2, outputStream); + writeLegacyMeasurementSchema(outputStream, "s1", TSDataType.INT32); + writeLegacyMeasurementSchema(outputStream, "s2", TSDataType.TEXT); + + ReadWriteIOUtils.write((byte) 1, outputStream); + ReadWriteIOUtils.write(2L, outputStream); + ReadWriteIOUtils.write(1L, outputStream); + + ReadWriteIOUtils.write((byte) 0, outputStream); + + ReadWriteIOUtils.write((byte) 1, outputStream); + ReadWriteIOUtils.write((byte) 1, outputStream); + ReadWriteIOUtils.write(2, outputStream); + ReadWriteIOUtils.write(1, outputStream); + ReadWriteIOUtils.write((byte) 1, outputStream); + ReadWriteIOUtils.write((byte) 1, outputStream); + ReadWriteIOUtils.write(new Binary("2", TSFileConfig.STRING_CHARSET), outputStream); + ReadWriteIOUtils.write((byte) 1, outputStream); + ReadWriteIOUtils.write(new Binary("1", TSFileConfig.STRING_CHARSET), outputStream); + + ReadWriteIOUtils.write(isAligned, outputStream); + return ByteBuffer.wrap(byteArrayOutputStream.getBuf(), 0, byteArrayOutputStream.size()); + } + } + + private static ByteBuffer serializeLegacyTabletBatchBody( + final List insertNodeBuffers, final List tabletBuffers) + throws IOException { + try (final PublicBAOS byteArrayOutputStream = new PublicBAOS(); + final DataOutputStream outputStream = new DataOutputStream(byteArrayOutputStream)) { + ReadWriteIOUtils.write(0, outputStream); + + ReadWriteIOUtils.write(insertNodeBuffers.size(), outputStream); + for (final ByteBuffer insertNodeBuffer : insertNodeBuffers) { + writeByteBuffer(outputStream, insertNodeBuffer); + } + + ReadWriteIOUtils.write(tabletBuffers.size(), outputStream); + for (final ByteBuffer tabletBuffer : tabletBuffers) { + writeByteBuffer(outputStream, tabletBuffer); + } + + return ByteBuffer.wrap(byteArrayOutputStream.getBuf(), 0, byteArrayOutputStream.size()); + } + } + + private static void writeByteBuffer( + final DataOutputStream outputStream, final ByteBuffer byteBuffer) throws IOException { + outputStream.write(byteBufferToByteArray(byteBuffer)); + } + + private static byte[] byteBufferToByteArray(final ByteBuffer byteBuffer) { + final ByteBuffer duplicatedBuffer = byteBuffer.duplicate(); + final byte[] bytes = new byte[duplicatedBuffer.remaining()]; + duplicatedBuffer.get(bytes); + return bytes; + } + + private static void writeLegacyMeasurementSchema( + final DataOutputStream outputStream, final String measurement, final TSDataType dataType) + throws IOException { + ReadWriteIOUtils.write((byte) 1, outputStream); + ReadWriteIOUtils.write(measurement, outputStream); + ReadWriteIOUtils.write(dataType.serialize(), outputStream); + ReadWriteIOUtils.write(TSEncoding.PLAIN.serialize(), outputStream); + ReadWriteIOUtils.write(CompressionType.UNCOMPRESSED.serialize(), outputStream); + ReadWriteIOUtils.write(0, outputStream); + } + + private static void assertLegacyTabletStatement(final InsertTabletStatement statement) { + Assert.assertEquals("root.sg.d", statement.getDevicePath().getFullPath()); + Assert.assertArrayEquals(new String[] {"s1", "s2"}, statement.getMeasurements()); + Assert.assertArrayEquals( + new TSDataType[] {TSDataType.INT32, TSDataType.TEXT}, statement.getDataTypes()); + Assert.assertEquals(2, statement.getRowCount()); + } } diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/payload/thrift/request/PipeTransferFileSealReqV2.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/payload/thrift/request/PipeTransferFileSealReqV2.java index cf73948c3f14d..227d5871d8b5c 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/payload/thrift/request/PipeTransferFileSealReqV2.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/sink/payload/thrift/request/PipeTransferFileSealReqV2.java @@ -54,6 +54,14 @@ public final Map getParameters() { return parameters; } + public static boolean isTreeModelDataAllowedToBeCaptured(final Map parameters) { + return parameters.containsKey(TREE) || !parameters.containsKey(TABLE); + } + + public static boolean isTableModelDataAllowedToBeCaptured(final Map parameters) { + return parameters.containsKey(TABLE); + } + protected abstract PipeRequestType getPlanType(); /////////////////////////////// Thrift /////////////////////////////// diff --git a/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/pipe/sink/payload/thrift/common/PipeTransferSliceReqBuilderTest.java b/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/pipe/sink/payload/thrift/common/PipeTransferSliceReqBuilderTest.java index 290ce3979807d..cc836cae945f9 100644 --- a/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/pipe/sink/payload/thrift/common/PipeTransferSliceReqBuilderTest.java +++ b/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/pipe/sink/payload/thrift/common/PipeTransferSliceReqBuilderTest.java @@ -22,14 +22,20 @@ import org.apache.iotdb.commons.conf.CommonConfig; import org.apache.iotdb.commons.conf.CommonDescriptor; import org.apache.iotdb.commons.pipe.sink.payload.thrift.request.IoTDBSinkRequestVersion; +import org.apache.iotdb.commons.pipe.sink.payload.thrift.request.PipeRequestType; import org.apache.iotdb.commons.pipe.sink.payload.thrift.request.PipeTransferSliceReq; import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq; +import org.apache.tsfile.utils.Binary; +import org.apache.tsfile.utils.PublicBAOS; +import org.apache.tsfile.utils.ReadWriteIOUtils; import org.junit.After; import org.junit.Assert; import org.junit.Before; import org.junit.Test; +import java.io.DataOutputStream; +import java.io.IOException; import java.nio.ByteBuffer; public class PipeTransferSliceReqBuilderTest { @@ -91,6 +97,32 @@ public void testShouldSliceOnlyForVersion1RequestsAboveThreshold() { createReq(IoTDBSinkRequestVersion.VERSION_1.getVersion(), 4), bodySizeLimit)); } + @Test + public void testPipeTransferSliceReqFromLegacyV13Body() throws IOException { + final TPipeTransferReq req = new TPipeTransferReq(); + req.version = IoTDBSinkRequestVersion.VERSION_1.getVersion(); + req.type = PipeRequestType.TRANSFER_SLICE.getType(); + try (final PublicBAOS byteArrayOutputStream = new PublicBAOS(); + final DataOutputStream outputStream = new DataOutputStream(byteArrayOutputStream)) { + ReadWriteIOUtils.write(7, outputStream); + ReadWriteIOUtils.write(PipeRequestType.TRANSFER_TABLET_RAW.getType(), outputStream); + ReadWriteIOUtils.write(6, outputStream); + ReadWriteIOUtils.write(new Binary(new byte[] {2, 3, 4}), outputStream); + ReadWriteIOUtils.write(1, outputStream); + ReadWriteIOUtils.write(2, outputStream); + req.body = ByteBuffer.wrap(byteArrayOutputStream.getBuf(), 0, byteArrayOutputStream.size()); + } + + final PipeTransferSliceReq sliceReq = PipeTransferSliceReq.fromTPipeTransferReq(req); + + Assert.assertEquals(7, sliceReq.getOrderId()); + Assert.assertEquals(PipeRequestType.TRANSFER_TABLET_RAW.getType(), sliceReq.getOriginReqType()); + Assert.assertEquals(6, sliceReq.getOriginBodySize()); + Assert.assertArrayEquals(new byte[] {2, 3, 4}, sliceReq.getSliceBody()); + Assert.assertEquals(1, sliceReq.getSliceIndex()); + Assert.assertEquals(2, sliceReq.getSliceCount()); + } + private static TPipeTransferReq createReq(final byte version, final int bodySize) { final byte[] body = new byte[bodySize]; for (int i = 0; i < body.length; ++i) { diff --git a/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/pipe/sink/payload/thrift/request/PipeRequestTypeTest.java b/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/pipe/sink/payload/thrift/request/PipeRequestTypeTest.java new file mode 100644 index 0000000000000..c2e0ba949f984 --- /dev/null +++ b/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/pipe/sink/payload/thrift/request/PipeRequestTypeTest.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.commons.pipe.sink.payload.thrift.request; + +import org.junit.Assert; +import org.junit.Test; + +public class PipeRequestTypeTest { + + @Test + public void testAllV13RequestTypesAreRecognized() { + assertV13RequestType((short) 0, PipeRequestType.HANDSHAKE_CONFIGNODE_V1); + assertV13RequestType((short) 1, PipeRequestType.HANDSHAKE_DATANODE_V1); + assertV13RequestType((short) 50, PipeRequestType.HANDSHAKE_CONFIGNODE_V2); + assertV13RequestType((short) 51, PipeRequestType.HANDSHAKE_DATANODE_V2); + + assertV13RequestType((short) 2, PipeRequestType.TRANSFER_TABLET_INSERT_NODE); + assertV13RequestType((short) 3, PipeRequestType.TRANSFER_TABLET_RAW); + assertV13RequestType((short) 4, PipeRequestType.TRANSFER_TS_FILE_PIECE); + assertV13RequestType((short) 5, PipeRequestType.TRANSFER_TS_FILE_SEAL); + assertV13RequestType((short) 6, PipeRequestType.TRANSFER_TABLET_BATCH); + assertV13RequestType((short) 7, PipeRequestType.TRANSFER_TABLET_BINARY); + assertV13RequestType((short) 8, PipeRequestType.TRANSFER_TS_FILE_PIECE_WITH_MOD); + assertV13RequestType((short) 9, PipeRequestType.TRANSFER_TS_FILE_SEAL_WITH_MOD); + + // 1.3 named this request TRANSFER_SCHEMA_PLAN. 2.0 keeps the same wire type. + assertV13RequestType((short) 100, PipeRequestType.TRANSFER_PLAN_NODE); + assertV13RequestType((short) 101, PipeRequestType.TRANSFER_SCHEMA_SNAPSHOT_PIECE); + assertV13RequestType((short) 102, PipeRequestType.TRANSFER_SCHEMA_SNAPSHOT_SEAL); + + assertV13RequestType((short) 200, PipeRequestType.TRANSFER_CONFIG_PLAN); + assertV13RequestType((short) 201, PipeRequestType.TRANSFER_CONFIG_SNAPSHOT_PIECE); + assertV13RequestType((short) 202, PipeRequestType.TRANSFER_CONFIG_SNAPSHOT_SEAL); + + assertV13RequestType((short) 300, PipeRequestType.TRANSFER_COMPRESSED); + assertV13RequestType((short) 400, PipeRequestType.TRANSFER_SLICE); + } + + private static void assertV13RequestType( + final short type, final PipeRequestType expectedRequestType) { + Assert.assertTrue(PipeRequestType.isValidatedRequestType(type)); + Assert.assertEquals(expectedRequestType, PipeRequestType.valueOf(type)); + } +} diff --git a/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/pipe/sink/payload/thrift/request/PipeTransferCompressedReqTest.java b/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/pipe/sink/payload/thrift/request/PipeTransferCompressedReqTest.java new file mode 100644 index 0000000000000..ccdbc5086a5c9 --- /dev/null +++ b/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/pipe/sink/payload/thrift/request/PipeTransferCompressedReqTest.java @@ -0,0 +1,112 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.commons.pipe.sink.payload.thrift.request; + +import org.apache.iotdb.commons.pipe.sink.compressor.PipeCompressor; +import org.apache.iotdb.commons.pipe.sink.compressor.PipeCompressorFactory; +import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq; + +import org.apache.tsfile.utils.BytesUtils; +import org.apache.tsfile.utils.PublicBAOS; +import org.apache.tsfile.utils.ReadWriteIOUtils; +import org.junit.Assert; +import org.junit.Test; + +import java.io.DataOutputStream; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; + +public class PipeTransferCompressedReqTest { + + @Test + public void testPipeTransferCompressedReq() throws IOException { + final TPipeTransferReq originalReq = new TPipeTransferReq(); + originalReq.version = IoTDBSinkRequestVersion.VERSION_1.getVersion(); + originalReq.type = PipeRequestType.TRANSFER_TABLET_BINARY.getType(); + originalReq.body = ByteBuffer.wrap(new byte[] {1, 2, 3, 4}); + + final TPipeTransferReq compressedReq = + PipeTransferCompressedReq.toTPipeTransferReq( + originalReq, + Collections.singletonList( + PipeCompressorFactory.getCompressor( + PipeCompressor.PipeCompressionType.GZIP.getIndex()))); + final TPipeTransferReq decompressedReq = + PipeTransferCompressedReq.fromTPipeTransferReq(compressedReq); + + Assert.assertEquals(IoTDBSinkRequestVersion.VERSION_1.getVersion(), compressedReq.version); + Assert.assertEquals(PipeRequestType.TRANSFER_COMPRESSED.getType(), compressedReq.type); + Assert.assertEquals(originalReq.version, decompressedReq.version); + Assert.assertEquals(originalReq.type, decompressedReq.type); + Assert.assertArrayEquals(originalReq.getBody(), decompressedReq.getBody()); + } + + @Test + public void testPipeTransferCompressedReqFromLegacyV13Body() throws IOException { + final TPipeTransferReq originalReq = new TPipeTransferReq(); + originalReq.version = IoTDBSinkRequestVersion.VERSION_1.getVersion(); + originalReq.type = PipeRequestType.TRANSFER_TABLET_BINARY.getType(); + originalReq.body = ByteBuffer.wrap(new byte[] {1, 2, 3, 4}); + + final TPipeTransferReq compressedReq = new TPipeTransferReq(); + compressedReq.version = IoTDBSinkRequestVersion.VERSION_1.getVersion(); + compressedReq.type = PipeRequestType.TRANSFER_COMPRESSED.getType(); + compressedReq.body = + serializeLegacyCompressedBody( + originalReq, + Collections.singletonList( + PipeCompressorFactory.getCompressor( + PipeCompressor.PipeCompressionType.GZIP.getIndex()))); + + final TPipeTransferReq decompressedReq = + PipeTransferCompressedReq.fromTPipeTransferReq(compressedReq); + + Assert.assertEquals(originalReq.version, decompressedReq.version); + Assert.assertEquals(originalReq.type, decompressedReq.type); + Assert.assertArrayEquals(originalReq.getBody(), decompressedReq.getBody()); + } + + private static ByteBuffer serializeLegacyCompressedBody( + final TPipeTransferReq originalReq, final List compressors) + throws IOException { + try (final PublicBAOS byteArrayOutputStream = new PublicBAOS(); + final DataOutputStream outputStream = new DataOutputStream(byteArrayOutputStream)) { + byte[] body = + BytesUtils.concatByteArrayList( + Arrays.asList( + new byte[] {originalReq.version}, + BytesUtils.shortToBytes(originalReq.type), + originalReq.getBody())); + + ReadWriteIOUtils.write((byte) compressors.size(), outputStream); + for (final PipeCompressor compressor : compressors) { + ReadWriteIOUtils.write(compressor.serialize(), outputStream); + ReadWriteIOUtils.write(body.length, outputStream); + body = compressor.compress(body); + } + outputStream.write(body); + + return ByteBuffer.wrap(byteArrayOutputStream.getBuf(), 0, byteArrayOutputStream.size()); + } + } +} diff --git a/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/pipe/sink/payload/thrift/request/PipeTransferFileSealReqV2Test.java b/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/pipe/sink/payload/thrift/request/PipeTransferFileSealReqV2Test.java new file mode 100644 index 0000000000000..4ac0e5cb7c333 --- /dev/null +++ b/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/pipe/sink/payload/thrift/request/PipeTransferFileSealReqV2Test.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.commons.pipe.sink.payload.thrift.request; + +import org.junit.Assert; +import org.junit.Test; + +import java.util.HashMap; +import java.util.Map; + +public class PipeTransferFileSealReqV2Test { + + @Test + public void testLegacyV13SnapshotSealCapturesTreeOnly() { + final Map parameters = new HashMap<>(); + + Assert.assertTrue(PipeTransferFileSealReqV2.isTreeModelDataAllowedToBeCaptured(parameters)); + Assert.assertFalse(PipeTransferFileSealReqV2.isTableModelDataAllowedToBeCaptured(parameters)); + } + + @Test + public void testExplicitTreeOnlySnapshotSealCapturesTreeOnly() { + final Map parameters = new HashMap<>(); + parameters.put(PipeTransferFileSealReqV2.TREE, ""); + + Assert.assertTrue(PipeTransferFileSealReqV2.isTreeModelDataAllowedToBeCaptured(parameters)); + Assert.assertFalse(PipeTransferFileSealReqV2.isTableModelDataAllowedToBeCaptured(parameters)); + } + + @Test + public void testExplicitTableOnlySnapshotSealCapturesTableOnly() { + final Map parameters = new HashMap<>(); + parameters.put(PipeTransferFileSealReqV2.TABLE, ""); + + Assert.assertFalse(PipeTransferFileSealReqV2.isTreeModelDataAllowedToBeCaptured(parameters)); + Assert.assertTrue(PipeTransferFileSealReqV2.isTableModelDataAllowedToBeCaptured(parameters)); + } + + @Test + public void testExplicitTreeAndTableSnapshotSealCapturesBoth() { + final Map parameters = new HashMap<>(); + parameters.put(PipeTransferFileSealReqV2.TREE, ""); + parameters.put(PipeTransferFileSealReqV2.TABLE, ""); + + Assert.assertTrue(PipeTransferFileSealReqV2.isTreeModelDataAllowedToBeCaptured(parameters)); + Assert.assertTrue(PipeTransferFileSealReqV2.isTableModelDataAllowedToBeCaptured(parameters)); + } +}