diff --git a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/basic/IoTDBPipeIsolationIT.java b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/basic/IoTDBPipeIsolationIT.java index 6924f17cedc43..749ac0a271ebf 100644 --- a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/basic/IoTDBPipeIsolationIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/basic/IoTDBPipeIsolationIT.java @@ -272,7 +272,7 @@ public void testCaptureTreeAndTableIsolation() throws Exception { Assert.assertEquals(1, TableModelUtils.showPipesCount(senderEnv, BaseEnv.TREE_SQL_DIALECT)); // Show table pipe by table session - Assert.assertEquals(1, TableModelUtils.showPipesCount(senderEnv, BaseEnv.TABLE_SQL_DIALECT)); + Assert.assertEquals(0, TableModelUtils.showPipesCount(senderEnv, BaseEnv.TABLE_SQL_DIALECT)); // 2. Create table pipe by table session try (final Connection connection = senderEnv.getConnection(BaseEnv.TABLE_SQL_DIALECT); @@ -292,22 +292,28 @@ public void testCaptureTreeAndTableIsolation() throws Exception { } // Show tree pipe by tree session - Assert.assertEquals(2, TableModelUtils.showPipesCount(senderEnv, BaseEnv.TREE_SQL_DIALECT)); + Assert.assertEquals(1, TableModelUtils.showPipesCount(senderEnv, BaseEnv.TREE_SQL_DIALECT)); // Show table pipe by table session - Assert.assertEquals(2, TableModelUtils.showPipesCount(senderEnv, BaseEnv.TABLE_SQL_DIALECT)); + Assert.assertEquals(1, TableModelUtils.showPipesCount(senderEnv, BaseEnv.TABLE_SQL_DIALECT)); // 3. Drop pipe try (final SyncConfigNodeIServiceClient client = (SyncConfigNodeIServiceClient) senderEnv.getLeaderConfigNodeConnection()) { Assert.assertEquals( - TSStatusCode.SUCCESS_STATUS.getStatusCode(), + TSStatusCode.PIPE_NOT_EXIST_ERROR.getStatusCode(), client.dropPipeExtended(new TDropPipeReq(treePipeName).setIsTableModel(true)).getCode()); Assert.assertEquals( - TSStatusCode.SUCCESS_STATUS.getStatusCode(), + TSStatusCode.PIPE_NOT_EXIST_ERROR.getStatusCode(), client .dropPipeExtended(new TDropPipeReq(tablePipeName).setIsTableModel(false)) .getCode()); + Assert.assertEquals( + TSStatusCode.SUCCESS_STATUS.getStatusCode(), + client.dropPipeExtended(new TDropPipeReq(treePipeName).setIsTableModel(false)).getCode()); + Assert.assertEquals( + TSStatusCode.SUCCESS_STATUS.getStatusCode(), + client.dropPipeExtended(new TDropPipeReq(tablePipeName).setIsTableModel(true)).getCode()); } } @@ -333,10 +339,10 @@ public void testCaptureCornerCases() { } // Show tree pipe by tree session - Assert.assertEquals(0, TableModelUtils.showPipesCount(senderEnv, BaseEnv.TREE_SQL_DIALECT)); + Assert.assertEquals(1, TableModelUtils.showPipesCount(senderEnv, BaseEnv.TREE_SQL_DIALECT)); // Show table pipe by table session - Assert.assertEquals(1, TableModelUtils.showPipesCount(senderEnv, BaseEnv.TABLE_SQL_DIALECT)); + Assert.assertEquals(0, TableModelUtils.showPipesCount(senderEnv, BaseEnv.TABLE_SQL_DIALECT)); // 2. Create table pipe but capture tree data try (final Connection connection = senderEnv.getConnection(BaseEnv.TABLE_SQL_DIALECT); diff --git a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/basic/IoTDBPipePermissionIT.java b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/basic/IoTDBPipePermissionIT.java index f233596fc60cd..158c2dde7e240 100644 --- a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/basic/IoTDBPipePermissionIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/basic/IoTDBPipePermissionIT.java @@ -21,6 +21,7 @@ import org.apache.iotdb.common.rpc.thrift.TSStatus; import org.apache.iotdb.commons.client.sync.SyncConfigNodeIServiceClient; +import org.apache.iotdb.commons.pipe.config.constant.SystemConstant; import org.apache.iotdb.confignode.rpc.thrift.TCreatePipeReq; import org.apache.iotdb.confignode.rpc.thrift.TShowPipeReq; import org.apache.iotdb.confignode.rpc.thrift.TStartPipeReq; @@ -287,6 +288,7 @@ public void testReceiverPermission() throws Exception { final String dbName = "test"; final String tbName = "test"; + sourceAttributes.put(SystemConstant.SQL_DIALECT_KEY, SystemConstant.SQL_DIALECT_TABLE_VALUE); sourceAttributes.put("source.inclusion", "all"); sourceAttributes.put("source.capture.tree", "false"); sourceAttributes.put("source.capture.table", "true"); diff --git a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/enhanced/IoTDBPipeDoubleLivingIT.java b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/enhanced/IoTDBPipeDoubleLivingIT.java index 4d5f350a7c7b5..550b67eb375b4 100644 --- a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/enhanced/IoTDBPipeDoubleLivingIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/enhanced/IoTDBPipeDoubleLivingIT.java @@ -294,7 +294,7 @@ public void testDoubleLivingIsolation() throws Exception { Assert.assertEquals(1, TableModelUtils.showPipesCount(senderEnv, BaseEnv.TREE_SQL_DIALECT)); // Show table pipe by table session - Assert.assertEquals(1, TableModelUtils.showPipesCount(senderEnv, BaseEnv.TABLE_SQL_DIALECT)); + Assert.assertEquals(0, TableModelUtils.showPipesCount(senderEnv, BaseEnv.TABLE_SQL_DIALECT)); // Create table pipe try (final Connection connection = senderEnv.getConnection(BaseEnv.TABLE_SQL_DIALECT); @@ -313,22 +313,28 @@ public void testDoubleLivingIsolation() throws Exception { } // Show tree pipe by tree session - Assert.assertEquals(2, TableModelUtils.showPipesCount(senderEnv, BaseEnv.TREE_SQL_DIALECT)); + Assert.assertEquals(1, TableModelUtils.showPipesCount(senderEnv, BaseEnv.TREE_SQL_DIALECT)); // Show table pipe by table session - Assert.assertEquals(2, TableModelUtils.showPipesCount(senderEnv, BaseEnv.TABLE_SQL_DIALECT)); + Assert.assertEquals(1, TableModelUtils.showPipesCount(senderEnv, BaseEnv.TABLE_SQL_DIALECT)); try (final SyncConfigNodeIServiceClient client = (SyncConfigNodeIServiceClient) senderEnv.getLeaderConfigNodeConnection()) { // Drop pipe Assert.assertEquals( - TSStatusCode.SUCCESS_STATUS.getStatusCode(), + TSStatusCode.PIPE_NOT_EXIST_ERROR.getStatusCode(), client.dropPipeExtended(new TDropPipeReq(treePipeName).setIsTableModel(true)).getCode()); Assert.assertEquals( - TSStatusCode.SUCCESS_STATUS.getStatusCode(), + TSStatusCode.PIPE_NOT_EXIST_ERROR.getStatusCode(), client .dropPipeExtended(new TDropPipeReq(tablePipeName).setIsTableModel(false)) .getCode()); + Assert.assertEquals( + TSStatusCode.SUCCESS_STATUS.getStatusCode(), + client.dropPipeExtended(new TDropPipeReq(treePipeName).setIsTableModel(false)).getCode()); + Assert.assertEquals( + TSStatusCode.SUCCESS_STATUS.getStatusCode(), + client.dropPipeExtended(new TDropPipeReq(tablePipeName).setIsTableModel(true)).getCode()); } } } diff --git a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/enhanced/IoTDBPipeMetaIT.java b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/enhanced/IoTDBPipeMetaIT.java index f26aa171ae11b..6b3765527b190 100644 --- a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/enhanced/IoTDBPipeMetaIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/enhanced/IoTDBPipeMetaIT.java @@ -21,6 +21,7 @@ import org.apache.iotdb.common.rpc.thrift.TSStatus; import org.apache.iotdb.commons.client.sync.SyncConfigNodeIServiceClient; +import org.apache.iotdb.commons.pipe.config.constant.SystemConstant; import org.apache.iotdb.confignode.rpc.thrift.TCreatePipeReq; import org.apache.iotdb.confignode.rpc.thrift.TStartPipeReq; import org.apache.iotdb.db.it.utils.TestUtils; @@ -73,6 +74,8 @@ public void testTableSync() throws Exception { final Map processorAttributes = new HashMap<>(); final Map connectorAttributes = new HashMap<>(); + extractorAttributes.put( + SystemConstant.SQL_DIALECT_KEY, SystemConstant.SQL_DIALECT_TABLE_VALUE); extractorAttributes.put("extractor.inclusion", "all"); extractorAttributes.put("extractor.capture.tree", "false"); extractorAttributes.put("extractor.capture.table", "true"); @@ -209,6 +212,8 @@ public void testNoTree() throws Exception { final Map processorAttributes = new HashMap<>(); final Map connectorAttributes = new HashMap<>(); + extractorAttributes.put( + SystemConstant.SQL_DIALECT_KEY, SystemConstant.SQL_DIALECT_TABLE_VALUE); extractorAttributes.put("extractor.inclusion", "all"); extractorAttributes.put("extractor.capture.tree", "false"); extractorAttributes.put("extractor.capture.table", "true"); @@ -327,6 +332,8 @@ public void testAuth() throws Exception { final Map processorAttributes = new HashMap<>(); final Map connectorAttributes = new HashMap<>(); + extractorAttributes.put( + SystemConstant.SQL_DIALECT_KEY, SystemConstant.SQL_DIALECT_TABLE_VALUE); extractorAttributes.put("extractor.inclusion", "all"); extractorAttributes.put("extractor.capture.tree", "false"); extractorAttributes.put("extractor.capture.table", "true"); diff --git a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/enhanced/IoTDBPipeSinkCompressionIT.java b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/enhanced/IoTDBPipeSinkCompressionIT.java index 547db349e2fa7..7353be0c1b777 100644 --- a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/enhanced/IoTDBPipeSinkCompressionIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/enhanced/IoTDBPipeSinkCompressionIT.java @@ -332,7 +332,9 @@ public void testZstdCompressorLevel() throws Exception { } final List showPipeResult = - client.showPipe(new TShowPipeReq().setUserName(SessionConfig.DEFAULT_USER)).pipeInfoList; + client.showPipe( + new TShowPipeReq().setIsTableModel(true).setUserName(SessionConfig.DEFAULT_USER)) + .pipeInfoList; showPipeResult.removeIf(i -> i.getId().startsWith("__consensus")); Assert.assertEquals( 3, diff --git a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/enhanced/IoTDBPipeSinkCompressionIT.java b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/enhanced/IoTDBPipeSinkCompressionIT.java index 3900506a658b5..46ddfed538ada 100644 --- a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/enhanced/IoTDBPipeSinkCompressionIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/enhanced/IoTDBPipeSinkCompressionIT.java @@ -221,17 +221,6 @@ public void testZstdCompressorLevel() throws Exception { try (final SyncConfigNodeIServiceClient client = (SyncConfigNodeIServiceClient) senderEnv.getLeaderConfigNodeConnection()) { - TestUtils.executeNonQueries( - senderEnv, - Arrays.asList( - "insert into root.db.d1(time, s1) values (1, 1)", - "insert into root.db.d1(time, s2) values (1, 1)", - "insert into root.db.d1(time, s3) values (1, 1)", - "insert into root.db.d1(time, s4) values (1, 1)", - "insert into root.db.d1(time, s5) values (1, 1)", - "flush"), - null); - // Create 5 pipes with different zstd compression levels, p4 and p5 should fail. try (final Connection connection = senderEnv.getConnection(); @@ -285,6 +274,17 @@ public void testZstdCompressorLevel() throws Exception { fail(e.getMessage()); } + TestUtils.executeNonQueries( + senderEnv, + Arrays.asList( + "insert into root.db.d1(time, s1) values (1, 1)", + "insert into root.db.d1(time, s2) values (1, 1)", + "insert into root.db.d1(time, s3) values (1, 1)", + "insert into root.db.d1(time, s4) values (1, 1)", + "insert into root.db.d1(time, s5) values (1, 1)", + "flush"), + null); + try (final Connection connection = senderEnv.getConnection(); final Statement statement = connection.createStatement()) { statement.execute( @@ -332,9 +332,9 @@ public void testZstdCompressorLevel() throws Exception { TestUtils.assertDataEventuallyOnEnv( receiverEnv, - "count timeseries root.db.**", - "count(timeseries),", - Collections.singleton("3,"), + "select count(*) from root.db.**", + "count(root.db.d1.s1),count(root.db.d1.s2),count(root.db.d1.s3),", + Collections.singleton("1,1,1,"), handleFailure); } } diff --git a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/consensus/response/pipe/PipeTableRespTest.java b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/consensus/response/pipe/PipeTableRespTest.java index 94189a19d9977..13eb6fa074c95 100644 --- a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/consensus/response/pipe/PipeTableRespTest.java +++ b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/consensus/response/pipe/PipeTableRespTest.java @@ -24,6 +24,8 @@ import org.apache.iotdb.commons.pipe.agent.task.meta.PipeRuntimeMeta; import org.apache.iotdb.commons.pipe.agent.task.meta.PipeStaticMeta; import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta; +import org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant; +import org.apache.iotdb.commons.pipe.config.constant.SystemConstant; import org.apache.iotdb.confignode.consensus.response.pipe.task.PipeTableResp; import org.apache.iotdb.rpc.TSStatusCode; @@ -118,4 +120,61 @@ public void testFilter() { PipeTableResp allPipeTableResp = pipeTableResp.filter(true, null); Assert.assertEquals(3, allPipeTableResp.getAllPipeMeta().size()); } + + @Test + public void testFilterBySqlDialectIgnoresCaptureOptions() { + TSStatus status = new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()); + List pipeMetaList = new ArrayList<>(); + + Map treeExtractorAttributes = new HashMap<>(); + treeExtractorAttributes.put( + SystemConstant.SQL_DIALECT_KEY, SystemConstant.SQL_DIALECT_TREE_VALUE); + treeExtractorAttributes.put(PipeSourceConstant.SOURCE_CAPTURE_TREE_KEY, "false"); + treeExtractorAttributes.put(PipeSourceConstant.SOURCE_CAPTURE_TABLE_KEY, "true"); + treeExtractorAttributes.put(PipeSourceConstant.SOURCE_MODE_DOUBLE_LIVING_KEY, "true"); + pipeMetaList.add(constructPipeMeta("treePipe", 121, treeExtractorAttributes)); + + Map tableExtractorAttributes = new HashMap<>(); + tableExtractorAttributes.put( + SystemConstant.SQL_DIALECT_KEY, SystemConstant.SQL_DIALECT_TABLE_VALUE); + tableExtractorAttributes.put(PipeSourceConstant.SOURCE_CAPTURE_TREE_KEY, "true"); + tableExtractorAttributes.put(PipeSourceConstant.SOURCE_CAPTURE_TABLE_KEY, "true"); + tableExtractorAttributes.put(PipeSourceConstant.SOURCE_MODE_DOUBLE_LIVING_KEY, "true"); + pipeMetaList.add(constructPipeMeta("tablePipe", 122, tableExtractorAttributes)); + + PipeTableResp treePipeTableResp = + new PipeTableResp(status, new ArrayList<>(pipeMetaList)).filter(false, null, false, null); + Assert.assertEquals(1, treePipeTableResp.getAllPipeMeta().size()); + Assert.assertEquals( + "treePipe", treePipeTableResp.getAllPipeMeta().get(0).getStaticMeta().getPipeName()); + + PipeTableResp tablePipeTableResp = + new PipeTableResp(status, new ArrayList<>(pipeMetaList)).filter(false, null, true, null); + Assert.assertEquals(1, tablePipeTableResp.getAllPipeMeta().size()); + Assert.assertEquals( + "tablePipe", tablePipeTableResp.getAllPipeMeta().get(0).getStaticMeta().getPipeName()); + } + + private PipeMeta constructPipeMeta( + final String pipeName, + final long creationTime, + final Map extractorAttributes) { + Map processorAttributes = new HashMap<>(); + Map connectorAttributes = new HashMap<>(); + + extractorAttributes.put("extractor", "iotdb-extractor"); + processorAttributes.put("processor", "do-nothing-processor"); + connectorAttributes.put("connector", "iotdb-thrift-connector"); + connectorAttributes.put("host", "127.0.0.1"); + connectorAttributes.put("port", "6667"); + + PipeTaskMeta pipeTaskMeta = new PipeTaskMeta(MinimumProgressIndex.INSTANCE, 1); + ConcurrentMap pipeTasks = new ConcurrentHashMap<>(); + pipeTasks.put(1, pipeTaskMeta); + PipeStaticMeta pipeStaticMeta = + new PipeStaticMeta( + pipeName, creationTime, extractorAttributes, processorAttributes, connectorAttributes); + PipeRuntimeMeta pipeRuntimeMeta = new PipeRuntimeMeta(pipeTasks); + return new PipeMeta(pipeStaticMeta, pipeRuntimeMeta); + } } diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/meta/PipeStaticMeta.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/meta/PipeStaticMeta.java index 4b57cd64e43b2..efb0fdd310936 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/meta/PipeStaticMeta.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/meta/PipeStaticMeta.java @@ -246,7 +246,7 @@ public static boolean isSubscriptionPipe(final String pipeName) { public boolean visibleUnder(final boolean isTableModel) { final Visibility visibility = - VisibilityUtils.calculateFromExtractorParameters(sourceParameters); + VisibilityUtils.calculateFromPipeSourceParameters(sourceParameters); return VisibilityUtils.isCompatible(visibility, isTableModel); } } diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/visibility/VisibilityUtils.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/visibility/VisibilityUtils.java index 235a22e23adb2..920fd933f5284 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/visibility/VisibilityUtils.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/datastructure/visibility/VisibilityUtils.java @@ -132,6 +132,16 @@ public static Visibility calculateFromExtractorParameters( return Visibility.NONE; } + public static Visibility calculateFromPipeSourceParameters( + final PipeParameters sourceParameters) { + final boolean isTreeDialect = + sourceParameters + .getStringOrDefault( + SystemConstant.SQL_DIALECT_KEY, SystemConstant.SQL_DIALECT_TREE_VALUE) + .equals(SystemConstant.SQL_DIALECT_TREE_VALUE); + return isTreeDialect ? Visibility.TREE_ONLY : Visibility.TABLE_ONLY; + } + public static Visibility calculateFromTopicConfig(final TopicConfig config) { final boolean isTreeDialect = config diff --git a/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/pipe/datastructure/visibility/VisibilityUtilsTest.java b/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/pipe/datastructure/visibility/VisibilityUtilsTest.java new file mode 100644 index 0000000000000..82a0261193ee4 --- /dev/null +++ b/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/pipe/datastructure/visibility/VisibilityUtilsTest.java @@ -0,0 +1,202 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.commons.pipe.datastructure.visibility; + +import org.apache.iotdb.commons.pipe.agent.task.meta.PipeStaticMeta; +import org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant; +import org.apache.iotdb.commons.pipe.config.constant.SystemConstant; +import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters; + +import org.junit.Assert; +import org.junit.Test; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; + +public class VisibilityUtilsTest { + + @Test + public void testPipeSourceParametersVisibilityDefaultsToTreeDialect() { + Assert.assertEquals( + Visibility.TREE_ONLY, + VisibilityUtils.calculateFromPipeSourceParameters( + new PipeParameters(Collections.emptyMap()))); + } + + @Test + public void testPipeSourceParametersVisibilityUsesSqlDialectOnly() { + final Map treeSourceAttributes = new HashMap<>(); + treeSourceAttributes.put(SystemConstant.SQL_DIALECT_KEY, SystemConstant.SQL_DIALECT_TREE_VALUE); + treeSourceAttributes.put(PipeSourceConstant.SOURCE_CAPTURE_TREE_KEY, "false"); + treeSourceAttributes.put(PipeSourceConstant.SOURCE_CAPTURE_TABLE_KEY, "true"); + treeSourceAttributes.put(PipeSourceConstant.SOURCE_MODE_DOUBLE_LIVING_KEY, "true"); + + Assert.assertEquals( + Visibility.TREE_ONLY, + VisibilityUtils.calculateFromPipeSourceParameters( + new PipeParameters(treeSourceAttributes))); + + final Map tableSourceAttributes = new HashMap<>(); + tableSourceAttributes.put( + SystemConstant.SQL_DIALECT_KEY, SystemConstant.SQL_DIALECT_TABLE_VALUE); + tableSourceAttributes.put(PipeSourceConstant.SOURCE_CAPTURE_TREE_KEY, "true"); + tableSourceAttributes.put(PipeSourceConstant.SOURCE_CAPTURE_TABLE_KEY, "true"); + tableSourceAttributes.put(PipeSourceConstant.SOURCE_MODE_DOUBLE_LIVING_KEY, "true"); + + Assert.assertEquals( + Visibility.TABLE_ONLY, + VisibilityUtils.calculateFromPipeSourceParameters( + new PipeParameters(tableSourceAttributes))); + } + + @Test + public void testExtractorParametersVisibilityStillUsesCaptureAndDoubleLiving() { + final Map treeDialectCaptureTableAttributes = new HashMap<>(); + treeDialectCaptureTableAttributes.put( + SystemConstant.SQL_DIALECT_KEY, SystemConstant.SQL_DIALECT_TREE_VALUE); + treeDialectCaptureTableAttributes.put(PipeSourceConstant.SOURCE_CAPTURE_TREE_KEY, "false"); + treeDialectCaptureTableAttributes.put(PipeSourceConstant.SOURCE_CAPTURE_TABLE_KEY, "true"); + + Assert.assertEquals( + Visibility.TABLE_ONLY, + VisibilityUtils.calculateFromExtractorParameters( + new PipeParameters(treeDialectCaptureTableAttributes))); + Assert.assertEquals( + Visibility.TREE_ONLY, + VisibilityUtils.calculateFromPipeSourceParameters( + new PipeParameters(treeDialectCaptureTableAttributes))); + + treeDialectCaptureTableAttributes.put(PipeSourceConstant.SOURCE_MODE_DOUBLE_LIVING_KEY, "true"); + Assert.assertEquals( + Visibility.BOTH, + VisibilityUtils.calculateFromExtractorParameters( + new PipeParameters(treeDialectCaptureTableAttributes))); + Assert.assertEquals( + Visibility.TREE_ONLY, + VisibilityUtils.calculateFromPipeSourceParameters( + new PipeParameters(treeDialectCaptureTableAttributes))); + } + + @Test + public void testPipeStaticMetaVisibilityUsesSqlDialect() { + final Map treeSourceAttributes = new HashMap<>(); + treeSourceAttributes.put(SystemConstant.SQL_DIALECT_KEY, SystemConstant.SQL_DIALECT_TREE_VALUE); + treeSourceAttributes.put(PipeSourceConstant.SOURCE_CAPTURE_TREE_KEY, "false"); + treeSourceAttributes.put(PipeSourceConstant.SOURCE_CAPTURE_TABLE_KEY, "true"); + + final Map tableSourceAttributes = new HashMap<>(); + tableSourceAttributes.put( + SystemConstant.SQL_DIALECT_KEY, SystemConstant.SQL_DIALECT_TABLE_VALUE); + tableSourceAttributes.put(PipeSourceConstant.SOURCE_CAPTURE_TREE_KEY, "true"); + tableSourceAttributes.put(PipeSourceConstant.SOURCE_CAPTURE_TABLE_KEY, "true"); + + final PipeStaticMeta treePipeStaticMeta = + new PipeStaticMeta( + "treePipe", 1, treeSourceAttributes, Collections.emptyMap(), Collections.emptyMap()); + Assert.assertTrue(treePipeStaticMeta.visibleUnder(false)); + Assert.assertFalse(treePipeStaticMeta.visibleUnder(true)); + + final PipeStaticMeta tablePipeStaticMeta = + new PipeStaticMeta( + "tablePipe", 1, tableSourceAttributes, Collections.emptyMap(), Collections.emptyMap()); + Assert.assertFalse(tablePipeStaticMeta.visibleUnder(false)); + Assert.assertTrue(tablePipeStaticMeta.visibleUnder(true)); + } + + @Test + public void testPipeStaticMetaVisibilityDefaultsToTreeDialect() { + final Map sourceAttributes = new HashMap<>(); + sourceAttributes.put(PipeSourceConstant.SOURCE_CAPTURE_TREE_KEY, "false"); + sourceAttributes.put(PipeSourceConstant.SOURCE_CAPTURE_TABLE_KEY, "true"); + sourceAttributes.put(PipeSourceConstant.SOURCE_MODE_DOUBLE_LIVING_KEY, "true"); + + final PipeStaticMeta pipeStaticMeta = + new PipeStaticMeta( + "defaultTreePipe", 1, sourceAttributes, Collections.emptyMap(), Collections.emptyMap()); + + Assert.assertTrue(pipeStaticMeta.visibleUnder(false)); + Assert.assertFalse(pipeStaticMeta.visibleUnder(true)); + } + + @Test + public void testPipeStaticMetaVisibilityIgnoresDoubleLiving() { + final Map treeSourceAttributes = new HashMap<>(); + treeSourceAttributes.put(SystemConstant.SQL_DIALECT_KEY, SystemConstant.SQL_DIALECT_TREE_VALUE); + treeSourceAttributes.put(PipeSourceConstant.SOURCE_CAPTURE_TREE_KEY, "true"); + treeSourceAttributes.put(PipeSourceConstant.SOURCE_CAPTURE_TABLE_KEY, "true"); + treeSourceAttributes.put(PipeSourceConstant.SOURCE_MODE_DOUBLE_LIVING_KEY, "true"); + + final PipeStaticMeta treePipeStaticMeta = + new PipeStaticMeta( + "treeDoubleLivingPipe", + 1, + treeSourceAttributes, + Collections.emptyMap(), + Collections.emptyMap()); + Assert.assertTrue(treePipeStaticMeta.visibleUnder(false)); + Assert.assertFalse(treePipeStaticMeta.visibleUnder(true)); + + final Map tableSourceAttributes = new HashMap<>(); + tableSourceAttributes.put( + SystemConstant.SQL_DIALECT_KEY, SystemConstant.SQL_DIALECT_TABLE_VALUE); + tableSourceAttributes.put(PipeSourceConstant.SOURCE_CAPTURE_TREE_KEY, "true"); + tableSourceAttributes.put(PipeSourceConstant.SOURCE_CAPTURE_TABLE_KEY, "true"); + tableSourceAttributes.put(PipeSourceConstant.SOURCE_MODE_DOUBLE_LIVING_KEY, "true"); + + final PipeStaticMeta tablePipeStaticMeta = + new PipeStaticMeta( + "tableDoubleLivingPipe", + 1, + tableSourceAttributes, + Collections.emptyMap(), + Collections.emptyMap()); + Assert.assertFalse(tablePipeStaticMeta.visibleUnder(false)); + Assert.assertTrue(tablePipeStaticMeta.visibleUnder(true)); + } + + @Test + public void testPipeStaticMetaVisibilitySurvivesSerialization() throws IOException { + final Map tableSourceAttributes = new HashMap<>(); + tableSourceAttributes.put( + SystemConstant.SQL_DIALECT_KEY, SystemConstant.SQL_DIALECT_TABLE_VALUE); + tableSourceAttributes.put(PipeSourceConstant.SOURCE_CAPTURE_TREE_KEY, "true"); + tableSourceAttributes.put(PipeSourceConstant.SOURCE_CAPTURE_TABLE_KEY, "true"); + + final PipeStaticMeta tablePipeStaticMeta = + new PipeStaticMeta( + "tablePipe", 1, tableSourceAttributes, Collections.emptyMap(), Collections.emptyMap()); + + final PipeStaticMeta byteBufferDeserializedMeta = + PipeStaticMeta.deserialize(tablePipeStaticMeta.serialize()); + Assert.assertFalse(byteBufferDeserializedMeta.visibleUnder(false)); + Assert.assertTrue(byteBufferDeserializedMeta.visibleUnder(true)); + + final ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); + tablePipeStaticMeta.serialize(outputStream); + final PipeStaticMeta inputStreamDeserializedMeta = + PipeStaticMeta.deserialize(new ByteArrayInputStream(outputStream.toByteArray())); + Assert.assertFalse(inputStreamDeserializedMeta.visibleUnder(false)); + Assert.assertTrue(inputStreamDeserializedMeta.visibleUnder(true)); + } +}