diff --git a/integration-test/src/test/java/org/apache/iotdb/confignode/it/regionmigration/IoTDBRegionOperationReliabilityITFramework.java b/integration-test/src/test/java/org/apache/iotdb/confignode/it/regionmigration/IoTDBRegionOperationReliabilityITFramework.java index 9b9c4ad41a1f6..4ab85264fc826 100644 --- a/integration-test/src/test/java/org/apache/iotdb/confignode/it/regionmigration/IoTDBRegionOperationReliabilityITFramework.java +++ b/integration-test/src/test/java/org/apache/iotdb/confignode/it/regionmigration/IoTDBRegionOperationReliabilityITFramework.java @@ -355,6 +355,17 @@ private void generalTestWithAllOptions( Assert.fail(); } + // The kill point is detected by a background thread tailing the node log, so the migration + // result (observed by awaitUntilSuccess above) can become visible before that thread has read + // and processed the kill-point line of the last migration phase (e.g. + // RemoveRegionLocationCache). Give that thread a short grace period to catch up before + // asserting, otherwise checkKillPointsAllTriggered may fail spuriously with "Some kill points + // was not triggered". This is best-effort: the authoritative assertion remains + // checkKillPointsAllTriggered, which still fails the test if a kill point genuinely never + // triggers (e.g. the badKillPoint test). + graceWaitForKillPointsTriggered(configNodeKeywords); + graceWaitForKillPointsTriggered(dataNodeKeywords); + // make sure all kill points have been triggered checkKillPointsAllTriggered(configNodeKeywords); checkKillPointsAllTriggered(dataNodeKeywords); @@ -520,6 +531,26 @@ private static void awaitKillPointsTriggered(KeySetView killPoi Awaitility.await().atMost(2, TimeUnit.MINUTES).until(killPoints::isEmpty); } + /** + * Best-effort wait for all kill points to be triggered. The kill point is detected by a + * background thread tailing the node log, so there can be a short lag between the migration + * result becoming visible and that thread processing the kill-point line. This gives it a brief + * grace period to catch up. Unlike {@link #awaitKillPointsTriggered}, it never throws: the + * authoritative check is {@link #checkKillPointsAllTriggered}, so a kill point that genuinely + * never triggers (e.g. the badKillPoint test) is still caught there as an AssertionError rather + * than masked here. + */ + private static void graceWaitForKillPointsTriggered(KeySetView killPoints) { + if (killPoints.isEmpty()) { + return; + } + try { + Awaitility.await().atMost(1, TimeUnit.MINUTES).until(killPoints::isEmpty); + } catch (ConditionTimeoutException ignored) { + // Fall through to checkKillPointsAllTriggered, which makes the real assertion. + } + } + private static String buildRegionMigrateCommand(int who, int from, int to) { String result = String.format(REGION_MIGRATE_COMMAND_FORMAT, who, from, to); LOGGER.info(result); diff --git a/integration-test/src/test/java/org/apache/iotdb/confignode/it/regionmigration/pass/daily/datanodecrash/iotv2/batch/IoTDBRegionMigrateDataNodeCrashForIoTV2BatchIT.java b/integration-test/src/test/java/org/apache/iotdb/confignode/it/regionmigration/pass/daily/datanodecrash/iotv2/batch/IoTDBRegionMigrateDataNodeCrashForIoTV2BatchIT.java index ad9738144981e..ea9a40e4699e5 100644 --- a/integration-test/src/test/java/org/apache/iotdb/confignode/it/regionmigration/pass/daily/datanodecrash/iotv2/batch/IoTDBRegionMigrateDataNodeCrashForIoTV2BatchIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/confignode/it/regionmigration/pass/daily/datanodecrash/iotv2/batch/IoTDBRegionMigrateDataNodeCrashForIoTV2BatchIT.java @@ -25,11 +25,10 @@ import org.apache.iotdb.it.framework.IoTDBTestRunner; import org.apache.iotdb.itbase.category.DailyIT; +import org.junit.Test; import org.junit.experimental.categories.Category; import org.junit.runner.RunWith; -// TODO: @Yongzao Dan, reopen this CI after discussion with @HxpSerein - @Category({DailyIT.class}) @RunWith(IoTDBTestRunner.class) public class IoTDBRegionMigrateDataNodeCrashForIoTV2BatchIT @@ -41,7 +40,7 @@ public class IoTDBRegionMigrateDataNodeCrashForIoTV2BatchIT private final int configNodeNum = 1; private final int dataNodeNum = 3; - // @Test + @Test public void coordinatorCrashDuringAddPeerTransition() throws Exception { failTest( 2, @@ -53,7 +52,7 @@ public void coordinatorCrashDuringAddPeerTransition() throws Exception { KillNode.COORDINATOR_DATANODE); } - // @Test + @Test public void coordinatorCrashDuringAddPeerDone() throws Exception { failTest( 2, @@ -69,9 +68,13 @@ public void coordinatorCrashDuringAddPeerDone() throws Exception { // region Original DataNode crash tests - // @Test + @Test public void originalCrashDuringAddPeerDone() throws Exception { - failTest( + // Once the add-peer phase is done, the new peer already holds the data, so the migration is + // designed to tolerate the original (source) DataNode crashing afterwards: it completes + // successfully and merely leaves the region files on the dead node to be cleaned up later. + // Hence this is a successTest, not a failTest. + successTest( 2, 2, 1, @@ -85,7 +88,7 @@ public void originalCrashDuringAddPeerDone() throws Exception { // region Destination DataNode crash tests - // @Test + @Test public void destinationCrashDuringCreateLocalPeer() throws Exception { failTest( 2, @@ -97,7 +100,7 @@ public void destinationCrashDuringCreateLocalPeer() throws Exception { KillNode.DESTINATION_DATANODE); } - // @Test + @Test public void destinationCrashDuringAddPeerDone() throws Exception { failTest( 2, diff --git a/integration-test/src/test/java/org/apache/iotdb/confignode/it/regionmigration/pass/daily/datanodecrash/iotv2/stream/IoTDBRegionMigrateDataNodeCrashForIoTV2StreamIT.java b/integration-test/src/test/java/org/apache/iotdb/confignode/it/regionmigration/pass/daily/datanodecrash/iotv2/stream/IoTDBRegionMigrateDataNodeCrashForIoTV2StreamIT.java index eeca6dacc1998..a8aa62f81b0ca 100644 --- a/integration-test/src/test/java/org/apache/iotdb/confignode/it/regionmigration/pass/daily/datanodecrash/iotv2/stream/IoTDBRegionMigrateDataNodeCrashForIoTV2StreamIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/confignode/it/regionmigration/pass/daily/datanodecrash/iotv2/stream/IoTDBRegionMigrateDataNodeCrashForIoTV2StreamIT.java @@ -28,11 +28,10 @@ import org.apache.iotdb.itbase.category.DailyIT; import org.junit.Before; +import org.junit.Test; import org.junit.experimental.categories.Category; import org.junit.runner.RunWith; -// TODO: @Yongzao Dan, reopen this CI after discussion with @HxpSerein - @Category({DailyIT.class}) @RunWith(IoTDBTestRunner.class) public class IoTDBRegionMigrateDataNodeCrashForIoTV2StreamIT @@ -54,7 +53,7 @@ public void setUp() throws Exception { .setIoTConsensusV2Mode(ConsensusFactory.IOT_CONSENSUS_V2_STREAM_MODE); } - // @Test + @Test public void coordinatorCrashDuringAddPeerTransition() throws Exception { failTest( 2, @@ -66,7 +65,7 @@ public void coordinatorCrashDuringAddPeerTransition() throws Exception { KillNode.COORDINATOR_DATANODE); } - // @Test + @Test public void coordinatorCrashDuringAddPeerDone() throws Exception { failTest( 2, @@ -82,9 +81,13 @@ public void coordinatorCrashDuringAddPeerDone() throws Exception { // region Original DataNode crash tests - // @Test + @Test public void originalCrashDuringAddPeerDone() throws Exception { - failTest( + // Once the add-peer phase is done, the new peer already holds the data, so the migration is + // designed to tolerate the original (source) DataNode crashing afterwards: it completes + // successfully and merely leaves the region files on the dead node to be cleaned up later. + // Hence this is a successTest, not a failTest. + successTest( 2, 2, 1, @@ -98,7 +101,7 @@ public void originalCrashDuringAddPeerDone() throws Exception { // region Destination DataNode crash tests - // @Test + @Test public void destinationCrashDuringCreateLocalPeer() throws Exception { failTest( 2, @@ -110,7 +113,7 @@ public void destinationCrashDuringCreateLocalPeer() throws Exception { KillNode.DESTINATION_DATANODE); } - // @Test + @Test public void destinationCrashDuringAddPeerDone() throws Exception { failTest( 2, diff --git a/integration-test/src/test/java/org/apache/iotdb/confignode/it/regionmigration/pass/daily/iotv2/batch/IoTDBRegionMigrateClusterCrashIoTV2BatchIT.java b/integration-test/src/test/java/org/apache/iotdb/confignode/it/regionmigration/pass/daily/iotv2/batch/IoTDBRegionMigrateClusterCrashIoTV2BatchIT.java index a276acc4d0123..6a74d99c09672 100644 --- a/integration-test/src/test/java/org/apache/iotdb/confignode/it/regionmigration/pass/daily/iotv2/batch/IoTDBRegionMigrateClusterCrashIoTV2BatchIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/confignode/it/regionmigration/pass/daily/iotv2/batch/IoTDBRegionMigrateClusterCrashIoTV2BatchIT.java @@ -39,8 +39,7 @@ public void clusterCrash1() throws Exception { killClusterTest(buildSet(AddRegionPeerState.CREATE_NEW_REGION_PEER), true); } - // TODO: @Yongzao Dan, reopen this CI after discussion with @HxpSerein - // @Test + @Test public void clusterCrash2() throws Exception { killClusterTest(buildSet(AddRegionPeerState.DO_ADD_REGION_PEER), false); } @@ -60,8 +59,7 @@ public void clusterCrash6() throws Exception { killClusterTest(buildSet(RemoveRegionPeerState.REMOVE_REGION_PEER), true); } - // TODO: @Yongzao Dan, reopen this CI after discussion with @HxpSerein - // @Test + @Test public void clusterCrash7() throws Exception { killClusterTest(buildSet(RemoveRegionPeerState.DELETE_OLD_REGION_PEER), true); } diff --git a/integration-test/src/test/java/org/apache/iotdb/confignode/it/regionmigration/pass/daily/iotv2/batch/IoTDBRegionMigrateConfigNodeCrashIoTV2BatchIT.java b/integration-test/src/test/java/org/apache/iotdb/confignode/it/regionmigration/pass/daily/iotv2/batch/IoTDBRegionMigrateConfigNodeCrashIoTV2BatchIT.java index bc4f477b6bd84..853d834966614 100644 --- a/integration-test/src/test/java/org/apache/iotdb/confignode/it/regionmigration/pass/daily/iotv2/batch/IoTDBRegionMigrateConfigNodeCrashIoTV2BatchIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/confignode/it/regionmigration/pass/daily/iotv2/batch/IoTDBRegionMigrateConfigNodeCrashIoTV2BatchIT.java @@ -66,8 +66,7 @@ public void cnCrashDuringCreatePeerTest() throws Exception { KillNode.CONFIG_NODE); } - // TODO: @Yongzao Dan, reopen this CI after discussion with @HxpSerein - // @Test + @Test public void testCnCrashDuringDoAddPeer() throws Exception { successTest( 1, @@ -127,8 +126,7 @@ public void cnCrashDuringDeleteOldRegionPeerTest() throws Exception { KillNode.CONFIG_NODE); } - // TODO: @Yongzao Dan, reopen this CI after discussion with @HxpSerein - // @Test + @Test public void cnCrashDuringRemoveRegionLocationCacheTest() throws Exception { successTest( 1, @@ -140,8 +138,7 @@ public void cnCrashDuringRemoveRegionLocationCacheTest() throws Exception { KillNode.CONFIG_NODE); } - // TODO: @Yongzao Dan, reopen this CI after discussion with @HxpSerein - // @Test + @Test public void cnCrashTest() throws Exception { ConcurrentHashMap.KeySetView killConfigNodeKeywords = noKillPoints(); killConfigNodeKeywords.addAll( diff --git a/integration-test/src/test/java/org/apache/iotdb/confignode/it/regionmigration/pass/daily/iotv2/stream/IoTDBRegionMigrateClusterCrashIoTV2StreamIT.java b/integration-test/src/test/java/org/apache/iotdb/confignode/it/regionmigration/pass/daily/iotv2/stream/IoTDBRegionMigrateClusterCrashIoTV2StreamIT.java index 384f5e61dd76e..5f0f2fe3cee4f 100644 --- a/integration-test/src/test/java/org/apache/iotdb/confignode/it/regionmigration/pass/daily/iotv2/stream/IoTDBRegionMigrateClusterCrashIoTV2StreamIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/confignode/it/regionmigration/pass/daily/iotv2/stream/IoTDBRegionMigrateClusterCrashIoTV2StreamIT.java @@ -47,14 +47,12 @@ public void setUp() throws Exception { .setIoTConsensusV2Mode(ConsensusFactory.IOT_CONSENSUS_V2_STREAM_MODE); } - // TODO: @Yongzao Dan, reopen this CI after discussion with @HxpSerein - // @Test + @Test public void clusterCrash1() throws Exception { killClusterTest(buildSet(AddRegionPeerState.CREATE_NEW_REGION_PEER), true); } - // TODO: @Yongzao Dan, reopen this CI after discussion with @HxpSerein - // @Test + @Test public void clusterCrash2() throws Exception { killClusterTest(buildSet(AddRegionPeerState.DO_ADD_REGION_PEER), false); } @@ -74,8 +72,7 @@ public void clusterCrash6() throws Exception { killClusterTest(buildSet(RemoveRegionPeerState.REMOVE_REGION_PEER), true); } - // TODO: @Yongzao Dan, reopen this CI after discussion with @HxpSerein - // @Test + @Test public void clusterCrash7() throws Exception { killClusterTest(buildSet(RemoveRegionPeerState.DELETE_OLD_REGION_PEER), true); } diff --git a/integration-test/src/test/java/org/apache/iotdb/confignode/it/regionmigration/pass/daily/iotv2/stream/IoTDBRegionMigrateConfigNodeCrashIoTV2StreamIT.java b/integration-test/src/test/java/org/apache/iotdb/confignode/it/regionmigration/pass/daily/iotv2/stream/IoTDBRegionMigrateConfigNodeCrashIoTV2StreamIT.java index 39b5953de4a15..f29482811f0a0 100644 --- a/integration-test/src/test/java/org/apache/iotdb/confignode/it/regionmigration/pass/daily/iotv2/stream/IoTDBRegionMigrateConfigNodeCrashIoTV2StreamIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/confignode/it/regionmigration/pass/daily/iotv2/stream/IoTDBRegionMigrateConfigNodeCrashIoTV2StreamIT.java @@ -80,8 +80,7 @@ public void cnCrashDuringCreatePeerTest() throws Exception { KillNode.CONFIG_NODE); } - // TODO: @Yongzao Dan, reopen this CI after discussion with @HxpSerein - // @Test + @Test public void testCnCrashDuringDoAddPeer() throws Exception { successTest( 1, @@ -141,8 +140,7 @@ public void cnCrashDuringDeleteOldRegionPeerTest() throws Exception { KillNode.CONFIG_NODE); } - // TODO: @Yongzao Dan, reopen this CI after discussion with @HxpSerein - // @Test + @Test public void cnCrashDuringRemoveRegionLocationCacheTest() throws Exception { successTest( 1, @@ -154,8 +152,7 @@ public void cnCrashDuringRemoveRegionLocationCacheTest() throws Exception { KillNode.CONFIG_NODE); } - // TODO: @Yongzao Dan, reopen this CI after discussion with @HxpSerein - // @Test + @Test public void cnCrashTest() throws Exception { ConcurrentHashMap.KeySetView killConfigNodeKeywords = noKillPoints(); killConfigNodeKeywords.addAll( diff --git a/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBCustomizedClusterIT.java b/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBCustomizedClusterIT.java index 74391b99bb3c4..42d91eb74092b 100644 --- a/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBCustomizedClusterIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBCustomizedClusterIT.java @@ -25,11 +25,13 @@ import org.apache.iotdb.it.env.cluster.node.DataNodeWrapper; import org.apache.iotdb.it.framework.IoTDBTestRunner; import org.apache.iotdb.itbase.category.DailyIT; +import org.apache.iotdb.itbase.exception.InconsistentDataException; import org.apache.iotdb.rpc.IoTDBConnectionException; import org.apache.iotdb.rpc.StatementExecutionException; import org.apache.iotdb.rpc.TSStatusCode; import org.apache.iotdb.session.Session; +import org.awaitility.Awaitility; import org.junit.Test; import org.junit.experimental.categories.Category; import org.junit.runner.RunWith; @@ -64,22 +66,42 @@ public void testRepeatedlyRestartWholeClusterWithWrite() throws Exception { testRepeatedlyRestartWholeCluster( (s, i, env) -> { if (i != 0) { - ResultSet resultSet = s.executeQuery("SELECT last s1 FROM root.**"); - ResultSetMetaData metaData = resultSet.getMetaData(); - assertEquals(4, metaData.getColumnCount()); - int cnt = 0; - while (resultSet.next()) { - cnt++; - StringBuilder result = new StringBuilder(); - for (int j = 0; j < metaData.getColumnCount(); j++) { - result - .append(metaData.getColumnName(j + 1)) - .append(":") - .append(resultSet.getString(j + 1)) - .append(","); - } - System.out.println(result); - } + // This query is fanned out to every DataNode and the results are compared across + // replicas. Right after a restart the last cache on each coordinator is reloaded + // lazily, so the cross-replica comparison may transiently observe an inconsistent + // result until the cluster converges. ORDER BY TIMESERIES makes the row order + // deterministic across coordinators (the root cause of the observed flakiness), and the + // retry tolerates the brief convergence window (e.g. a replica that has not finished + // recovering yet) without masking a genuine, persistent inconsistency. + // + // ignoreExceptionsMatching(InconsistentDataException) is required: a mismatch surfaces + // as InconsistentDataException (a RuntimeException) thrown from getString(), and + // untilAsserted() only retries on AssertionError by default, so without it the retry + // would not actually cover this failure. We match only InconsistentDataException so a + // genuine error (e.g. a real SQLException) still fails fast instead of being retried. + Awaitility.await() + .atMost(60, TimeUnit.SECONDS) + .pollInterval(2, TimeUnit.SECONDS) + .ignoreExceptionsMatching(e -> e instanceof InconsistentDataException) + .untilAsserted( + () -> { + try (ResultSet resultSet = + s.executeQuery("SELECT last s1 FROM root.** ORDER BY TIMESERIES ASC")) { + ResultSetMetaData metaData = resultSet.getMetaData(); + assertEquals(4, metaData.getColumnCount()); + while (resultSet.next()) { + StringBuilder result = new StringBuilder(); + for (int j = 0; j < metaData.getColumnCount(); j++) { + result + .append(metaData.getColumnName(j + 1)) + .append(":") + .append(resultSet.getString(j + 1)) + .append(","); + } + System.out.println(result); + } + } + }); } s.execute("INSERT INTO root.db1.d1 (time, s1) VALUES (1, 1)"); s.execute("INSERT INTO root.db2.d1 (time, s1) VALUES (1, 1)"); diff --git a/integration-test/src/test/java/org/apache/iotdb/db/it/iotconsensusv2/IoTDBIoTConsensusV23C3DBasicITBase.java b/integration-test/src/test/java/org/apache/iotdb/db/it/iotconsensusv2/IoTDBIoTConsensusV23C3DBasicITBase.java index ec04aab39bd7b..e07e6e4ebc138 100644 --- a/integration-test/src/test/java/org/apache/iotdb/db/it/iotconsensusv2/IoTDBIoTConsensusV23C3DBasicITBase.java +++ b/integration-test/src/test/java/org/apache/iotdb/db/it/iotconsensusv2/IoTDBIoTConsensusV23C3DBasicITBase.java @@ -342,11 +342,12 @@ public void testDeleteTimeSeriesReplicaConsistency() throws Exception { // Restart the stopped node before moving to the next iteration LOGGER.info("Restarting {}", stoppedDesc); stoppedNode.start(); - // Wait for the restarted node to rejoin - Awaitility.await() - .atMost(120, TimeUnit.SECONDS) - .pollInterval(2, TimeUnit.SECONDS) - .until(stoppedNode::isAlive); + // Wait for the restarted node to actually be able to serve queries again, not just for + // its process to be up. The next loop iteration will treat this node as a surviving node + // and connect to it, so if we only waited for isAlive() (process started) the node might + // still be in startup (RPC port not yet open / not registered), causing a spurious + // "Connection refused" failure. + waitUntilDataNodeQueryable(stoppedNode, stoppedDesc); } } @@ -356,6 +357,43 @@ public void testDeleteTimeSeriesReplicaConsistency() throws Exception { } } + /** + * Wait until the given DataNode can actually serve queries again after a restart. A node's + * process being alive ({@link DataNodeWrapper#isAlive()}) does not mean its client RPC service is + * open and it has rejoined the cluster, so we poll a real connection plus a trivial query until + * it succeeds. + */ + private void waitUntilDataNodeQueryable(DataNodeWrapper node, String nodeDesc) { + Awaitility.await() + .atMost(120, TimeUnit.SECONDS) + .pollDelay(1, TimeUnit.SECONDS) + .pollInterval(2, TimeUnit.SECONDS) + .until( + () -> { + if (!node.isAlive()) { + return false; + } + try (Connection conn = + EnvFactory.getEnv() + .getConnection( + node, + SessionConfig.DEFAULT_USER, + SessionConfig.DEFAULT_PASSWORD, + BaseEnv.TREE_SQL_DIALECT); + Statement stmt = conn.createStatement(); + ResultSet rs = stmt.executeQuery(SHOW_TIMESERIES_D1)) { + // Drain the result set to make sure the query fully executes. + while (rs.next()) { + // no-op + } + return true; + } catch (Exception e) { + LOGGER.info("{} not queryable yet, retrying: {}", nodeDesc, e.getMessage()); + return false; + } + }); + } + /** * Verify that after deleting root.sg.d1.speed, only temperature and power timeseries remain, and * that data queries do not return the deleted timeseries.