diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/ConfigRegionStateMachine.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/ConfigRegionStateMachine.java index b3029e6602808..1c2fb38470b61 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/ConfigRegionStateMachine.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/ConfigRegionStateMachine.java @@ -246,7 +246,7 @@ public boolean takeSnapshot(File snapshotDir) { } @Override - public void loadSnapshot(final File latestSnapshotRootDir) { + public boolean loadSnapshot(final File latestSnapshotRootDir) { try { executor.loadSnapshot(latestSnapshotRootDir); // We recompute the snapshot for pipe listener when loading snapshot @@ -254,12 +254,14 @@ public void loadSnapshot(final File latestSnapshotRootDir) { PipeConfigNodeAgent.runtime() .listener() .tryListenToSnapshots(ConfigNodeSnapshotParser.getSnapshots()); + return true; } catch (final IOException e) { if (PipeConfigNodeAgent.runtime().listener().isOpened()) { LOGGER.warn( ConfigNodeMessages.CONFIG_REGION_LISTENING_QUEUE_LISTEN_TO_SNAPSHOT_FAILED_WHEN_STARTUP, e); } + return false; } } diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/IStateMachine.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/IStateMachine.java index 3354c83699b54..c7705d93896e8 100644 --- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/IStateMachine.java +++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/IStateMachine.java @@ -112,8 +112,11 @@ default boolean clearSnapshot() { * Load the latest snapshot from given dir. * * @param latestSnapshotRootDir dir where the latest snapshot sits + * @return {@code true} if the snapshot was loaded successfully, {@code false} otherwise. Callers + * (e.g. 
the IoTConsensus AddPeer flow) rely on this to avoid activating a new peer whose + * snapshot failed to load, which would otherwise silently lose data. */ - void loadSnapshot(File latestSnapshotRootDir); + boolean loadSnapshot(File latestSnapshotRootDir); /** * given a snapshot dir, ask statemachine to provide all snapshot files. By default, it will list diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java index f3c7d4e50ae26..27a2691d63136 100644 --- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java +++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java @@ -511,14 +511,17 @@ private void clearOldSnapshot() { } } - public void loadSnapshot(String snapshotId) { - // TODO: (xingtanzjr) throw exception if the snapshot load failed - recvFolderManager - .getFolders() - .forEach( - dir -> { - stateMachine.loadSnapshot(getSnapshotPath(dir, snapshotId)); - }); + public boolean loadSnapshot(String snapshotId) { + // Load the snapshot from every receive folder. If any of them fails, report the failure so the + // AddPeer coordinator does not activate this peer with incomplete data (which would silently + // lose data on this replica). 
+ boolean success = true; + for (String dir : recvFolderManager.getFolders()) { + if (!stateMachine.loadSnapshot(getSnapshotPath(dir, snapshotId))) { + success = false; + } + } + return success; } private File getSnapshotPath(String curStorageDir, String snapshotRelativePath) { diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/service/IoTConsensusRPCServiceProcessor.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/service/IoTConsensusRPCServiceProcessor.java index c9a9901dbcf6f..91e17b370ca1e 100644 --- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/service/IoTConsensusRPCServiceProcessor.java +++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/service/IoTConsensusRPCServiceProcessor.java @@ -343,7 +343,15 @@ public TTriggerSnapshotLoadRes triggerSnapshotLoad(TTriggerSnapshotLoadReq req) status.setMessage(message); return new TTriggerSnapshotLoadRes(status); } - impl.loadSnapshot(req.snapshotId); + if (!impl.loadSnapshot(req.snapshotId)) { + String message = + String.format( + "Failed to load snapshot %s for consensus group %s", req.snapshotId, groupId); + LOGGER.error(message); + TSStatus status = new TSStatus(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode()); + status.setMessage(message); + return new TTriggerSnapshotLoadRes(status); + } KillPoint.setKillPoint(DataNodeKillPoints.DESTINATION_ADD_PEER_TRANSITION); return new TTriggerSnapshotLoadRes(new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode())); } diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ratis/ApplicationStateMachineProxy.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ratis/ApplicationStateMachineProxy.java index 1134d8fd6f206..dc009a71887e0 100644 --- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ratis/ApplicationStateMachineProxy.java +++ 
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ratis/ApplicationStateMachineProxy.java @@ -268,7 +268,9 @@ private void loadSnapshot(File latestSnapshotDir) { } // require the application statemachine to load the latest snapshot - applicationStateMachine.loadSnapshot(latestSnapshotDir); + if (!applicationStateMachine.loadSnapshot(latestSnapshotDir)) { + logger.error("{}: failed to load snapshot from {}", this, latestSnapshotDir); + } TermIndex snapshotTermIndex = Utils.getTermIndexFromDir(latestSnapshotDir); updateLastAppliedTermIndex(snapshotTermIndex.getTerm(), snapshotTermIndex.getIndex()); } diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/simple/SimpleConsensusServerImpl.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/simple/SimpleConsensusServerImpl.java index 4ed4a7b41ce41..b76bcdeaaebbc 100644 --- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/simple/SimpleConsensusServerImpl.java +++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/simple/SimpleConsensusServerImpl.java @@ -89,7 +89,7 @@ public synchronized boolean takeSnapshot(File snapshotDir) { } @Override - public synchronized void loadSnapshot(File latestSnapshotRootDir) { - stateMachine.loadSnapshot(latestSnapshotRootDir); + public synchronized boolean loadSnapshot(File latestSnapshotRootDir) { + return stateMachine.loadSnapshot(latestSnapshotRootDir); } } diff --git a/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/EmptyStateMachine.java b/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/EmptyStateMachine.java index aafa0be5bb5ff..997120b00f96d 100644 --- a/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/EmptyStateMachine.java +++ b/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/EmptyStateMachine.java @@ -54,5 +54,7 @@ public boolean takeSnapshot(File snapshotDir) { } @Override - public void loadSnapshot(File latestSnapshotRootDir) {} + public boolean 
loadSnapshot(File latestSnapshotRootDir) { + return true; + } } diff --git a/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/iot/AddPeerSnapshotLoadFailureTest.java b/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/iot/AddPeerSnapshotLoadFailureTest.java new file mode 100644 index 0000000000000..aa69907e6ed4e --- /dev/null +++ b/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/iot/AddPeerSnapshotLoadFailureTest.java @@ -0,0 +1,244 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iotdb.consensus.iot; + +import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType; +import org.apache.iotdb.common.rpc.thrift.TEndPoint; +import org.apache.iotdb.commons.consensus.ConsensusGroupId; +import org.apache.iotdb.commons.consensus.DataRegionId; +import org.apache.iotdb.commons.exception.StartupException; +import org.apache.iotdb.consensus.ConsensusFactory; +import org.apache.iotdb.consensus.common.Peer; +import org.apache.iotdb.consensus.config.ConsensusConfig; +import org.apache.iotdb.consensus.exception.ConsensusException; +import org.apache.iotdb.consensus.iot.util.TestEntry; +import org.apache.iotdb.consensus.iot.util.TestStateMachine; + +import org.apache.ratis.util.FileUtils; +import org.junit.After; +import org.junit.Assert; +import org.junit.Assume; +import org.junit.Before; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.IOException; +import java.net.ServerSocket; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +/** + * Regression test for the snapshot-load-failure bug (a failed snapshot load on the AddPeer target + * was silently swallowed, so the new peer was activated and the migration falsely reported + * successful, losing data on the new replica). + * + *

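<p>It builds a real two-node IoTConsensus group, forces the target peer's {@link
+ * org.apache.iotdb.consensus.IStateMachine#loadSnapshot} to fail, and verifies that {@code
+ * addRemotePeer}:
+ *
+ * <ul>
+ *   <li>throws a {@code ConsensusException} instead of reporting success;
+ *   <li>actually reached the snapshot-load step on the target peer; and
+ *   <li>leaves the target peer inactive rather than activating it with an incompletely
+ *       loaded snapshot.
+ * </ul>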
+ */
+public class AddPeerSnapshotLoadFailureTest {
+
+  private final Logger logger = LoggerFactory.getLogger(AddPeerSnapshotLoadFailureTest.class);
+
+  private final ConsensusGroupId gid = new DataRegionId(1);
+
+  private final int basePort = 9200;
+
+  private final List<Peer> peers =
+      Arrays.asList(
+          new Peer(gid, 1, new TEndPoint("127.0.0.1", basePort - 1)),
+          new Peer(gid, 2, new TEndPoint("127.0.0.1", basePort)));
+
+  private final List<File> peersStorage =
+      Arrays.asList(
+          new File("target" + File.separator + "snapshot-load-fail-1"),
+          new File("target" + File.separator + "snapshot-load-fail-2"));
+
+  private final List<List<String>> peersRecvSnapshotDirs =
+      Arrays.asList(
+          Arrays.asList(
+              "target" + File.separator + "snapshot-load-fail-1-recv-1",
+              "target" + File.separator + "snapshot-load-fail-1-recv-2"),
+          Arrays.asList(
+              "target" + File.separator + "snapshot-load-fail-2-recv-1",
+              "target" + File.separator + "snapshot-load-fail-2-recv-2"));
+
+  private final List<IoTConsensus> servers = new ArrayList<>();
+  private final List<ControllableStateMachine> stateMachines = new ArrayList<>();
+
+  /** A {@link TestStateMachine} whose snapshot load can be made to fail on demand. */
+  private static class ControllableStateMachine extends TestStateMachine {
+    private volatile boolean failLoadSnapshot = false;
+    private volatile boolean loadSnapshotInvoked = false;
+
+    void setFailLoadSnapshot(boolean failLoadSnapshot) {
+      this.failLoadSnapshot = failLoadSnapshot;
+    }
+
+    boolean isLoadSnapshotInvoked() {
+      return loadSnapshotInvoked;
+    }
+
+    @Override
+    public boolean loadSnapshot(File latestSnapshotRootDir) {
+      loadSnapshotInvoked = true;
+      if (failLoadSnapshot) {
+        return false;
+      }
+      return super.loadSnapshot(latestSnapshotRootDir);
+    }
+
+    @Override
+    public boolean takeSnapshot(File snapshotDir) {
+      return true;
+    }
+
+    // TestStateMachine does not implement clearSnapshot (the IStateMachine default throws).
The + // AddPeer flow calls it in a finally block to clean up the local snapshot, so we provide a + // no-op here; otherwise that cleanup would mask the ConsensusException we are asserting on. + @Override + public boolean clearSnapshot() { + return true; + } + } + + @Before + public void setUp() throws Exception { + for (File file : peersStorage) { + file.mkdirs(); + stateMachines.add(new ControllableStateMachine()); + } + peersRecvSnapshotDirs.forEach(innerList -> innerList.forEach(dir -> new File(dir).mkdirs())); + initServer(); + } + + @After + public void tearDown() throws Exception { + servers.parallelStream().forEach(IoTConsensus::stop); + servers.clear(); + for (File file : peersStorage) { + FileUtils.deleteFully(file); + } + peersRecvSnapshotDirs.forEach( + innerList -> + innerList.forEach( + dir -> { + try { + FileUtils.deleteFully(new File(dir)); + } catch (IOException e) { + throw new RuntimeException(e); + } + })); + } + + private void initServer() throws IOException { + Assume.assumeTrue(checkPortAvailable()); + try { + for (int i = 0; i < peers.size(); i++) { + int finalI = i; + servers.add( + (IoTConsensus) + ConsensusFactory.getConsensusImpl( + ConsensusFactory.IOT_CONSENSUS, + ConsensusConfig.newBuilder() + .setThisNodeId(peers.get(i).getNodeId()) + .setThisNode(peers.get(i).getEndpoint()) + .setStorageDir(peersStorage.get(i).getAbsolutePath()) + .setRecvSnapshotDirs(peersRecvSnapshotDirs.get(i)) + .setConsensusGroupType(TConsensusGroupType.DataRegion) + .build(), + groupId -> stateMachines.get(finalI)) + .orElseThrow( + () -> + new IllegalArgumentException( + String.format( + ConsensusFactory.CONSTRUCT_FAILED_MSG, + ConsensusFactory.IOT_CONSENSUS)))); + } + for (int i = 0; i < peers.size(); i++) { + servers.get(i).start(); + } + } catch (IOException e) { + if (e.getCause() instanceof StartupException) { + // just succeed when can not bind socket + logger.info("Can not start IoTConsensus because", e); + Assume.assumeTrue(false); + } else { + 
logger.error("Failed because", e); + Assert.fail("Failed because " + e.getMessage()); + } + } + } + + @Test + public void addRemotePeerMustFailWhenTargetSnapshotLoadFails() throws Exception { + // node 0 is the sole initial member; node 1 will be added as a new peer. Mirroring the real + // region-migration flow, the destination peer (node 1) is pre-created locally with the full + // target peer list (IoTConsensus, unlike Ratis, requires a non-empty peer list here). + servers.get(0).createLocalPeer(gid, peers.subList(0, 1)); + servers.get(1).createLocalPeer(gid, peers); + + // Put some data into the group so the snapshot transfer is meaningful. + for (int i = 0; i < 10; i++) { + servers.get(0).write(gid, new TestEntry(i, peers.get(0))); + } + + // Force the target peer (node 1) to fail loading the transferred snapshot. + stateMachines.get(1).setFailLoadSnapshot(true); + + // Before the fix, addRemotePeer swallowed the load failure and returned normally, leaving the + // target peer active with incomplete data. It must now surface the failure. + Assert.assertThrows( + ConsensusException.class, () -> servers.get(0).addRemotePeer(gid, peers.get(1))); + + // The failure must be the snapshot load itself, i.e. the AddPeer flow actually reached the + // load step on the target rather than aborting earlier. + Assert.assertTrue( + "Target peer's loadSnapshot was never invoked; the failure came from an earlier step", + stateMachines.get(1).isLoadSnapshotInvoked()); + + // The target peer must not be left active with an incompletely-loaded snapshot. 
+ Assert.assertFalse( + "Target peer was activated despite a failed snapshot load", + servers.get(1).getImpl(gid).isActive()); + } + + private boolean checkPortAvailable() { + for (Peer peer : this.peers) { + try (ServerSocket ignored = new ServerSocket(peer.getEndpoint().port)) { + logger.info("check port {} success for node {}", peer.getEndpoint().port, peer.getNodeId()); + } catch (IOException e) { + logger.error("check port {} failed for node {}", peer.getEndpoint().port, peer.getNodeId()); + return false; + } + } + return true; + } +} diff --git a/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/iot/util/TestStateMachine.java b/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/iot/util/TestStateMachine.java index a879a03478457..9454dcb0c4392 100644 --- a/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/iot/util/TestStateMachine.java +++ b/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/iot/util/TestStateMachine.java @@ -142,5 +142,7 @@ public boolean takeSnapshot(File snapshotDir) { } @Override - public void loadSnapshot(File latestSnapshotRootDir) {} + public boolean loadSnapshot(File latestSnapshotRootDir) { + return true; + } } diff --git a/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/ratis/TestUtils.java b/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/ratis/TestUtils.java index 5cac2173396a3..3217d1cc58e0c 100644 --- a/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/ratis/TestUtils.java +++ b/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/ratis/TestUtils.java @@ -177,13 +177,15 @@ public boolean takeSnapshot(File snapshotDir) { } @Override - public void loadSnapshot(File latestSnapshotRootDir) { + public boolean loadSnapshot(File latestSnapshotRootDir) { File snapshot = new File(latestSnapshotRootDir.getAbsolutePath() + File.separator + "snapshot"); try (Scanner scanner = new Scanner(snapshot)) { integer.set(Integer.parseInt(scanner.next())); + 
return true; } catch (FileNotFoundException e) { logger.error("cannot find snapshot file {}", snapshot); + return false; } } diff --git a/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/simple/SimpleConsensusTest.java b/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/simple/SimpleConsensusTest.java index d50ab992f6f9c..ff062fb63a85b 100644 --- a/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/simple/SimpleConsensusTest.java +++ b/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/simple/SimpleConsensusTest.java @@ -123,7 +123,9 @@ public boolean takeSnapshot(File snapshotDir) { } @Override - public void loadSnapshot(File latestSnapshotRootDir) {} + public boolean loadSnapshot(File latestSnapshotRootDir) { + return true; + } } @Before diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/dataregion/DataRegionStateMachine.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/dataregion/DataRegionStateMachine.java index 2de1ec9fdc0ce..62bb498afa19a 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/dataregion/DataRegionStateMachine.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/dataregion/DataRegionStateMachine.java @@ -124,7 +124,7 @@ public boolean clearSnapshot() { } @Override - public void loadSnapshot(File latestSnapshotRootDir) { + public boolean loadSnapshot(File latestSnapshotRootDir) { String databaseName = region.getDatabaseName(); String dataRegionIdString = region.getDataRegionIdString(); DataRegionId regionId = new DataRegionId(Integer.parseInt(dataRegionIdString)); @@ -141,14 +141,16 @@ public void loadSnapshot(File latestSnapshotRootDir) { .loadSnapshotForStateMachine()); if (newRegion == null) { logger.error(DataNodeMiscMessages.FAIL_LOAD_SNAPSHOT, latestSnapshotRootDir); - return; + return false; } this.region = newRegion; ChunkCache.getInstance().clear(); 
TimeSeriesMetadataCache.getInstance().clear(); BloomFilterCache.getInstance().clear(); + return true; } catch (Exception e) { logger.error(DataNodeMiscMessages.EXCEPTION_REPLACING_DATA_REGION, e); + return false; } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/schemaregion/SchemaRegionStateMachine.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/schemaregion/SchemaRegionStateMachine.java index edab5862d11e8..3f647ced7a5e7 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/schemaregion/SchemaRegionStateMachine.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/schemaregion/SchemaRegionStateMachine.java @@ -136,14 +136,20 @@ public boolean takeSnapshot(final File snapshotDir) { } @Override - public void loadSnapshot(final File latestSnapshotRootDir) { - schemaRegion.loadSnapshot(latestSnapshotRootDir); - PipeDataNodeAgent.runtime() - .schemaListener(schemaRegion.getSchemaRegionId()) - .loadSnapshot(latestSnapshotRootDir); - // We recompute the snapshot for pipe listener when loading snapshot - // to recover the newest snapshot in cache - listen2Snapshot4PipeListener(false); + public boolean loadSnapshot(final File latestSnapshotRootDir) { + try { + schemaRegion.loadSnapshot(latestSnapshotRootDir); + PipeDataNodeAgent.runtime() + .schemaListener(schemaRegion.getSchemaRegionId()) + .loadSnapshot(latestSnapshotRootDir); + // We recompute the snapshot for pipe listener when loading snapshot + // to recover the newest snapshot in cache + listen2Snapshot4PipeListener(false); + return true; + } catch (Exception e) { + logger.error("Failed to load snapshot from {}", latestSnapshotRootDir, e); + return false; + } } public void listen2Snapshot4PipeListener(final boolean isTmp) {