From 2039473d8fedaf288efa11739b679ebd9717e041 Mon Sep 17 00:00:00 2001 From: Caideyipi <87789683+Caideyipi@users.noreply.github.com> Date: Fri, 12 Jun 2026 16:55:24 +0800 Subject: [PATCH] Pipe: avoid blocking startup on hardlink dir cleanup --- .../iotdb/db/i18n/DataNodePipeMessages.java | 12 + .../iotdb/db/i18n/DataNodePipeMessages.java | 12 + .../runtime/PipeDataNodeRuntimeAgent.java | 2 +- ...HardlinkOrCopiedFileDirStartupCleaner.java | 254 +++++++++++++++++- 4 files changed, 269 insertions(+), 11 deletions(-) diff --git a/iotdb-core/datanode/src/main/i18n/en/org/apache/iotdb/db/i18n/DataNodePipeMessages.java b/iotdb-core/datanode/src/main/i18n/en/org/apache/iotdb/db/i18n/DataNodePipeMessages.java index f669934149d9e..fe924fbbaf9d7 100644 --- a/iotdb-core/datanode/src/main/i18n/en/org/apache/iotdb/db/i18n/DataNodePipeMessages.java +++ b/iotdb-core/datanode/src/main/i18n/en/org/apache/iotdb/db/i18n/DataNodePipeMessages.java @@ -1266,6 +1266,18 @@ public final class DataNodePipeMessages { "PipeTsFileResource's reference count is decreased to below 0."; public static final String PIPE_HARDLINK_DIR_FOUND_DELETING_IT_RESULT = "Pipe hardlink dir found, deleting it: {}, result: {}"; + public static final String PIPE_HARDLINK_DIR_FOUND_MOVED_TO_PERIODICAL_DELETE = + "Pipe hardlink dir found, moved it from {} to {} for throttled periodical deletion."; + public static final String PIPE_STALE_HARDLINK_DIR_FOUND_REGISTERING_PERIODICAL_DELETE = + "Stale pipe hardlink dir found, registering it for throttled periodical deletion: {}"; + public static final String PIPE_HARDLINK_DIR_PERIODICAL_DELETE_FINISHED = + "Finished deleting stale pipe hardlink dir {} by periodical job, result: {}"; + public static final String PIPE_HARDLINK_DIR_PERIODICAL_DELETE_PROGRESS = + "Periodically deleted {} paths from stale pipe hardlink dirs, current dir: {}, current round result: {}"; + public static final String PIPE_HARDLINK_DIR_PERIODICAL_DELETE_ALL_FINISHED = + "Finished deleting all stale pipe hardlink dirs by periodical job."; + public static final String PIPE_HARDLINK_DIR_MOVE_FAILED_DELETING_SYNC = + "Failed to move pipe hardlink dir {} for periodical deletion, deleting it synchronously."; public static final String PIPE_SNAPSHOT_DIR_FOUND_DELETING_IT = "Pipe snapshot dir found, deleting it: {},"; public static final String SHRINK_CALLBACK_IS_NOT_SUPPORTED_IN_PIPEFIXEDMEMORYBLOCK = diff --git a/iotdb-core/datanode/src/main/i18n/zh/org/apache/iotdb/db/i18n/DataNodePipeMessages.java b/iotdb-core/datanode/src/main/i18n/zh/org/apache/iotdb/db/i18n/DataNodePipeMessages.java index 131a83cf808d3..b286d7607f88c 100644 --- a/iotdb-core/datanode/src/main/i18n/zh/org/apache/iotdb/db/i18n/DataNodePipeMessages.java +++ b/iotdb-core/datanode/src/main/i18n/zh/org/apache/iotdb/db/i18n/DataNodePipeMessages.java @@ -1221,6 +1221,18 @@ public final class DataNodePipeMessages { "PipeTsFileResource's reference count is decreased to below 0."; public static final String PIPE_HARDLINK_DIR_FOUND_DELETING_IT_RESULT = "Pipe hardlink dir found, deleting it: {}, result: {}"; + public static final String PIPE_HARDLINK_DIR_FOUND_MOVED_TO_PERIODICAL_DELETE = + "Pipe hardlink dir found, moved it from {} to {} for throttled periodical deletion."; + public static final String PIPE_STALE_HARDLINK_DIR_FOUND_REGISTERING_PERIODICAL_DELETE = + "Stale pipe hardlink dir found, registering it for throttled periodical deletion: {}"; + public static final String PIPE_HARDLINK_DIR_PERIODICAL_DELETE_FINISHED = + "Finished deleting stale pipe hardlink dir {} by periodical job, result: {}"; + public static final String PIPE_HARDLINK_DIR_PERIODICAL_DELETE_PROGRESS = + "Periodically deleted {} paths from stale pipe hardlink dirs, current dir: {}, current round result: {}"; + public static final String PIPE_HARDLINK_DIR_PERIODICAL_DELETE_ALL_FINISHED = + "Finished deleting all stale pipe hardlink dirs by periodical job."; + public static final String PIPE_HARDLINK_DIR_MOVE_FAILED_DELETING_SYNC = + "Failed to move pipe hardlink dir {} for periodical deletion, deleting it synchronously."; public static final String PIPE_SNAPSHOT_DIR_FOUND_DELETING_IT = "Pipe snapshot dir found, deleting it: {},"; public static final String SHRINK_CALLBACK_IS_NOT_SUPPORTED_IN_PIPEFIXEDMEMORYBLOCK = diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeDataNodeRuntimeAgent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeDataNodeRuntimeAgent.java index 3cdf92d53925b..80c7f16a0b735 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeDataNodeRuntimeAgent.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeDataNodeRuntimeAgent.java @@ -81,7 +81,7 @@ public class PipeDataNodeRuntimeAgent implements IService { public synchronized void preparePipeResources( final ResourcesInformationHolder resourcesInformationHolder) throws StartupException { // Clean sender (connector) hardlink file dir and snapshot dir - PipeDataNodeHardlinkOrCopiedFileDirStartupCleaner.clean(); + PipeDataNodeHardlinkOrCopiedFileDirStartupCleaner.clean(this::registerPeriodicalJob); // Clean receiver file dir PipeDataNodeAgent.receiver().cleanPipeReceiverDirs(); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/PipeDataNodeHardlinkOrCopiedFileDirStartupCleaner.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/PipeDataNodeHardlinkOrCopiedFileDirStartupCleaner.java index 0ee642e7d52e7..1302134b6cebc 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/PipeDataNodeHardlinkOrCopiedFileDirStartupCleaner.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/PipeDataNodeHardlinkOrCopiedFileDirStartupCleaner.java @@ -29,33 +29,267 @@ import org.slf4j.LoggerFactory; import java.io.File; +import java.io.IOException; +import java.nio.file.FileVisitResult; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.SimpleFileVisitor; +import java.nio.file.attribute.BasicFileAttributes; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; public class PipeDataNodeHardlinkOrCopiedFileDirStartupCleaner { private static final Logger LOGGER = LoggerFactory.getLogger(PipeDataNodeHardlinkOrCopiedFileDirStartupCleaner.class); + private static final String STALE_PIPE_DIR_SUFFIX = ".startup-cleaning-"; + private static final String PERIODICAL_CLEANUP_JOB_ID = + "PipeDataNodeHardlinkOrCopiedFileDirStartupCleaner#cleanTsFileDir()"; + private static final long DELETE_MAX_PATH_COUNT_PER_ROUND = 100_000L; + private static final long DELETE_MAX_TIME_PER_ROUND_MS = 1_000L; /** * Delete the data directory and all of its subdirectories that contain the * PipeConfig.PIPE_TSFILE_DIR_NAME directory. */ - public static void clean() { - cleanTsFileDir(); + public static void clean(final PeriodicalJobRegistrar periodicalJobRegistrar) { + cleanTsFileDir(periodicalJobRegistrar); cleanSnapshotDir(); } - private static void cleanTsFileDir() { + private static void cleanTsFileDir(final PeriodicalJobRegistrar periodicalJobRegistrar) { + final String pipeHardlinkBaseDirName = PipeConfig.getInstance().getPipeHardlinkBaseDirName(); + final List stalePipeDirs = new ArrayList<>(); for (final String dataDir : IoTDBDescriptor.getInstance().getConfig().getDataDirs()) { - final File pipeHardLinkDir = - new File( - dataDir + File.separator + PipeConfig.getInstance().getPipeHardlinkBaseDirName()); + final File localDataDir = new File(dataDir); + collectInterruptedStalePipeDirs(localDataDir, pipeHardlinkBaseDirName, stalePipeDirs); + + final File pipeHardLinkDir = new File(localDataDir, pipeHardlinkBaseDirName); if (pipeHardLinkDir.isDirectory()) { - LOGGER.info( - DataNodePipeMessages.PIPE_HARDLINK_DIR_FOUND_DELETING_IT_RESULT, - pipeHardLinkDir, - FileUtils.deleteQuietly(pipeHardLinkDir)); + moveAsideAndCollect(pipeHardLinkDir, pipeHardlinkBaseDirName, stalePipeDirs); } } + registerPeriodicalCleanupJob(periodicalJobRegistrar, stalePipeDirs); + } + + private static void collectInterruptedStalePipeDirs( + final File localDataDir, + final String pipeHardlinkBaseDirName, + final List stalePipeDirs) { + final File[] stalePipeDirFiles = + localDataDir.listFiles( + file -> + file.isDirectory() + && file.getName().startsWith(pipeHardlinkBaseDirName + STALE_PIPE_DIR_SUFFIX)); + if (stalePipeDirFiles == null) { + return; + } + + for (final File stalePipeDir : stalePipeDirFiles) { + LOGGER.info( + DataNodePipeMessages.PIPE_STALE_HARDLINK_DIR_FOUND_REGISTERING_PERIODICAL_DELETE, + stalePipeDir); + stalePipeDirs.add(stalePipeDir); + } + } + + private static void moveAsideAndCollect( + final File pipeHardLinkDir, + final String pipeHardlinkBaseDirName, + final List stalePipeDirs) { + try { + final File stalePipeDir = moveAside(pipeHardLinkDir, pipeHardlinkBaseDirName); + LOGGER.info( + DataNodePipeMessages.PIPE_HARDLINK_DIR_FOUND_MOVED_TO_PERIODICAL_DELETE, + pipeHardLinkDir, + stalePipeDir); + stalePipeDirs.add(stalePipeDir); + } catch (final IOException e) { + LOGGER.warn( + DataNodePipeMessages.PIPE_HARDLINK_DIR_MOVE_FAILED_DELETING_SYNC, pipeHardLinkDir, e); + LOGGER.info( + DataNodePipeMessages.PIPE_HARDLINK_DIR_FOUND_DELETING_IT_RESULT, + pipeHardLinkDir, + FileUtils.deleteQuietly(pipeHardLinkDir)); + } + } + + private static File moveAside(final File pipeHardLinkDir, final String pipeHardlinkBaseDirName) + throws IOException { + final File parentDir = pipeHardLinkDir.getParentFile(); + if (parentDir == null) { + throw new IOException("Failed to get parent dir of " + pipeHardLinkDir); + } + + final long timestamp = System.currentTimeMillis(); + for (int i = 0; ; ++i) { + final File stalePipeDir = + new File( + parentDir, pipeHardlinkBaseDirName + STALE_PIPE_DIR_SUFFIX + timestamp + "-" + i); + if (!stalePipeDir.exists()) { + Files.move(pipeHardLinkDir.toPath(), stalePipeDir.toPath()); + return stalePipeDir; + } + } + } + + private static void registerPeriodicalCleanupJob( + final PeriodicalJobRegistrar periodicalJobRegistrar, final List stalePipeDirs) { + if (stalePipeDirs.isEmpty()) { + return; + } + + periodicalJobRegistrar.register( + PERIODICAL_CLEANUP_JOB_ID, + new PeriodicalStalePipeDirCleaner(stalePipeDirs)::cleanOneRound, + PipeConfig.getInstance().getPipeSubtaskExecutorCronHeartbeatEventIntervalSeconds()); + } + + private static CleanupRoundResult deleteQuietlyWithThrottle(final File stalePipeDir) { + if (!stalePipeDir.exists()) { + return CleanupRoundResult.finished(); + } + + final AtomicBoolean deleteResult = new AtomicBoolean(true); + final AtomicLong deletedPathCount = new AtomicLong(0); + final long deadlineNanos = System.nanoTime() + DELETE_MAX_TIME_PER_ROUND_MS * 1_000_000L; + try { + Files.walkFileTree( + stalePipeDir.toPath(), + new SimpleFileVisitor() { + @Override + public FileVisitResult preVisitDirectory( + final Path dir, final BasicFileAttributes attrs) { + return shouldStop(deletedPathCount, deadlineNanos) + ? FileVisitResult.TERMINATE + : FileVisitResult.CONTINUE; + } + + @Override + public FileVisitResult visitFile(final Path file, final BasicFileAttributes attrs) { + return deletePath(file, deleteResult, deletedPathCount, deadlineNanos); + } + + @Override + public FileVisitResult visitFileFailed(final Path file, final IOException exc) { + deleteResult.set(false); + return deletePath(file, deleteResult, deletedPathCount, deadlineNanos); + } + + @Override + public FileVisitResult postVisitDirectory(final Path dir, final IOException exc) { + if (exc != null) { + deleteResult.set(false); + } + return deletePath(dir, deleteResult, deletedPathCount, deadlineNanos); + } + }); + } catch (final IOException e) { + deleteResult.set(false); + } + + return new CleanupRoundResult( + deletedPathCount.get(), deleteResult.get() && !stalePipeDir.exists(), deleteResult.get()); + } + + private static FileVisitResult deletePath( + final Path path, + final AtomicBoolean deleteResult, + final AtomicLong deletedPathCount, + final long deadlineNanos) { + if (shouldStop(deletedPathCount, deadlineNanos)) { + return FileVisitResult.TERMINATE; + } + + try { + if (Files.deleteIfExists(path)) { + deletedPathCount.incrementAndGet(); + } + } catch (final IOException e) { + deleteResult.set(false); + } + return shouldStop(deletedPathCount, deadlineNanos) + ? FileVisitResult.TERMINATE + : FileVisitResult.CONTINUE; + } + + private static boolean shouldStop(final AtomicLong deletedPathCount, final long deadlineNanos) { + return Thread.currentThread().isInterrupted() + || deletedPathCount.get() >= DELETE_MAX_PATH_COUNT_PER_ROUND + || System.nanoTime() >= deadlineNanos; + } + + @FunctionalInterface + public interface PeriodicalJobRegistrar { + + void register(String id, Runnable periodicalJob, long intervalInSeconds); + } + + private static class PeriodicalStalePipeDirCleaner { + + private final List stalePipeDirs; + private int currentDirIndex; + private boolean finished; + + private PeriodicalStalePipeDirCleaner(final List stalePipeDirs) { + this.stalePipeDirs = stalePipeDirs; + currentDirIndex = 0; + finished = false; + } + + private void cleanOneRound() { + if (finished) { + return; + } + + long deletedPathCount = 0; + while (currentDirIndex < stalePipeDirs.size()) { + final File stalePipeDir = stalePipeDirs.get(currentDirIndex); + final CleanupRoundResult result = deleteQuietlyWithThrottle(stalePipeDir); + deletedPathCount += result.deletedPathCount; + + if (result.finished) { + LOGGER.info( + DataNodePipeMessages.PIPE_HARDLINK_DIR_PERIODICAL_DELETE_FINISHED, + stalePipeDir, + result.success); + ++currentDirIndex; + continue; + } + + if (deletedPathCount > 0 || !result.success) { + LOGGER.info( + DataNodePipeMessages.PIPE_HARDLINK_DIR_PERIODICAL_DELETE_PROGRESS, + deletedPathCount, + stalePipeDir, + result.success); + } + return; + } + + finished = true; + LOGGER.info(DataNodePipeMessages.PIPE_HARDLINK_DIR_PERIODICAL_DELETE_ALL_FINISHED); + } + } + + private static class CleanupRoundResult { + + private final long deletedPathCount; + private final boolean finished; + private final boolean success; + + private CleanupRoundResult( + final long deletedPathCount, final boolean finished, final boolean success) { + this.deletedPathCount = deletedPathCount; + this.finished = finished; + this.success = success; + } + + private static CleanupRoundResult finished() { + return new CleanupRoundResult(0, true, true); + } } private static void cleanSnapshotDir() {