From f74b1e67cbf4d7645316f60bbd359df83f484d8c Mon Sep 17 00:00:00 2001 From: Yongzao <532741407@qq.com> Date: Fri, 12 Jun 2026 19:16:39 +0800 Subject: [PATCH] Exclude DataNodes being removed from new Region allocation When a `remove datanode` is in progress, the ConfigNode could still allocate brand-new Region replicas onto the DataNode being removed. This was especially likely when the target DataNode had been killed (e.g. `kill -9`) before removal: the failure detector reports such a node as `Unknown` rather than `Removing`, and `RegionBalancer` intentionally keeps `Unknown` DataNodes as allocation candidates (to cope with insufficient online nodes). The new replica could never be created on the dead node (Connection refused), yet the metadata kept the assignment and retried forever, so the removal hung and the target DataNode never disappeared from `show datanodes`. A node-status filter alone cannot fix this, because the killed node is `Unknown`, not `Removing`. Instead, `RegionBalancer` now consults the in-progress `RemoveDataNodesProcedure` (the authoritative, leader-switch durable source of which DataNodes are being removed) via the new `ProcedureManager.getRemovingDataNodeIds()` and drops those DataNodes from the allocation candidates. This mirrors the existing filtering in `RemoveDataNodeHandler.selectedRegionMigrationPlans`. Add IoTDBRemoveDataNodeRegionAllocationIT: it kills a DataNode, submits the removal, and while it is in progress forces a fresh Region allocation via a new database, asserting that none of the newly allocated Regions land on the DataNode being removed and that the removal completes. --- ...IoTDBRemoveDataNodeRegionAllocationIT.java | 332 ++++++++++++++++++ .../confignode/manager/ProcedureManager.java | 23 ++ .../manager/load/balancer/RegionBalancer.java | 24 +- 3 files changed, 376 insertions(+), 3 deletions(-) create mode 100644 integration-test/src/test/java/org/apache/iotdb/confignode/it/removedatanode/IoTDBRemoveDataNodeRegionAllocationIT.java diff --git a/integration-test/src/test/java/org/apache/iotdb/confignode/it/removedatanode/IoTDBRemoveDataNodeRegionAllocationIT.java b/integration-test/src/test/java/org/apache/iotdb/confignode/it/removedatanode/IoTDBRemoveDataNodeRegionAllocationIT.java new file mode 100644 index 000000000000..a2e6e5f038e6 --- /dev/null +++ b/integration-test/src/test/java/org/apache/iotdb/confignode/it/removedatanode/IoTDBRemoveDataNodeRegionAllocationIT.java @@ -0,0 +1,332 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.confignode.it.removedatanode; + +import org.apache.iotdb.common.rpc.thrift.TDataNodeConfiguration; +import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation; +import org.apache.iotdb.commons.client.sync.SyncConfigNodeIServiceClient; +import org.apache.iotdb.commons.schema.column.ColumnHeaderConstant; +import org.apache.iotdb.confignode.it.utils.ConfigNodeTestUtils; +import org.apache.iotdb.consensus.ConsensusFactory; +import org.apache.iotdb.it.env.EnvFactory; +import org.apache.iotdb.it.env.cluster.node.DataNodeWrapper; +import org.apache.iotdb.it.framework.IoTDBTestRunner; +import org.apache.iotdb.itbase.category.ClusterIT; +import org.apache.iotdb.itbase.exception.InconsistentDataException; + +import org.awaitility.Awaitility; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.runner.RunWith; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.sql.Connection; +import java.sql.ResultSet; +import java.sql.Statement; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; +import java.util.stream.Collectors; + +import static org.apache.iotdb.confignode.it.regionmigration.IoTDBRegionOperationReliabilityITFramework.getAllRegionMap; +import static org.apache.iotdb.confignode.it.regionmigration.IoTDBRegionOperationReliabilityITFramework.getDataRegionMap; +import static org.apache.iotdb.confignode.it.removedatanode.IoTDBRemoveDataNodeUtils.awaitUntilSuccess; +import static org.apache.iotdb.confignode.it.removedatanode.IoTDBRemoveDataNodeUtils.generateRemoveString; +import static org.apache.iotdb.confignode.it.removedatanode.IoTDBRemoveDataNodeUtils.stopDataNodes; +import static org.apache.iotdb.util.MagicUtils.makeItCloseQuietly; + +/** + * Regression test for the bug where, after a {@code remove datanode} has been submitted, the + * ConfigNode still allocated brand-new Region replicas onto the DataNode that was being removed + * (typically a node that had been {@code kill -9}'d and was therefore reported as {@code Unknown}). + * The stranded replica could never be created on the dead node, so the removal hung forever and the + * target DataNode never disappeared from {@code show datanodes}. + * + *

The test kills one DataNode, submits the removal, and — while the removal is still in progress + * — forces a fresh Region allocation by creating a new database and writing to it. It then asserts + * that none of the newly allocated Regions were placed on the DataNode being removed, and + * that the removal eventually completes. + * + *

Note: we must compare against a snapshot of the pre-existing Region ids rather than asserting + * "no Region anywhere references the removing DataNode". The removing node legitimately keeps + * hosting its own pre-existing Regions until each one finishes migrating away (the new replica is + * added first and the old one is dropped last), so those Regions still list the removing node + * during the window. Only freshly created Region groups are expected to exclude it. + */ +@Category({ClusterIT.class}) +@RunWith(IoTDBTestRunner.class) +public class IoTDBRemoveDataNodeRegionAllocationIT { + + private static final Logger LOGGER = + LoggerFactory.getLogger(IoTDBRemoveDataNodeRegionAllocationIT.class); + + private static final String SHOW_DATANODES = "show datanodes"; + + private static final String DEFAULT_SCHEMA_REGION_GROUP_EXTENSION_POLICY = "CUSTOM"; + private static final String DEFAULT_DATA_REGION_GROUP_EXTENSION_POLICY = "CUSTOM"; + + @Before + public void setUp() { + EnvFactory.getEnv() + .getConfig() + .getCommonConfig() + .setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS) + .setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS) + .setSchemaRegionGroupExtensionPolicy(DEFAULT_SCHEMA_REGION_GROUP_EXTENSION_POLICY) + .setDataRegionGroupExtensionPolicy(DEFAULT_DATA_REGION_GROUP_EXTENSION_POLICY); + } + + @After + public void tearDown() throws InterruptedException { + EnvFactory.getEnv().cleanClusterEnvironment(); + } + + @Test + public void newRegionMustNotBeAllocatedOnRemovingDataNodeTest() throws Exception { + final int configNodeNum = 1; + final int dataNodeNum = 4; + final int dataReplicationFactor = 2; + final int schemaReplicationFactor = 2; + // Place a few DataRegions per DataNode so the node being removed actually owns regions that + // have + // to be migrated, which keeps the RemoveDataNodesProcedure in progress long enough for us to + // race a new allocation against it. + final int dataRegionPerDataNode = 2; + + EnvFactory.getEnv() + .getConfig() + .getCommonConfig() + .setDataRegionConsensusProtocolClass(ConsensusFactory.IOT_CONSENSUS) + .setSchemaReplicationFactor(schemaReplicationFactor) + .setDataReplicationFactor(dataReplicationFactor) + .setDefaultDataRegionGroupNumPerDatabase( + dataRegionPerDataNode * dataNodeNum / dataReplicationFactor); + EnvFactory.getEnv().initClusterEnvironment(configNodeNum, dataNodeNum); + + final int removeDataNodeId; + final List removeDataNodeLocations = new ArrayList<>(); + + try (final Connection connection = makeItCloseQuietly(EnvFactory.getEnv().getConnection()); + final Statement statement = makeItCloseQuietly(connection.createStatement()); + final SyncConfigNodeIServiceClient client = + (SyncConfigNodeIServiceClient) EnvFactory.getEnv().getLeaderConfigNodeConnection()) { + + // Seed the cluster with data so that DataRegions are spread across all DataNodes. + ConfigNodeTestUtils.insertTreeModelData(statement); + + final Map> dataRegionMap = getDataRegionMap(statement); + Assert.assertFalse("Expected some DataRegions to exist", dataRegionMap.isEmpty()); + + // Pick a DataNode that currently hosts at least one DataRegion as the removal target. + removeDataNodeId = + dataRegionMap.values().stream() + .flatMap(Set::stream) + .findAny() + .orElseThrow(() -> new AssertionError("No DataNode hosts a DataRegion")); + LOGGER.info("Selected DataNode {} to remove.", removeDataNodeId); + + removeDataNodeLocations.addAll( + client.getDataNodeConfiguration(-1).getDataNodeConfigurationMap().values().stream() + .map(TDataNodeConfiguration::getLocation) + .filter(location -> location.getDataNodeId() == removeDataNodeId) + .collect(Collectors.toList())); + Assert.assertEquals(1, removeDataNodeLocations.size()); + + // kill -9 the target DataNode so that it becomes Unknown (this is the exact condition under + // which the failure detector overrides the Removing status back to Unknown). + final List removeDataNodeWrappers = + List.of(EnvFactory.getEnv().dataNodeIdToWrapper(removeDataNodeId).get()); + stopDataNodes(removeDataNodeWrappers); + LOGGER.info("DataNode {} is stopped.", removeDataNodeId); + } catch (InconsistentDataException e) { + LOGGER.error("Unexpected error during setup:", e); + throw e; + } + + // Re-establish a connection after the DataNode was killed. + try (final Connection connection = makeItCloseQuietly(EnvFactory.getEnv().getConnection()); + final Statement statement = makeItCloseQuietly(connection.createStatement()); + final SyncConfigNodeIServiceClient client = + (SyncConfigNodeIServiceClient) EnvFactory.getEnv().getLeaderConfigNodeConnection()) { + final AtomicReference clientRef = new AtomicReference<>(client); + + // Wait until the killed DataNode is reported Unknown, then submit the removal. + awaitDataNodeStatus(statement, removeDataNodeId, "Unknown"); + + // Snapshot the Region ids that already exist; region migration only moves replicas of these + // existing groups (it never mints new Region ids), so any Region id appearing after this + // point + // belongs to the allocation we are about to force. + final Set preExistingRegionIds = new HashSet<>(getAllRegionMap(statement).keySet()); + + final String removeDataNodeSQL = generateRemoveString(Set.of(removeDataNodeId)); + LOGGER.info("Submitting: {}", removeDataNodeSQL); + statement.execute(removeDataNodeSQL); + LOGGER.info("Remove DataNode {} submitted.", removeDataNodeId); + + // The removal is asynchronous: the SQL returns once the procedure is submitted, while the + // actual region migration off the (dead) node keeps it in progress. Confirm it is in progress + // before we force a new allocation against it. + Assert.assertTrue( + "Removal completed before we could force a new allocation; cannot exercise the bug", + isRemovalInProgress(clientRef, removeDataNodeLocations)); + + // While the removal is in progress, force a fresh Region allocation by creating a new + // database + // and writing to it. Before the fix, the allocator could still choose the removing (Unknown) + // DataNode as a replica holder for these new regions. + try (final Connection probeConnection = + makeItCloseQuietly(EnvFactory.getEnv().getConnection()); + final Statement probeStatement = makeItCloseQuietly(probeConnection.createStatement())) { + for (int i = 0; i < 64; i++) { + probeStatement.addBatch( + String.format( + "INSERT INTO root.alloc_probe.d%d(timestamp,speed) values(%d, %d)", i, i, i)); + } + probeStatement.executeBatch(); + LOGGER.info("Forced new Region allocation via root.alloc_probe."); + + // The core assertion: none of the newly allocated Regions may land on the removing + // DataNode. + assertNewRegionsExcludeDataNode(probeStatement, preExistingRegionIds, removeDataNodeId); + } + + // The removal must still be able to complete; the original bug left it stuck forever. + awaitUntilSuccess(clientRef, removeDataNodeLocations); + LOGGER.info("Remove DataNode {} completed.", removeDataNodeId); + + // Final guard: after the node is gone, nothing references it any more. + assertNoRegionOnDataNode(statement, removeDataNodeId); + assertDataNodeAbsent(statement, removeDataNodeId); + } catch (InconsistentDataException e) { + LOGGER.error("Unexpected error:", e); + throw e; + } + } + + private static boolean isRemovalInProgress( + final AtomicReference clientRef, + final List removeDataNodeLocations) { + try { + final List remaining = + clientRef + .get() + .getDataNodeConfiguration(-1) + .getDataNodeConfigurationMap() + .values() + .stream() + .map(TDataNodeConfiguration::getLocation) + .collect(Collectors.toList()); + return removeDataNodeLocations.stream().anyMatch(remaining::contains); + } catch (Exception e) { + LOGGER.warn("Failed to query DataNode configuration", e); + return false; + } + } + + private static void awaitDataNodeStatus( + final Statement statement, final int dataNodeId, final String expectedStatus) { + Awaitility.await() + .atMost(2, TimeUnit.MINUTES) + .pollDelay(1, TimeUnit.SECONDS) + .until( + () -> { + try (final ResultSet result = statement.executeQuery(SHOW_DATANODES)) { + while (result.next()) { + if (result.getInt(ColumnHeaderConstant.NODE_ID) == dataNodeId) { + return expectedStatus.equalsIgnoreCase( + result.getString(ColumnHeaderConstant.STATUS)); + } + } + } + return false; + }); + } + + /** + * Wait until the forced allocation has produced at least one new Region (a Region id not present + * in {@code preExistingRegionIds}), then assert that none of those new Regions has a replica on + * {@code dataNodeId}. + */ + private static void assertNewRegionsExcludeDataNode( + final Statement statement, final Set preExistingRegionIds, final int dataNodeId) { + final AtomicReference>> newRegionsRef = new AtomicReference<>(); + Awaitility.await() + .atMost(1, TimeUnit.MINUTES) + .pollDelay(1, TimeUnit.SECONDS) + .until( + () -> { + final Map> newRegions = + getAllRegionMap(statement).entrySet().stream() + .filter(entry -> !preExistingRegionIds.contains(entry.getKey())) + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); + newRegionsRef.set(newRegions); + return !newRegions.isEmpty(); + }); + + final Map> newRegions = newRegionsRef.get(); + final Set offendingRegions = + newRegions.entrySet().stream() + .filter(entry -> entry.getValue().contains(dataNodeId)) + .map(Map.Entry::getKey) + .collect(Collectors.toSet()); + Assert.assertTrue( + String.format( + "Newly allocated Region(s) %s were placed on DataNode %d which is being removed; " + + "new Region map: %s", + offendingRegions, dataNodeId, newRegions), + offendingRegions.isEmpty()); + } + + private static void assertNoRegionOnDataNode(final Statement statement, final int dataNodeId) + throws Exception { + final Map> allRegionMap = getAllRegionMap(statement); + final Set offendingRegions = + allRegionMap.entrySet().stream() + .filter(entry -> entry.getValue().contains(dataNodeId)) + .map(Map.Entry::getKey) + .collect(Collectors.toSet()); + Assert.assertTrue( + String.format( + "Region(s) %s still reference removed DataNode %d; full map: %s", + offendingRegions, dataNodeId, allRegionMap), + offendingRegions.isEmpty()); + } + + private static void assertDataNodeAbsent(final Statement statement, final int dataNodeId) + throws Exception { + try (final ResultSet result = statement.executeQuery(SHOW_DATANODES)) { + while (result.next()) { + Assert.assertNotEquals( + "DataNode " + dataNodeId + " should have been removed", + dataNodeId, + result.getInt(ColumnHeaderConstant.NODE_ID)); + } + } + } +} diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java index 22dd46317819..2beeea6b4af4 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java @@ -786,6 +786,29 @@ public TSStatus checkRemoveDataNodes(List dataNodeLocations) return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()); } + /** + * Collect the ids of all DataNodes that an in-progress {@link RemoveDataNodesProcedure} is + * removing. + * + *

A DataNode being removed must not receive any newly allocated Region replica: doing so would + * leave a replica stranded on a node that is about to disappear, blocking the removal from ever + * completing. We cannot rely on the node's {@link NodeStatus} here, because a DataNode that was + * killed (e.g. {@code kill -9}) before removal is reported as {@link NodeStatus#Unknown} by the + * failure detector rather than {@link NodeStatus#Removing}, so a status filter alone would still + * let it through. The authoritative source is therefore the running procedure itself, which holds + * the removing DataNode list and survives leader switches via procedure persistence. + * + * @return the set of DataNode ids currently being removed (empty if no removal is in progress) + */ + public Set getRemovingDataNodeIds() { + return getExecutor().getProcedures().values().stream() + .filter(procedure -> procedure instanceof RemoveDataNodesProcedure) + .filter(procedure -> !procedure.isFinished()) + .flatMap(procedure -> ((RemoveDataNodesProcedure) procedure).getRemovedDataNodes().stream()) + .map(TDataNodeLocation::getDataNodeId) + .collect(Collectors.toSet()); + } + // region region operation related check /** diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RegionBalancer.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RegionBalancer.java index 1075fc1573da..73583151f981 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RegionBalancer.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RegionBalancer.java @@ -29,6 +29,7 @@ import org.apache.iotdb.confignode.exception.DatabaseNotExistsException; import org.apache.iotdb.confignode.exception.NotEnoughDataNodeException; import org.apache.iotdb.confignode.manager.IManager; +import org.apache.iotdb.confignode.manager.ProcedureManager; import org.apache.iotdb.confignode.manager.load.LoadManager; import org.apache.iotdb.confignode.manager.load.balancer.region.GreedyCopySetRegionGroupAllocator; import org.apache.iotdb.confignode.manager.load.balancer.region.GreedyRegionGroupAllocator; @@ -41,6 +42,8 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; /** * The {@link RegionBalancer} provides interfaces to generate optimal Region allocation and @@ -80,10 +83,21 @@ public CreateRegionGroupsPlan genRegionGroupsAllocationPlan( final Map allotmentMap, final TConsensusGroupType consensusGroupType) throws NotEnoughDataNodeException, DatabaseNotExistsException { - // Some new RegionGroups will have to occupy unknown DataNodes - // if the number of online DataNodes is insufficient + // Some new RegionGroups will have to occupy unknown DataNodes if the number of online + // DataNodes is insufficient (Unknown DataNodes are intentionally kept as candidates). + // However, DataNodes that an in-progress RemoveDataNodesProcedure is removing must be + // excluded: placing a new replica on a node that is about to disappear would strand that + // replica and stall the removal forever. A status filter is not enough here, because a + // DataNode killed (e.g. kill -9) before removal is reported as Unknown (not Removing) by the + // failure detector, so we additionally drop every DataNode that is currently being removed. + final Set removingDataNodeIds = getProcedureManager().getRemovingDataNodeIds(); final List availableDataNodes = - getNodeManager().filterDataNodeThroughStatus(NodeStatus.Running, NodeStatus.Unknown); + getNodeManager() + .filterDataNodeThroughStatus(NodeStatus.Running, NodeStatus.Unknown) + .stream() + .filter( + dataNode -> !removingDataNodeIds.contains(dataNode.getLocation().getDataNodeId())) + .collect(Collectors.toList()); // Make sure the number of available DataNodes is enough for allocating new RegionGroups for (final String database : allotmentMap.keySet()) { @@ -157,6 +171,10 @@ private LoadManager getLoadManager() { return configManager.getLoadManager(); } + private ProcedureManager getProcedureManager() { + return configManager.getProcedureManager(); + } + public enum RegionGroupAllocatePolicy { GREEDY, GCR,