From f9c342ea94c4964cca9a7ca93f34dbf0d4787072 Mon Sep 17 00:00:00 2001 From: Caideyipi <87789683+Caideyipi@users.noreply.github.com> Date: Tue, 3 Feb 2026 16:29:00 +0800 Subject: [PATCH 1/2] npe-fix --- .../pipe/sink/protocol/legacy/IoTDBLegacyPipeSink.java | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/legacy/IoTDBLegacyPipeSink.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/legacy/IoTDBLegacyPipeSink.java index eefa5e739d5b8..4878facfc877d 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/legacy/IoTDBLegacyPipeSink.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/legacy/IoTDBLegacyPipeSink.java @@ -37,6 +37,7 @@ import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent; import org.apache.iotdb.db.pipe.sink.payload.legacy.TsFilePipeData; import org.apache.iotdb.db.storageengine.StorageEngine; +import org.apache.iotdb.db.storageengine.dataregion.DataRegion; import org.apache.iotdb.pipe.api.PipeConnector; import org.apache.iotdb.pipe.api.annotation.TreeModel; import org.apache.iotdb.pipe.api.customizer.configuration.PipeConnectorRuntimeConfiguration; @@ -203,10 +204,12 @@ public void customize( trustStore = parameters.getString(SINK_IOTDB_SSL_TRUST_STORE_PATH_KEY); trustStorePwd = parameters.getString(SINK_IOTDB_SSL_TRUST_STORE_PWD_KEY); - databaseName = + final DataRegion dataRegion = StorageEngine.getInstance() - .getDataRegion(new DataRegionId(configuration.getRuntimeEnvironment().getRegionId())) - .getDatabaseName(); + .getDataRegion(new DataRegionId(configuration.getRuntimeEnvironment().getRegionId())); + if (Objects.nonNull(dataRegion)) { + databaseName = dataRegion.getDatabaseName(); + } } @Override From a19d8d13f93ee24453ab42868835659ceb8fcddd Mon Sep 17 00:00:00 2001 From: Caideyipi <87789683+Caideyipi@users.noreply.github.com> Date: Tue, 3 Feb 2026 18:35:30 +0800 Subject: [PATCH 2/2] unb --- .../auto/basic/IoTDBPipeDataSinkIT.java | 34 ++++++------------- .../protocol/legacy/IoTDBLegacyPipeSink.java | 2 +- 2 files changed, 12 insertions(+), 24 deletions(-) diff --git a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeDataSinkIT.java b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeDataSinkIT.java index 0a927cab2985d..035d5052cb797 100644 --- a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeDataSinkIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeDataSinkIT.java @@ -23,6 +23,7 @@ import org.apache.iotdb.commons.client.sync.SyncConfigNodeIServiceClient; import org.apache.iotdb.confignode.rpc.thrift.TCreatePipeReq; import org.apache.iotdb.db.it.utils.TestUtils; +import org.apache.iotdb.it.env.EnvFactory; import org.apache.iotdb.it.env.cluster.node.DataNodeWrapper; import org.apache.iotdb.it.framework.IoTDBTestRunner; import org.apache.iotdb.itbase.category.MultiClusterIT2DualTreeAutoBasic; @@ -35,6 +36,8 @@ import org.junit.experimental.categories.Category; import org.junit.runner.RunWith; +import java.sql.Connection; +import java.sql.Statement; import java.util.Arrays; import java.util.Collections; import java.util.HashMap; @@ -211,31 +214,16 @@ public void testLegacyConnector() throws Exception { final String receiverIp = receiverDataNode.getIp(); final int receiverPort = receiverDataNode.getPort(); + try (final Connection connection = EnvFactory.getEnv().getConnection(); + final Statement statement = connection.createStatement()) { + statement.execute( + String.format( + "create pipe testPipe ('sink'='iotdb-legacy-pipe-sink', 'ip'='%s', 'port'='%s', 'version'='1.3')", + receiverIp, receiverPort)); + } + try (final SyncConfigNodeIServiceClient client = (SyncConfigNodeIServiceClient) senderEnv.getLeaderConfigNodeConnection()) { - final Map sourceAttributes = new HashMap<>(); - final Map processorAttributes = new HashMap<>(); - final Map sinkAttributes = new HashMap<>(); - - sourceAttributes.put("source.realtime.mode", "log"); - sourceAttributes.put("user", "root"); - - sinkAttributes.put("sink", "iotdb-legacy-pipe-sink"); - sinkAttributes.put("sink.batch.enable", "false"); - sinkAttributes.put("sink.ip", receiverIp); - sinkAttributes.put("sink.port", Integer.toString(receiverPort)); - - // This version does not matter since it's no longer checked by the legacy receiver - sinkAttributes.put("sink.version", "1.3"); - - final TSStatus status = - client.createPipe( - new TCreatePipeReq("testPipe", sinkAttributes) - .setExtractorAttributes(sourceAttributes) - .setProcessorAttributes(processorAttributes)); - - Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.getCode()); - Assert.assertEquals( TSStatusCode.SUCCESS_STATUS.getStatusCode(), client.startPipe("testPipe").getCode()); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/legacy/IoTDBLegacyPipeSink.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/legacy/IoTDBLegacyPipeSink.java index 4878facfc877d..332ca6bab7c8b 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/legacy/IoTDBLegacyPipeSink.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/legacy/IoTDBLegacyPipeSink.java @@ -111,7 +111,7 @@ public class IoTDBLegacyPipeSink implements PipeConnector { private String syncConnectorVersion; private String pipeName; - private String databaseName; + private String databaseName = ""; private IoTDBSyncClient client;