Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -272,7 +272,7 @@ public void testCaptureTreeAndTableIsolation() throws Exception {
Assert.assertEquals(1, TableModelUtils.showPipesCount(senderEnv, BaseEnv.TREE_SQL_DIALECT));

// Show table pipe by table session
Assert.assertEquals(1, TableModelUtils.showPipesCount(senderEnv, BaseEnv.TABLE_SQL_DIALECT));
Assert.assertEquals(0, TableModelUtils.showPipesCount(senderEnv, BaseEnv.TABLE_SQL_DIALECT));

// 2. Create table pipe by table session
try (final Connection connection = senderEnv.getConnection(BaseEnv.TABLE_SQL_DIALECT);
Expand All @@ -292,22 +292,28 @@ public void testCaptureTreeAndTableIsolation() throws Exception {
}

// Show tree pipe by tree session
Assert.assertEquals(2, TableModelUtils.showPipesCount(senderEnv, BaseEnv.TREE_SQL_DIALECT));
Assert.assertEquals(1, TableModelUtils.showPipesCount(senderEnv, BaseEnv.TREE_SQL_DIALECT));

// Show table pipe by table session
Assert.assertEquals(2, TableModelUtils.showPipesCount(senderEnv, BaseEnv.TABLE_SQL_DIALECT));
Assert.assertEquals(1, TableModelUtils.showPipesCount(senderEnv, BaseEnv.TABLE_SQL_DIALECT));

// 3. Drop pipe
try (final SyncConfigNodeIServiceClient client =
(SyncConfigNodeIServiceClient) senderEnv.getLeaderConfigNodeConnection()) {
Assert.assertEquals(
TSStatusCode.SUCCESS_STATUS.getStatusCode(),
TSStatusCode.PIPE_NOT_EXIST_ERROR.getStatusCode(),
client.dropPipeExtended(new TDropPipeReq(treePipeName).setIsTableModel(true)).getCode());
Assert.assertEquals(
TSStatusCode.SUCCESS_STATUS.getStatusCode(),
TSStatusCode.PIPE_NOT_EXIST_ERROR.getStatusCode(),
client
.dropPipeExtended(new TDropPipeReq(tablePipeName).setIsTableModel(false))
.getCode());
Assert.assertEquals(
TSStatusCode.SUCCESS_STATUS.getStatusCode(),
client.dropPipeExtended(new TDropPipeReq(treePipeName).setIsTableModel(false)).getCode());
Assert.assertEquals(
TSStatusCode.SUCCESS_STATUS.getStatusCode(),
client.dropPipeExtended(new TDropPipeReq(tablePipeName).setIsTableModel(true)).getCode());
}
}

Expand All @@ -333,10 +339,10 @@ public void testCaptureCornerCases() {
}

// Show tree pipe by tree session
Assert.assertEquals(0, TableModelUtils.showPipesCount(senderEnv, BaseEnv.TREE_SQL_DIALECT));
Assert.assertEquals(1, TableModelUtils.showPipesCount(senderEnv, BaseEnv.TREE_SQL_DIALECT));

// Show table pipe by table session
Assert.assertEquals(1, TableModelUtils.showPipesCount(senderEnv, BaseEnv.TABLE_SQL_DIALECT));
Assert.assertEquals(0, TableModelUtils.showPipesCount(senderEnv, BaseEnv.TABLE_SQL_DIALECT));

// 2. Create table pipe but capture tree data
try (final Connection connection = senderEnv.getConnection(BaseEnv.TABLE_SQL_DIALECT);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.client.sync.SyncConfigNodeIServiceClient;
import org.apache.iotdb.commons.pipe.config.constant.SystemConstant;
import org.apache.iotdb.confignode.rpc.thrift.TCreatePipeReq;
import org.apache.iotdb.confignode.rpc.thrift.TShowPipeReq;
import org.apache.iotdb.confignode.rpc.thrift.TStartPipeReq;
Expand Down Expand Up @@ -287,6 +288,7 @@ public void testReceiverPermission() throws Exception {
final String dbName = "test";
final String tbName = "test";

sourceAttributes.put(SystemConstant.SQL_DIALECT_KEY, SystemConstant.SQL_DIALECT_TABLE_VALUE);
sourceAttributes.put("source.inclusion", "all");
sourceAttributes.put("source.capture.tree", "false");
sourceAttributes.put("source.capture.table", "true");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -294,7 +294,7 @@ public void testDoubleLivingIsolation() throws Exception {
Assert.assertEquals(1, TableModelUtils.showPipesCount(senderEnv, BaseEnv.TREE_SQL_DIALECT));

// Show table pipe by table session
Assert.assertEquals(1, TableModelUtils.showPipesCount(senderEnv, BaseEnv.TABLE_SQL_DIALECT));
Assert.assertEquals(0, TableModelUtils.showPipesCount(senderEnv, BaseEnv.TABLE_SQL_DIALECT));

// Create table pipe
try (final Connection connection = senderEnv.getConnection(BaseEnv.TABLE_SQL_DIALECT);
Expand All @@ -313,22 +313,28 @@ public void testDoubleLivingIsolation() throws Exception {
}

// Show tree pipe by tree session
Assert.assertEquals(2, TableModelUtils.showPipesCount(senderEnv, BaseEnv.TREE_SQL_DIALECT));
Assert.assertEquals(1, TableModelUtils.showPipesCount(senderEnv, BaseEnv.TREE_SQL_DIALECT));

// Show table pipe by table session
Assert.assertEquals(2, TableModelUtils.showPipesCount(senderEnv, BaseEnv.TABLE_SQL_DIALECT));
Assert.assertEquals(1, TableModelUtils.showPipesCount(senderEnv, BaseEnv.TABLE_SQL_DIALECT));

try (final SyncConfigNodeIServiceClient client =
(SyncConfigNodeIServiceClient) senderEnv.getLeaderConfigNodeConnection()) {
// Drop pipe
Assert.assertEquals(
TSStatusCode.SUCCESS_STATUS.getStatusCode(),
TSStatusCode.PIPE_NOT_EXIST_ERROR.getStatusCode(),
client.dropPipeExtended(new TDropPipeReq(treePipeName).setIsTableModel(true)).getCode());
Assert.assertEquals(
TSStatusCode.SUCCESS_STATUS.getStatusCode(),
TSStatusCode.PIPE_NOT_EXIST_ERROR.getStatusCode(),
client
.dropPipeExtended(new TDropPipeReq(tablePipeName).setIsTableModel(false))
.getCode());
Assert.assertEquals(
TSStatusCode.SUCCESS_STATUS.getStatusCode(),
client.dropPipeExtended(new TDropPipeReq(treePipeName).setIsTableModel(false)).getCode());
Assert.assertEquals(
TSStatusCode.SUCCESS_STATUS.getStatusCode(),
client.dropPipeExtended(new TDropPipeReq(tablePipeName).setIsTableModel(true)).getCode());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.client.sync.SyncConfigNodeIServiceClient;
import org.apache.iotdb.commons.pipe.config.constant.SystemConstant;
import org.apache.iotdb.confignode.rpc.thrift.TCreatePipeReq;
import org.apache.iotdb.confignode.rpc.thrift.TStartPipeReq;
import org.apache.iotdb.db.it.utils.TestUtils;
Expand Down Expand Up @@ -73,6 +74,8 @@ public void testTableSync() throws Exception {
final Map<String, String> processorAttributes = new HashMap<>();
final Map<String, String> connectorAttributes = new HashMap<>();

extractorAttributes.put(
SystemConstant.SQL_DIALECT_KEY, SystemConstant.SQL_DIALECT_TABLE_VALUE);
extractorAttributes.put("extractor.inclusion", "all");
extractorAttributes.put("extractor.capture.tree", "false");
extractorAttributes.put("extractor.capture.table", "true");
Expand Down Expand Up @@ -209,6 +212,8 @@ public void testNoTree() throws Exception {
final Map<String, String> processorAttributes = new HashMap<>();
final Map<String, String> connectorAttributes = new HashMap<>();

extractorAttributes.put(
SystemConstant.SQL_DIALECT_KEY, SystemConstant.SQL_DIALECT_TABLE_VALUE);
extractorAttributes.put("extractor.inclusion", "all");
extractorAttributes.put("extractor.capture.tree", "false");
extractorAttributes.put("extractor.capture.table", "true");
Expand Down Expand Up @@ -327,6 +332,8 @@ public void testAuth() throws Exception {
final Map<String, String> processorAttributes = new HashMap<>();
final Map<String, String> connectorAttributes = new HashMap<>();

extractorAttributes.put(
SystemConstant.SQL_DIALECT_KEY, SystemConstant.SQL_DIALECT_TABLE_VALUE);
extractorAttributes.put("extractor.inclusion", "all");
extractorAttributes.put("extractor.capture.tree", "false");
extractorAttributes.put("extractor.capture.table", "true");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -332,7 +332,9 @@ public void testZstdCompressorLevel() throws Exception {
}

final List<TShowPipeInfo> showPipeResult =
client.showPipe(new TShowPipeReq().setUserName(SessionConfig.DEFAULT_USER)).pipeInfoList;
client.showPipe(
new TShowPipeReq().setIsTableModel(true).setUserName(SessionConfig.DEFAULT_USER))
.pipeInfoList;
showPipeResult.removeIf(i -> i.getId().startsWith("__consensus"));
Assert.assertEquals(
3,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -221,17 +221,6 @@ public void testZstdCompressorLevel() throws Exception {

try (final SyncConfigNodeIServiceClient client =
(SyncConfigNodeIServiceClient) senderEnv.getLeaderConfigNodeConnection()) {
TestUtils.executeNonQueries(
senderEnv,
Arrays.asList(
"insert into root.db.d1(time, s1) values (1, 1)",
"insert into root.db.d1(time, s2) values (1, 1)",
"insert into root.db.d1(time, s3) values (1, 1)",
"insert into root.db.d1(time, s4) values (1, 1)",
"insert into root.db.d1(time, s5) values (1, 1)",
"flush"),
null);

// Create 5 pipes with different zstd compression levels, p4 and p5 should fail.

try (final Connection connection = senderEnv.getConnection();
Expand Down Expand Up @@ -285,6 +274,17 @@ public void testZstdCompressorLevel() throws Exception {
fail(e.getMessage());
}

TestUtils.executeNonQueries(
senderEnv,
Arrays.asList(
"insert into root.db.d1(time, s1) values (1, 1)",
"insert into root.db.d1(time, s2) values (1, 1)",
"insert into root.db.d1(time, s3) values (1, 1)",
"insert into root.db.d1(time, s4) values (1, 1)",
"insert into root.db.d1(time, s5) values (1, 1)",
"flush"),
null);

try (final Connection connection = senderEnv.getConnection();
final Statement statement = connection.createStatement()) {
statement.execute(
Expand Down Expand Up @@ -332,9 +332,9 @@ public void testZstdCompressorLevel() throws Exception {

TestUtils.assertDataEventuallyOnEnv(
receiverEnv,
"count timeseries root.db.**",
"count(timeseries),",
Collections.singleton("3,"),
"select count(*) from root.db.**",
"count(root.db.d1.s1),count(root.db.d1.s2),count(root.db.d1.s3),",
Collections.singleton("1,1,1,"),
handleFailure);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
import org.apache.iotdb.commons.pipe.agent.task.meta.PipeRuntimeMeta;
import org.apache.iotdb.commons.pipe.agent.task.meta.PipeStaticMeta;
import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta;
import org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant;
import org.apache.iotdb.commons.pipe.config.constant.SystemConstant;
import org.apache.iotdb.confignode.consensus.response.pipe.task.PipeTableResp;
import org.apache.iotdb.rpc.TSStatusCode;

Expand Down Expand Up @@ -118,4 +120,61 @@ public void testFilter() {
PipeTableResp allPipeTableResp = pipeTableResp.filter(true, null);
Assert.assertEquals(3, allPipeTableResp.getAllPipeMeta().size());
}

@Test
public void testFilterBySqlDialectIgnoresCaptureOptions() {
TSStatus status = new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
List<PipeMeta> pipeMetaList = new ArrayList<>();

Map<String, String> treeExtractorAttributes = new HashMap<>();
treeExtractorAttributes.put(
SystemConstant.SQL_DIALECT_KEY, SystemConstant.SQL_DIALECT_TREE_VALUE);
treeExtractorAttributes.put(PipeSourceConstant.SOURCE_CAPTURE_TREE_KEY, "false");
treeExtractorAttributes.put(PipeSourceConstant.SOURCE_CAPTURE_TABLE_KEY, "true");
treeExtractorAttributes.put(PipeSourceConstant.SOURCE_MODE_DOUBLE_LIVING_KEY, "true");
pipeMetaList.add(constructPipeMeta("treePipe", 121, treeExtractorAttributes));

Map<String, String> tableExtractorAttributes = new HashMap<>();
tableExtractorAttributes.put(
SystemConstant.SQL_DIALECT_KEY, SystemConstant.SQL_DIALECT_TABLE_VALUE);
tableExtractorAttributes.put(PipeSourceConstant.SOURCE_CAPTURE_TREE_KEY, "true");
tableExtractorAttributes.put(PipeSourceConstant.SOURCE_CAPTURE_TABLE_KEY, "true");
tableExtractorAttributes.put(PipeSourceConstant.SOURCE_MODE_DOUBLE_LIVING_KEY, "true");
pipeMetaList.add(constructPipeMeta("tablePipe", 122, tableExtractorAttributes));

PipeTableResp treePipeTableResp =
new PipeTableResp(status, new ArrayList<>(pipeMetaList)).filter(false, null, false, null);
Assert.assertEquals(1, treePipeTableResp.getAllPipeMeta().size());
Assert.assertEquals(
"treePipe", treePipeTableResp.getAllPipeMeta().get(0).getStaticMeta().getPipeName());

PipeTableResp tablePipeTableResp =
new PipeTableResp(status, new ArrayList<>(pipeMetaList)).filter(false, null, true, null);
Assert.assertEquals(1, tablePipeTableResp.getAllPipeMeta().size());
Assert.assertEquals(
"tablePipe", tablePipeTableResp.getAllPipeMeta().get(0).getStaticMeta().getPipeName());
}

private PipeMeta constructPipeMeta(
final String pipeName,
final long creationTime,
final Map<String, String> extractorAttributes) {
Map<String, String> processorAttributes = new HashMap<>();
Map<String, String> connectorAttributes = new HashMap<>();

extractorAttributes.put("extractor", "iotdb-extractor");
processorAttributes.put("processor", "do-nothing-processor");
connectorAttributes.put("connector", "iotdb-thrift-connector");
connectorAttributes.put("host", "127.0.0.1");
connectorAttributes.put("port", "6667");

PipeTaskMeta pipeTaskMeta = new PipeTaskMeta(MinimumProgressIndex.INSTANCE, 1);
ConcurrentMap<Integer, PipeTaskMeta> pipeTasks = new ConcurrentHashMap<>();
pipeTasks.put(1, pipeTaskMeta);
PipeStaticMeta pipeStaticMeta =
new PipeStaticMeta(
pipeName, creationTime, extractorAttributes, processorAttributes, connectorAttributes);
PipeRuntimeMeta pipeRuntimeMeta = new PipeRuntimeMeta(pipeTasks);
return new PipeMeta(pipeStaticMeta, pipeRuntimeMeta);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -246,7 +246,7 @@ public static boolean isSubscriptionPipe(final String pipeName) {

public boolean visibleUnder(final boolean isTableModel) {
final Visibility visibility =
VisibilityUtils.calculateFromExtractorParameters(sourceParameters);
VisibilityUtils.calculateFromPipeSourceParameters(sourceParameters);
return VisibilityUtils.isCompatible(visibility, isTableModel);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,16 @@ public static Visibility calculateFromExtractorParameters(
return Visibility.NONE;
}

public static Visibility calculateFromPipeSourceParameters(
final PipeParameters sourceParameters) {
final boolean isTreeDialect =
sourceParameters
.getStringOrDefault(
SystemConstant.SQL_DIALECT_KEY, SystemConstant.SQL_DIALECT_TREE_VALUE)
.equals(SystemConstant.SQL_DIALECT_TREE_VALUE);
return isTreeDialect ? Visibility.TREE_ONLY : Visibility.TABLE_ONLY;
}

public static Visibility calculateFromTopicConfig(final TopicConfig config) {
final boolean isTreeDialect =
config
Expand Down
Loading
Loading