Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,27 @@ public class IoTDBRegionOperationReliabilityITFramework {
LOGGER.info("Cluster has been restarted");
};

/**
 * Kill-point action that restarts the affected ConfigNode gracefully rather than killing it
 * forcibly.
 *
 * <p>The node is stopped via {@code stop()} (a SIGTERM-style shutdown, so its shutdown hooks
 * run and the in-flight region-operation procedure worker is interrupted), verified to be down,
 * started again and verified to be up. This reproduces a leader switch / graceful shutdown during
 * AddRegionPeer: the interrupted {@code waitTaskFinish()} returns PROCESSING while the
 * AddRegionPeerTask is still running on the coordinator DataNode.
 *
 * <p>The procedure must NOT silently end at that point; otherwise the parent
 * RegionMigrateProcedure would wrongly treat AddPeer as complete and remove the source replica
 * before the destination replica is actually Running. See the DO_ADD_REGION_PEER PROCESSING
 * branch of AddRegionPeerProcedure#executeFromState.
 */
public static Consumer<KillPointContext> actionOfGracefullyRestartConfigNode =
    context -> {
      // This action is only meaningful for ConfigNode kill points; fail fast for anything else.
      Assert.assertTrue(context.getNodeWrapper() instanceof ConfigNodeWrapper);
      final ConfigNodeWrapper configNode = (ConfigNodeWrapper) context.getNodeWrapper();

      // Graceful stop first, and make sure the process is really gone before restarting it.
      configNode.stop();
      LOGGER.info("ConfigNode {} gracefully stopped.", configNode.getId());
      Assert.assertFalse(configNode.isAlive());

      // Bring the same node back so the cluster can continue the interrupted procedure.
      configNode.start();
      LOGGER.info("ConfigNode {} restarted.", configNode.getId());
      Assert.assertTrue(configNode.isAlive());
    };

@Before
public void setUp() throws Exception {
EnvFactory.getEnv()
Expand Down Expand Up @@ -155,6 +176,28 @@ public void successTest(
killNode);
}

/**
 * Runs the general region-operation scenario with a caller-supplied action that is executed when
 * a kill-point keyword is detected, passing {@code true} for the boolean option of {@code
 * generalTestWithAllOptions}.
 *
 * @param dataReplicateFactor replication factor for data regions
 * @param schemaReplicationFactor replication factor for schema regions
 * @param configNodeNum number of ConfigNodes in the test cluster
 * @param dataNodeNum number of DataNodes in the test cluster
 * @param killConfigNodeKeywords kill-point keywords to watch for on ConfigNodes
 * @param killDataNodeKeywords kill-point keywords to watch for on DataNodes
 * @param actionWhenDetectKeyWords action to run once a keyword is detected
 * @param killNode which kind of node the kill point targets
 * @throws Exception propagated unchanged from {@code generalTestWithAllOptions}
 */
public void successTestWithAction(
    final int dataReplicateFactor,
    final int schemaReplicationFactor,
    final int configNodeNum,
    final int dataNodeNum,
    final KeySetView<String, Boolean> killConfigNodeKeywords,
    final KeySetView<String, Boolean> killDataNodeKeywords,
    final Consumer<KillPointContext> actionWhenDetectKeyWords,
    final KillNode killNode)
    throws Exception {
  // NOTE(review): presumably the "operation is expected to succeed" flag, judging by this
  // method's name -- confirm against generalTestWithAllOptions.
  final boolean successExpected = true;
  generalTestWithAllOptions(
      dataReplicateFactor,
      schemaReplicationFactor,
      configNodeNum,
      dataNodeNum,
      killConfigNodeKeywords,
      killDataNodeKeywords,
      actionWhenDetectKeyWords,
      successExpected,
      killNode);
}

public void failTest(
final int dataReplicateFactor,
final int schemaReplicationFactor,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,15 @@

import org.apache.iotdb.commons.utils.KillPoint.KillNode;
import org.apache.iotdb.commons.utils.KillPoint.KillPoint;
import org.apache.iotdb.commons.utils.KillPoint.RegionMaintainKillPoints;
import org.apache.iotdb.confignode.it.regionmigration.IoTDBRegionOperationReliabilityITFramework;
import org.apache.iotdb.confignode.procedure.state.AddRegionPeerState;
import org.apache.iotdb.confignode.procedure.state.RegionTransitionState;
import org.apache.iotdb.confignode.procedure.state.RemoveRegionPeerState;
import org.apache.iotdb.consensus.ConsensusFactory;
import org.apache.iotdb.it.env.EnvFactory;
import org.apache.iotdb.it.framework.IoTDBTestRunner;
import org.apache.iotdb.itbase.category.ClusterIT;
import org.apache.iotdb.itbase.category.DailyIT;

import org.junit.Before;
Expand Down Expand Up @@ -92,6 +94,34 @@ public void testCnCrashDuringDoAddPeer() throws Exception {
KillNode.CONFIG_NODE);
}

/**
 * Region migration must still succeed when the ConfigNode leader is gracefully restarted (not
 * forcibly killed) while AddRegionPeer is blocked in waitTaskFinish() polling the coordinator.
 *
 * <p>The WAIT_TASK_FINISH_POLLING kill point fires from inside waitTaskFinish(), after the first
 * poll confirms the task is still running. The graceful shutdown therefore deterministically
 * interrupts that wait and waitTaskFinish() returns PROCESSING. Because the framework requires
 * the kill point to fire, and it can only fire once the worker is blocked inside
 * waitTaskFinish(), the PROCESSING branch is guaranteed to be exercised.
 *
 * <p>Regression covered: previously the AddRegionPeerProcedure silently ended on PROCESSING,
 * letting the parent procedure remove the source replica before the destination replica was
 * actually Running. Three ConfigNodes are used so that a real leader switch happens.
 */
@Test
// Temporarily also categorized as ClusterIT so the per-PR Cluster IT (1C3D) job runs it for
// validation; will be narrowed back to DailyIT-only before merge.
@Category({DailyIT.class, ClusterIT.class})
public void cnLeaderSwitchDuringDoAddPeerTest() throws Exception {
  final int dataReplicationFactor = 1;
  final int schemaReplicationFactor = 1;
  // More than one ConfigNode is needed for a real leader switch.
  final int configNodeCount = 3;
  final int dataNodeCount = 2;

  successTestWithAction(
      dataReplicationFactor,
      schemaReplicationFactor,
      configNodeCount,
      dataNodeCount,
      buildSet(RegionMaintainKillPoints.WAIT_TASK_FINISH_POLLING),
      noKillPoints(),
      actionOfGracefullyRestartConfigNode,
      KillNode.CONFIG_NODE);
}

@Test
public void cnCrashDuringUpdateCacheTest() throws Exception {
successTest(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,13 @@

import org.apache.iotdb.commons.utils.KillPoint.KillNode;
import org.apache.iotdb.commons.utils.KillPoint.KillPoint;
import org.apache.iotdb.commons.utils.KillPoint.RegionMaintainKillPoints;
import org.apache.iotdb.confignode.it.regionmigration.IoTDBRegionOperationReliabilityITFramework;
import org.apache.iotdb.confignode.procedure.state.AddRegionPeerState;
import org.apache.iotdb.confignode.procedure.state.RegionTransitionState;
import org.apache.iotdb.confignode.procedure.state.RemoveRegionPeerState;
import org.apache.iotdb.it.framework.IoTDBTestRunner;
import org.apache.iotdb.itbase.category.ClusterIT;
import org.apache.iotdb.itbase.category.DailyIT;

import org.junit.Ignore;
Expand Down Expand Up @@ -79,6 +81,34 @@ public void testCnCrashDuringDoAddPeer() throws Exception {
KillNode.CONFIG_NODE);
}

/**
 * Region migration must still succeed when the ConfigNode leader is gracefully restarted (not
 * forcibly killed) while AddRegionPeer is blocked in waitTaskFinish() polling the coordinator.
 *
 * <p>The WAIT_TASK_FINISH_POLLING kill point fires from inside waitTaskFinish(), after the first
 * poll confirms the task is still running. The graceful shutdown therefore deterministically
 * interrupts that wait and waitTaskFinish() returns PROCESSING. Because the framework requires
 * the kill point to fire, and it can only fire once the worker is blocked inside
 * waitTaskFinish(), the PROCESSING branch is guaranteed to be exercised.
 *
 * <p>Regression covered: previously the AddRegionPeerProcedure silently ended on PROCESSING,
 * letting the parent procedure remove the source replica before the destination replica was
 * actually Running. Three ConfigNodes are used so that a real leader switch happens.
 */
@Test
// Temporarily also categorized as ClusterIT so the per-PR Cluster IT (1C3D) job runs it for
// validation; will be narrowed back to DailyIT-only before merge.
@Category({DailyIT.class, ClusterIT.class})
public void cnLeaderSwitchDuringDoAddPeerTest() throws Exception {
  final int dataReplicationFactor = 1;
  final int schemaReplicationFactor = 1;
  // More than one ConfigNode is needed for a real leader switch.
  final int configNodeCount = 3;
  final int dataNodeCount = 2;

  successTestWithAction(
      dataReplicationFactor,
      schemaReplicationFactor,
      configNodeCount,
      dataNodeCount,
      buildSet(RegionMaintainKillPoints.WAIT_TASK_FINISH_POLLING),
      noKillPoints(),
      actionOfGracefullyRestartConfigNode,
      KillNode.CONFIG_NODE);
}

@Test
public void cnCrashDuringUpdateCacheTest() throws Exception {
successTest(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,15 @@

import org.apache.iotdb.commons.utils.KillPoint.KillNode;
import org.apache.iotdb.commons.utils.KillPoint.KillPoint;
import org.apache.iotdb.commons.utils.KillPoint.RegionMaintainKillPoints;
import org.apache.iotdb.confignode.it.regionmigration.IoTDBRegionOperationReliabilityITFramework;
import org.apache.iotdb.confignode.procedure.state.AddRegionPeerState;
import org.apache.iotdb.confignode.procedure.state.RegionTransitionState;
import org.apache.iotdb.confignode.procedure.state.RemoveRegionPeerState;
import org.apache.iotdb.consensus.ConsensusFactory;
import org.apache.iotdb.it.env.EnvFactory;
import org.apache.iotdb.it.framework.IoTDBTestRunner;
import org.apache.iotdb.itbase.category.ClusterIT;
import org.apache.iotdb.itbase.category.DailyIT;

import org.junit.Before;
Expand Down Expand Up @@ -93,6 +95,34 @@ public void testCnCrashDuringDoAddPeer() throws Exception {
KillNode.CONFIG_NODE);
}

/**
 * Region migration must still succeed when the ConfigNode leader is gracefully restarted (not
 * forcibly killed) while AddRegionPeer is blocked in waitTaskFinish() polling the coordinator.
 *
 * <p>The WAIT_TASK_FINISH_POLLING kill point fires from inside waitTaskFinish(), after the first
 * poll confirms the task is still running. The graceful shutdown therefore deterministically
 * interrupts that wait and waitTaskFinish() returns PROCESSING. Because the framework requires
 * the kill point to fire, and it can only fire once the worker is blocked inside
 * waitTaskFinish(), the PROCESSING branch is guaranteed to be exercised.
 *
 * <p>Regression covered: previously the AddRegionPeerProcedure silently ended on PROCESSING,
 * letting the parent procedure remove the source replica before the destination replica was
 * actually Running. Three ConfigNodes are used so that a real leader switch happens.
 */
@Test
// Temporarily also categorized as ClusterIT so the per-PR Cluster IT (1C3D) job runs it for
// validation; will be narrowed back to DailyIT-only before merge.
@Category({DailyIT.class, ClusterIT.class})
public void cnLeaderSwitchDuringDoAddPeerTest() throws Exception {
  final int dataReplicationFactor = 1;
  final int schemaReplicationFactor = 1;
  // More than one ConfigNode is needed for a real leader switch.
  final int configNodeCount = 3;
  final int dataNodeCount = 2;

  successTestWithAction(
      dataReplicationFactor,
      schemaReplicationFactor,
      configNodeCount,
      dataNodeCount,
      buildSet(RegionMaintainKillPoints.WAIT_TASK_FINISH_POLLING),
      noKillPoints(),
      actionOfGracefullyRestartConfigNode,
      KillNode.CONFIG_NODE);
}

@Test
public void cnCrashDuringUpdateCacheTest() throws Exception {
successTest(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,12 @@

import org.apache.iotdb.commons.utils.KillPoint.KillNode;
import org.apache.iotdb.commons.utils.KillPoint.KillPoint;
import org.apache.iotdb.commons.utils.KillPoint.RegionMaintainKillPoints;
import org.apache.iotdb.confignode.it.regionmigration.IoTDBRegionMigrateITFrameworkForRatis;
import org.apache.iotdb.confignode.procedure.state.AddRegionPeerState;
import org.apache.iotdb.confignode.procedure.state.RemoveRegionPeerState;
import org.apache.iotdb.it.framework.IoTDBTestRunner;
import org.apache.iotdb.itbase.category.ClusterIT;
import org.apache.iotdb.itbase.category.DailyIT;

import org.junit.Test;
Expand Down Expand Up @@ -76,6 +78,34 @@ public void cnCrashDuringDoAddPeerTest() throws Exception {
KillNode.CONFIG_NODE);
}

/**
 * Region migration must still succeed when the ConfigNode leader is gracefully restarted (not
 * forcibly killed) while AddRegionPeer is blocked in waitTaskFinish() polling the coordinator.
 *
 * <p>The WAIT_TASK_FINISH_POLLING kill point fires from inside waitTaskFinish(), after the first
 * poll confirms the task is still running. The graceful shutdown therefore deterministically
 * interrupts that wait and waitTaskFinish() returns PROCESSING. Because the framework requires
 * the kill point to fire, and it can only fire once the worker is blocked inside
 * waitTaskFinish(), the PROCESSING branch is guaranteed to be exercised.
 *
 * <p>Regression covered: previously the AddRegionPeerProcedure silently ended on PROCESSING,
 * letting the parent procedure remove the source replica before the destination replica was
 * actually Running. Three ConfigNodes are used so that a real leader switch happens.
 */
@Test
// Temporarily also categorized as ClusterIT so the per-PR Cluster IT (1C3D) job runs it for
// validation; will be narrowed back to DailyIT-only before merge.
@Category({DailyIT.class, ClusterIT.class})
public void cnLeaderSwitchDuringDoAddPeerTest() throws Exception {
  final int dataReplicationFactor = 1;
  final int schemaReplicationFactor = 1;
  // More than one ConfigNode is needed for a real leader switch.
  final int configNodeCount = 3;
  final int dataNodeCount = 2;

  successTestWithAction(
      dataReplicationFactor,
      schemaReplicationFactor,
      configNodeCount,
      dataNodeCount,
      buildSet(RegionMaintainKillPoints.WAIT_TASK_FINISH_POLLING),
      noKillPoints(),
      actionOfGracefullyRestartConfigNode,
      KillNode.CONFIG_NODE);
}

@Test
public void cnCrashDuringUpdateCacheTest() throws Exception {
successTest(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1001,7 +1001,7 @@ public final class ProcedureMessages {
public static final String VALIDATE_TABLE_FOR_TABLE_WHEN_SETTING_PROPERTIES =
"Validate table for table {}.{} when setting properties";
public static final String WAITTASKFINISH_RETURNS_PROCESSING_WHICH_MEANS_THE_WAITING_HAS_BEEN_INTERRUPTED =
"waitTaskFinish() returns PROCESSING, which means the waiting has been interrupted, this procedure will end without rollback";
"waitTaskFinish() returns PROCESSING, which means the waiting has been interrupted (ConfigNode shutdown or leader change); the AddRegionPeer task is still running on the coordinator, this procedure will stay at DO_ADD_REGION_PEER and resume polling after recovery";

public static final String FAILED_TO_CREATE_DATABASE_THE_TTL_SHOULD_BE_NON_NEGATIVE = "Failed to create database. The TTL should be non-negative.";
public static final String FAILED_TO_CREATE_DATABASE_THE_DATAREGIONGROUPNUM_SHOULD_BE_POSITIVE = "Failed to create database. The dataRegionGroupNum should be positive.";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -999,7 +999,7 @@ public final class ProcedureMessages {
public static final String VALIDATE_TABLE_FOR_TABLE_WHEN_SETTING_PROPERTIES =
"Validate table for table {}.{} when setting properties";
public static final String WAITTASKFINISH_RETURNS_PROCESSING_WHICH_MEANS_THE_WAITING_HAS_BEEN_INTERRUPTED =
"waitTaskFinish() returns PROCESSING, which means the waiting has been interrupted, this procedure will end without rollback";
"waitTaskFinish() 返回 PROCESSING,表示等待被中断(ConfigNode 关闭或主节点切换);AddRegionPeer 任务仍在协调者上运行,该流程将停留在 DO_ADD_REGION_PEER 状态,恢复后继续轮询";

public static final String FAILED_TO_CREATE_DATABASE_THE_TTL_SHOULD_BE_NON_NEGATIVE = "创建数据库失败。TTL 不能为负数。";
public static final String FAILED_TO_CREATE_DATABASE_THE_DATAREGIONGROUPNUM_SHOULD_BE_POSITIVE = "创建数据库失败。dataRegionGroupNum 应为正数。";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -454,7 +454,15 @@ private void executeProcedure(RootProcedureStack rootProcStack, Procedure<Env> p

updateStoreOnExecution(rootProcStack, proc, subprocs);

if (!store.isRunning()) {
// Stop the in-place re-execution loop once this executor is shutting down (e.g. ConfigNode
// leader switch / restart). Checking store.isRunning() alone is not enough: stopExecutor()
// calls executor.stop() and executor.join() before store.stop(), so the store is still
// running while join() waits for this very worker to finish. Without also checking the
// executor's own running flag, a procedure that keeps returning HAS_MORE_STATE for the same
// state (e.g. AddRegionPeerProcedure parking at DO_ADD_REGION_PEER after waitTaskFinish() is
// interrupted) would re-execute forever here and join() would hang. The persisted state lets
// the next leader resume from where it stopped.
if (!isRunning() || !store.isRunning()) {
return;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@
import static org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_IOTDB_USER_KEY;
import static org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_KEY;
import static org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant.EXTRACTOR_REALTIME_MODE_KEY;
import static org.apache.iotdb.commons.utils.KillPoint.KillPoint.setKillPoint;
import static org.apache.iotdb.confignode.conf.ConfigNodeConstant.REGION_MIGRATE_PROCESS;
import static org.apache.iotdb.consensus.ConsensusFactory.IOT_CONSENSUS;
import static org.apache.iotdb.consensus.ConsensusFactory.IOT_CONSENSUS_V2;
Expand Down Expand Up @@ -360,10 +361,26 @@ public Map<Integer, TSStatus> resetPeerList(

/**
 * Convenience overload of {@code waitTaskFinish(long, TDataNodeLocation, T)} for callers that do
 * not need a kill point: it delegates with a {@code null} kill point, so no kill point is fired
 * while waiting.
 *
 * @param taskId id of the region-maintain task to wait for
 * @param dataNodeLocation location of the DataNode that is polled for the task's status
 * @return whatever the three-argument overload returns for the same task and DataNode
 */
// TODO: will use 'procedure yield' to refactor later
public TRegionMigrateResult waitTaskFinish(long taskId, TDataNodeLocation dataNodeLocation) {
return waitTaskFinish(taskId, dataNodeLocation, null);
}

/**
* Poll the coordinator DataNode until the region-maintain task identified by {@code taskId}
* reaches a terminal state.
*
* @param killPoint if non-null, fired once right after the first poll confirms the task is still
* PROCESSING. At that point the worker thread is provably blocked inside this method, so
* tests can use the kill point to deterministically interrupt the wait (e.g. by gracefully
* stopping the ConfigNode leader) and exercise the interrupted-PROCESSING path. It is a no-op
* outside integration tests.
*/
public <T extends Enum<T>> TRegionMigrateResult waitTaskFinish(
long taskId, TDataNodeLocation dataNodeLocation, T killPoint) {
final long MAX_DISCONNECTION_TOLERATE_MS = 600_000;
final long INITIAL_DISCONNECTION_TOLERATE_MS = 60_000;
long startTime = System.nanoTime();
long lastReportTime = System.nanoTime();
boolean killPointTriggered = false;
while (true) {
try (SyncDataNodeInternalServiceClient dataNodeClient =
dataNodeClientManager.borrowClient(dataNodeLocation.getInternalEndPoint())) {
Expand All @@ -372,6 +389,12 @@ public TRegionMigrateResult waitTaskFinish(long taskId, TDataNodeLocation dataNo
if (report.getTaskStatus() != TRegionMaintainTaskStatus.PROCESSING) {
return report;
}
// The task is confirmed still running and this thread is blocked here, so it is now safe to
// fire the kill point that tests use to interrupt waitTaskFinish() deterministically.
if (killPoint != null && !killPointTriggered) {
setKillPoint(killPoint);
killPointTriggered = true;
}
} catch (Exception ignore) {

}
Expand Down
Loading
Loading