From 98246cec9063c4a2acf2134c848cf3fea847cc4b Mon Sep 17 00:00:00 2001 From: Yongzao <532741407@qq.com> Date: Fri, 12 Jun 2026 20:05:15 +0800 Subject: [PATCH] Propagate snapshot load failure during IoTConsensus AddPeer During region migration, when the target peer failed to load the transferred snapshot, the failure was silently swallowed: the target's IoTConsensus RPC handler returned SUCCESS regardless, so the coordinator activated the new peer and marked AddRegionPeerProcedure / RegionMigrateProcedure successful. The migration was reported complete while the destination replica actually had no data, leading to silent data loss once the source replica was dropped. The coordinator side already handles a non-SUCCESS triggerSnapshotLoad response correctly (it throws ConsensusGroupModifyPeerException, which fails the AddPeer task and rolls the procedure back without deleting the source replica). The only broken link was that snapshot-load failure was never reportable, because IStateMachine.loadSnapshot returned void and the implementations swallowed errors. Change IStateMachine.loadSnapshot to return boolean (true on success): - DataRegionStateMachine / SchemaRegionStateMachine / ConfigRegionStateMachine return false when loading fails (and SchemaRegionStateMachine now guards its body so an exception is reported rather than thrown). - IoTConsensusServerImpl.loadSnapshot returns false if loading any receive folder fails (removing the long-standing TODO). - IoTConsensusRPCServiceProcessor.triggerSnapshotLoad returns a non-SUCCESS status when loadSnapshot fails, so the coordinator's existing error path fires and AddPeer fails instead of falsely succeeding. - SimpleConsensusServerImpl forwards the boolean; the Ratis ApplicationStateMachineProxy logs a failure (its behavior is otherwise unchanged). Test state machines updated accordingly. 
Add AddPeerSnapshotLoadFailureTest: a real two-node IoTConsensus group where the target's loadSnapshot is forced to fail; it verifies that addRemotePeer reaches the load step, throws ConsensusException, and does not leave the target peer active. The test fails against the old code and passes with the fix. --- .../ConfigRegionStateMachine.java | 4 +- .../apache/iotdb/consensus/IStateMachine.java | 5 +- .../consensus/iot/IoTConsensusServerImpl.java | 19 +- .../IoTConsensusRPCServiceProcessor.java | 10 +- .../ratis/ApplicationStateMachineProxy.java | 4 +- .../simple/SimpleConsensusServerImpl.java | 4 +- .../iotdb/consensus/EmptyStateMachine.java | 4 +- .../iot/AddPeerSnapshotLoadFailureTest.java | 244 ++++++++++++++++++ .../consensus/iot/util/TestStateMachine.java | 4 +- .../iotdb/consensus/ratis/TestUtils.java | 4 +- .../consensus/simple/SimpleConsensusTest.java | 4 +- .../dataregion/DataRegionStateMachine.java | 6 +- .../SchemaRegionStateMachine.java | 22 +- 13 files changed, 306 insertions(+), 28 deletions(-) create mode 100644 iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/iot/AddPeerSnapshotLoadFailureTest.java diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/ConfigRegionStateMachine.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/ConfigRegionStateMachine.java index b3029e6602808..1c2fb38470b61 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/ConfigRegionStateMachine.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/ConfigRegionStateMachine.java @@ -246,7 +246,7 @@ public boolean takeSnapshot(File snapshotDir) { } @Override - public void loadSnapshot(final File latestSnapshotRootDir) { + public boolean loadSnapshot(final File latestSnapshotRootDir) { try { executor.loadSnapshot(latestSnapshotRootDir); // We recompute the snapshot for pipe listener when 
loading snapshot @@ -254,12 +254,14 @@ public void loadSnapshot(final File latestSnapshotRootDir) { PipeConfigNodeAgent.runtime() .listener() .tryListenToSnapshots(ConfigNodeSnapshotParser.getSnapshots()); + return true; } catch (final IOException e) { if (PipeConfigNodeAgent.runtime().listener().isOpened()) { LOGGER.warn( ConfigNodeMessages.CONFIG_REGION_LISTENING_QUEUE_LISTEN_TO_SNAPSHOT_FAILED_WHEN_STARTUP, e); } + return false; } } diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/IStateMachine.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/IStateMachine.java index 3354c83699b54..c7705d93896e8 100644 --- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/IStateMachine.java +++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/IStateMachine.java @@ -112,8 +112,11 @@ default boolean clearSnapshot() { * Load the latest snapshot from given dir. * * @param latestSnapshotRootDir dir where the latest snapshot sits + * @return {@code true} if the snapshot was loaded successfully, {@code false} otherwise. Callers + * (e.g. the IoTConsensus AddPeer flow) rely on this to avoid activating a new peer whose + * snapshot failed to load, which would otherwise silently lose data. */ - void loadSnapshot(File latestSnapshotRootDir); + boolean loadSnapshot(File latestSnapshotRootDir); /** * given a snapshot dir, ask statemachine to provide all snapshot files. 
By default, it will list diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java index f3c7d4e50ae26..27a2691d63136 100644 --- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java +++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java @@ -511,14 +511,17 @@ private void clearOldSnapshot() { } } - public void loadSnapshot(String snapshotId) { - // TODO: (xingtanzjr) throw exception if the snapshot load failed - recvFolderManager - .getFolders() - .forEach( - dir -> { - stateMachine.loadSnapshot(getSnapshotPath(dir, snapshotId)); - }); + public boolean loadSnapshot(String snapshotId) { + // Load the snapshot from every receive folder. If any of them fails, report the failure so the + // AddPeer coordinator does not activate this peer with incomplete data (which would silently + // lose data on this replica). 
+ boolean success = true; + for (String dir : recvFolderManager.getFolders()) { + if (!stateMachine.loadSnapshot(getSnapshotPath(dir, snapshotId))) { + success = false; + } + } + return success; } private File getSnapshotPath(String curStorageDir, String snapshotRelativePath) { diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/service/IoTConsensusRPCServiceProcessor.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/service/IoTConsensusRPCServiceProcessor.java index c9a9901dbcf6f..91e17b370ca1e 100644 --- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/service/IoTConsensusRPCServiceProcessor.java +++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/service/IoTConsensusRPCServiceProcessor.java @@ -343,7 +343,15 @@ public TTriggerSnapshotLoadRes triggerSnapshotLoad(TTriggerSnapshotLoadReq req) status.setMessage(message); return new TTriggerSnapshotLoadRes(status); } - impl.loadSnapshot(req.snapshotId); + if (!impl.loadSnapshot(req.snapshotId)) { + String message = + String.format( + "Failed to load snapshot %s for consensus group %s", req.snapshotId, groupId); + LOGGER.error(message); + TSStatus status = new TSStatus(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode()); + status.setMessage(message); + return new TTriggerSnapshotLoadRes(status); + } KillPoint.setKillPoint(DataNodeKillPoints.DESTINATION_ADD_PEER_TRANSITION); return new TTriggerSnapshotLoadRes(new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode())); } diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ratis/ApplicationStateMachineProxy.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ratis/ApplicationStateMachineProxy.java index 1134d8fd6f206..dc009a71887e0 100644 --- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ratis/ApplicationStateMachineProxy.java +++ 
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ratis/ApplicationStateMachineProxy.java @@ -268,7 +268,9 @@ private void loadSnapshot(File latestSnapshotDir) { } // require the application statemachine to load the latest snapshot - applicationStateMachine.loadSnapshot(latestSnapshotDir); + if (!applicationStateMachine.loadSnapshot(latestSnapshotDir)) { + logger.error("{}: failed to load snapshot from {}", this, latestSnapshotDir); + } TermIndex snapshotTermIndex = Utils.getTermIndexFromDir(latestSnapshotDir); updateLastAppliedTermIndex(snapshotTermIndex.getTerm(), snapshotTermIndex.getIndex()); } diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/simple/SimpleConsensusServerImpl.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/simple/SimpleConsensusServerImpl.java index 4ed4a7b41ce41..b76bcdeaaebbc 100644 --- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/simple/SimpleConsensusServerImpl.java +++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/simple/SimpleConsensusServerImpl.java @@ -89,7 +89,7 @@ public synchronized boolean takeSnapshot(File snapshotDir) { } @Override - public synchronized void loadSnapshot(File latestSnapshotRootDir) { - stateMachine.loadSnapshot(latestSnapshotRootDir); + public synchronized boolean loadSnapshot(File latestSnapshotRootDir) { + return stateMachine.loadSnapshot(latestSnapshotRootDir); } } diff --git a/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/EmptyStateMachine.java b/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/EmptyStateMachine.java index aafa0be5bb5ff..997120b00f96d 100644 --- a/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/EmptyStateMachine.java +++ b/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/EmptyStateMachine.java @@ -54,5 +54,7 @@ public boolean takeSnapshot(File snapshotDir) { } @Override - public void loadSnapshot(File latestSnapshotRootDir) {} + public boolean 
loadSnapshot(File latestSnapshotRootDir) { + return true; + } } diff --git a/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/iot/AddPeerSnapshotLoadFailureTest.java b/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/iot/AddPeerSnapshotLoadFailureTest.java new file mode 100644 index 0000000000000..aa69907e6ed4e --- /dev/null +++ b/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/iot/AddPeerSnapshotLoadFailureTest.java @@ -0,0 +1,244 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iotdb.consensus.iot; + +import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType; +import org.apache.iotdb.common.rpc.thrift.TEndPoint; +import org.apache.iotdb.commons.consensus.ConsensusGroupId; +import org.apache.iotdb.commons.consensus.DataRegionId; +import org.apache.iotdb.commons.exception.StartupException; +import org.apache.iotdb.consensus.ConsensusFactory; +import org.apache.iotdb.consensus.common.Peer; +import org.apache.iotdb.consensus.config.ConsensusConfig; +import org.apache.iotdb.consensus.exception.ConsensusException; +import org.apache.iotdb.consensus.iot.util.TestEntry; +import org.apache.iotdb.consensus.iot.util.TestStateMachine; + +import org.apache.ratis.util.FileUtils; +import org.junit.After; +import org.junit.Assert; +import org.junit.Assume; +import org.junit.Before; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.IOException; +import java.net.ServerSocket; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +/** + * Regression test for the snapshot-load-failure bug (a failed snapshot load on the AddPeer target + * was silently swallowed, so the new peer was activated and the migration falsely reported + * successful, losing data on the new replica). + * + *
It builds a real two-node IoTConsensus group, forces the target peer's {@link + * org.apache.iotdb.consensus.IStateMachine#loadSnapshot} to fail, and verifies that {@code + * addRemotePeer}: + * + *