Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,332 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iotdb.confignode.it.removedatanode;

import org.apache.iotdb.common.rpc.thrift.TDataNodeConfiguration;
import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
import org.apache.iotdb.commons.client.sync.SyncConfigNodeIServiceClient;
import org.apache.iotdb.commons.schema.column.ColumnHeaderConstant;
import org.apache.iotdb.confignode.it.utils.ConfigNodeTestUtils;
import org.apache.iotdb.consensus.ConsensusFactory;
import org.apache.iotdb.it.env.EnvFactory;
import org.apache.iotdb.it.env.cluster.node.DataNodeWrapper;
import org.apache.iotdb.it.framework.IoTDBTestRunner;
import org.apache.iotdb.itbase.category.ClusterIT;
import org.apache.iotdb.itbase.exception.InconsistentDataException;

import org.awaitility.Awaitility;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;

import static org.apache.iotdb.confignode.it.regionmigration.IoTDBRegionOperationReliabilityITFramework.getAllRegionMap;
import static org.apache.iotdb.confignode.it.regionmigration.IoTDBRegionOperationReliabilityITFramework.getDataRegionMap;
import static org.apache.iotdb.confignode.it.removedatanode.IoTDBRemoveDataNodeUtils.awaitUntilSuccess;
import static org.apache.iotdb.confignode.it.removedatanode.IoTDBRemoveDataNodeUtils.generateRemoveString;
import static org.apache.iotdb.confignode.it.removedatanode.IoTDBRemoveDataNodeUtils.stopDataNodes;
import static org.apache.iotdb.util.MagicUtils.makeItCloseQuietly;

/**
* Regression test for the bug where, after a {@code remove datanode} has been submitted, the
* ConfigNode still allocated brand-new Region replicas onto the DataNode that was being removed
* (typically a node that had been {@code kill -9}'d and was therefore reported as {@code Unknown}).
* The stranded replica could never be created on the dead node, so the removal hung forever and the
* target DataNode never disappeared from {@code show datanodes}.
*
* <p>The test kills one DataNode, submits the removal, and — while the removal is still in progress
* — forces a fresh Region allocation by creating a new database and writing to it. It then asserts
* that none of the <em>newly allocated</em> Regions were placed on the DataNode being removed, and
* that the removal eventually completes.
*
* <p>Note: we must compare against a snapshot of the pre-existing Region ids rather than asserting
* "no Region anywhere references the removing DataNode". The removing node legitimately keeps
* hosting its own pre-existing Regions until each one finishes migrating away (the new replica is
* added first and the old one is dropped last), so those Regions still list the removing node
* during the window. Only freshly created Region groups are expected to exclude it.
*/
@Category({ClusterIT.class})
@RunWith(IoTDBTestRunner.class)
public class IoTDBRemoveDataNodeRegionAllocationIT {

private static final Logger LOGGER =
LoggerFactory.getLogger(IoTDBRemoveDataNodeRegionAllocationIT.class);

private static final String SHOW_DATANODES = "show datanodes";

private static final String DEFAULT_SCHEMA_REGION_GROUP_EXTENSION_POLICY = "CUSTOM";
private static final String DEFAULT_DATA_REGION_GROUP_EXTENSION_POLICY = "CUSTOM";

@Before
public void setUp() {
EnvFactory.getEnv()
.getConfig()
.getCommonConfig()
.setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
.setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
.setSchemaRegionGroupExtensionPolicy(DEFAULT_SCHEMA_REGION_GROUP_EXTENSION_POLICY)
.setDataRegionGroupExtensionPolicy(DEFAULT_DATA_REGION_GROUP_EXTENSION_POLICY);
}

@After
public void tearDown() throws InterruptedException {
EnvFactory.getEnv().cleanClusterEnvironment();
}

@Test
public void newRegionMustNotBeAllocatedOnRemovingDataNodeTest() throws Exception {
final int configNodeNum = 1;
final int dataNodeNum = 4;
final int dataReplicationFactor = 2;
final int schemaReplicationFactor = 2;
// Place a few DataRegions per DataNode so the node being removed actually owns regions that
// have
// to be migrated, which keeps the RemoveDataNodesProcedure in progress long enough for us to
// race a new allocation against it.
final int dataRegionPerDataNode = 2;

EnvFactory.getEnv()
.getConfig()
.getCommonConfig()
.setDataRegionConsensusProtocolClass(ConsensusFactory.IOT_CONSENSUS)
.setSchemaReplicationFactor(schemaReplicationFactor)
.setDataReplicationFactor(dataReplicationFactor)
.setDefaultDataRegionGroupNumPerDatabase(
dataRegionPerDataNode * dataNodeNum / dataReplicationFactor);
EnvFactory.getEnv().initClusterEnvironment(configNodeNum, dataNodeNum);

final int removeDataNodeId;
final List<TDataNodeLocation> removeDataNodeLocations = new ArrayList<>();

try (final Connection connection = makeItCloseQuietly(EnvFactory.getEnv().getConnection());
final Statement statement = makeItCloseQuietly(connection.createStatement());
final SyncConfigNodeIServiceClient client =
(SyncConfigNodeIServiceClient) EnvFactory.getEnv().getLeaderConfigNodeConnection()) {

// Seed the cluster with data so that DataRegions are spread across all DataNodes.
ConfigNodeTestUtils.insertTreeModelData(statement);

final Map<Integer, Set<Integer>> dataRegionMap = getDataRegionMap(statement);
Assert.assertFalse("Expected some DataRegions to exist", dataRegionMap.isEmpty());

// Pick a DataNode that currently hosts at least one DataRegion as the removal target.
removeDataNodeId =
dataRegionMap.values().stream()
.flatMap(Set::stream)
.findAny()
.orElseThrow(() -> new AssertionError("No DataNode hosts a DataRegion"));
LOGGER.info("Selected DataNode {} to remove.", removeDataNodeId);

removeDataNodeLocations.addAll(
client.getDataNodeConfiguration(-1).getDataNodeConfigurationMap().values().stream()
.map(TDataNodeConfiguration::getLocation)
.filter(location -> location.getDataNodeId() == removeDataNodeId)
.collect(Collectors.toList()));
Assert.assertEquals(1, removeDataNodeLocations.size());

// kill -9 the target DataNode so that it becomes Unknown (this is the exact condition under
// which the failure detector overrides the Removing status back to Unknown).
final List<DataNodeWrapper> removeDataNodeWrappers =
List.of(EnvFactory.getEnv().dataNodeIdToWrapper(removeDataNodeId).get());
stopDataNodes(removeDataNodeWrappers);
LOGGER.info("DataNode {} is stopped.", removeDataNodeId);
} catch (InconsistentDataException e) {
LOGGER.error("Unexpected error during setup:", e);
throw e;
}

// Re-establish a connection after the DataNode was killed.
try (final Connection connection = makeItCloseQuietly(EnvFactory.getEnv().getConnection());
final Statement statement = makeItCloseQuietly(connection.createStatement());
final SyncConfigNodeIServiceClient client =
(SyncConfigNodeIServiceClient) EnvFactory.getEnv().getLeaderConfigNodeConnection()) {
final AtomicReference<SyncConfigNodeIServiceClient> clientRef = new AtomicReference<>(client);

// Wait until the killed DataNode is reported Unknown, then submit the removal.
awaitDataNodeStatus(statement, removeDataNodeId, "Unknown");

// Snapshot the Region ids that already exist; region migration only moves replicas of these
// existing groups (it never mints new Region ids), so any Region id appearing after this
// point
// belongs to the allocation we are about to force.
final Set<Integer> preExistingRegionIds = new HashSet<>(getAllRegionMap(statement).keySet());

final String removeDataNodeSQL = generateRemoveString(Set.of(removeDataNodeId));
LOGGER.info("Submitting: {}", removeDataNodeSQL);
statement.execute(removeDataNodeSQL);
LOGGER.info("Remove DataNode {} submitted.", removeDataNodeId);

// The removal is asynchronous: the SQL returns once the procedure is submitted, while the
// actual region migration off the (dead) node keeps it in progress. Confirm it is in progress
// before we force a new allocation against it.
Assert.assertTrue(
"Removal completed before we could force a new allocation; cannot exercise the bug",
isRemovalInProgress(clientRef, removeDataNodeLocations));

// While the removal is in progress, force a fresh Region allocation by creating a new
// database
// and writing to it. Before the fix, the allocator could still choose the removing (Unknown)
// DataNode as a replica holder for these new regions.
try (final Connection probeConnection =
makeItCloseQuietly(EnvFactory.getEnv().getConnection());
final Statement probeStatement = makeItCloseQuietly(probeConnection.createStatement())) {
for (int i = 0; i < 64; i++) {
probeStatement.addBatch(
String.format(
"INSERT INTO root.alloc_probe.d%d(timestamp,speed) values(%d, %d)", i, i, i));
}
probeStatement.executeBatch();
LOGGER.info("Forced new Region allocation via root.alloc_probe.");

// The core assertion: none of the newly allocated Regions may land on the removing
// DataNode.
assertNewRegionsExcludeDataNode(probeStatement, preExistingRegionIds, removeDataNodeId);
}

// The removal must still be able to complete; the original bug left it stuck forever.
awaitUntilSuccess(clientRef, removeDataNodeLocations);
LOGGER.info("Remove DataNode {} completed.", removeDataNodeId);

// Final guard: after the node is gone, nothing references it any more.
assertNoRegionOnDataNode(statement, removeDataNodeId);
assertDataNodeAbsent(statement, removeDataNodeId);
} catch (InconsistentDataException e) {
LOGGER.error("Unexpected error:", e);
throw e;
}
}

private static boolean isRemovalInProgress(
final AtomicReference<SyncConfigNodeIServiceClient> clientRef,
final List<TDataNodeLocation> removeDataNodeLocations) {
try {
final List<TDataNodeLocation> remaining =
clientRef
.get()
.getDataNodeConfiguration(-1)
.getDataNodeConfigurationMap()
.values()
.stream()
.map(TDataNodeConfiguration::getLocation)
.collect(Collectors.toList());
return removeDataNodeLocations.stream().anyMatch(remaining::contains);
} catch (Exception e) {
LOGGER.warn("Failed to query DataNode configuration", e);
return false;
}
}

private static void awaitDataNodeStatus(
final Statement statement, final int dataNodeId, final String expectedStatus) {
Awaitility.await()
.atMost(2, TimeUnit.MINUTES)
.pollDelay(1, TimeUnit.SECONDS)
.until(
() -> {
try (final ResultSet result = statement.executeQuery(SHOW_DATANODES)) {
while (result.next()) {
if (result.getInt(ColumnHeaderConstant.NODE_ID) == dataNodeId) {
return expectedStatus.equalsIgnoreCase(
result.getString(ColumnHeaderConstant.STATUS));
}
}
}
return false;
});
}

/**
* Wait until the forced allocation has produced at least one new Region (a Region id not present
* in {@code preExistingRegionIds}), then assert that none of those new Regions has a replica on
* {@code dataNodeId}.
*/
private static void assertNewRegionsExcludeDataNode(
final Statement statement, final Set<Integer> preExistingRegionIds, final int dataNodeId) {
final AtomicReference<Map<Integer, Set<Integer>>> newRegionsRef = new AtomicReference<>();
Awaitility.await()
.atMost(1, TimeUnit.MINUTES)
.pollDelay(1, TimeUnit.SECONDS)
.until(
() -> {
final Map<Integer, Set<Integer>> newRegions =
getAllRegionMap(statement).entrySet().stream()
.filter(entry -> !preExistingRegionIds.contains(entry.getKey()))
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
newRegionsRef.set(newRegions);
return !newRegions.isEmpty();
});

final Map<Integer, Set<Integer>> newRegions = newRegionsRef.get();
final Set<Integer> offendingRegions =
newRegions.entrySet().stream()
.filter(entry -> entry.getValue().contains(dataNodeId))
.map(Map.Entry::getKey)
.collect(Collectors.toSet());
Assert.assertTrue(
String.format(
"Newly allocated Region(s) %s were placed on DataNode %d which is being removed; "
+ "new Region map: %s",
offendingRegions, dataNodeId, newRegions),
offendingRegions.isEmpty());
}

private static void assertNoRegionOnDataNode(final Statement statement, final int dataNodeId)
throws Exception {
final Map<Integer, Set<Integer>> allRegionMap = getAllRegionMap(statement);
final Set<Integer> offendingRegions =
allRegionMap.entrySet().stream()
.filter(entry -> entry.getValue().contains(dataNodeId))
.map(Map.Entry::getKey)
.collect(Collectors.toSet());
Assert.assertTrue(
String.format(
"Region(s) %s still reference removed DataNode %d; full map: %s",
offendingRegions, dataNodeId, allRegionMap),
offendingRegions.isEmpty());
}

private static void assertDataNodeAbsent(final Statement statement, final int dataNodeId)
throws Exception {
try (final ResultSet result = statement.executeQuery(SHOW_DATANODES)) {
while (result.next()) {
Assert.assertNotEquals(
"DataNode " + dataNodeId + " should have been removed",
dataNodeId,
result.getInt(ColumnHeaderConstant.NODE_ID));
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -786,6 +786,29 @@
return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
}

/**
* Collect the ids of all DataNodes that an in-progress {@link RemoveDataNodesProcedure} is
* removing.
*
* <p>A DataNode being removed must not receive any newly allocated Region replica: doing so would
* leave a replica stranded on a node that is about to disappear, blocking the removal from ever
* completing. We cannot rely on the node's {@link NodeStatus} here, because a DataNode that was
* killed (e.g. {@code kill -9}) before removal is reported as {@link NodeStatus#Unknown} by the
* failure detector rather than {@link NodeStatus#Removing}, so a status filter alone would still
* let it through. The authoritative source is therefore the running procedure itself, which holds
* the removing DataNode list and survives leader switches via procedure persistence.
*
* @return the set of DataNode ids currently being removed (empty if no removal is in progress)
*/
public Set<Integer> getRemovingDataNodeIds() {
return getExecutor().getProcedures().values().stream()
.filter(procedure -> procedure instanceof RemoveDataNodesProcedure)

Check warning on line 805 in iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Replace this lambda with method reference 'RemoveDataNodesProcedure.class::isInstance'.

See more on https://sonarcloud.io/project/issues?id=apache_iotdb&issues=AZ670g74ChKmA1FeTn5r&open=AZ670g74ChKmA1FeTn5r&pullRequest=17934
.filter(procedure -> !procedure.isFinished())
.flatMap(procedure -> ((RemoveDataNodesProcedure) procedure).getRemovedDataNodes().stream())
.map(TDataNodeLocation::getDataNodeId)
.collect(Collectors.toSet());
}

// region region operation related check

/**
Expand Down
Loading
Loading