Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1266,6 +1266,18 @@ public final class DataNodePipeMessages {
"PipeTsFileResource's reference count is decreased to below 0.";
public static final String PIPE_HARDLINK_DIR_FOUND_DELETING_IT_RESULT =
"Pipe hardlink dir found, deleting it: {}, result: {}";
public static final String PIPE_HARDLINK_DIR_FOUND_MOVED_TO_PERIODICAL_DELETE =
"Pipe hardlink dir found, moved it from {} to {} for throttled periodical deletion.";
public static final String PIPE_STALE_HARDLINK_DIR_FOUND_REGISTERING_PERIODICAL_DELETE =
"Stale pipe hardlink dir found, registering it for throttled periodical deletion: {}";
public static final String PIPE_HARDLINK_DIR_PERIODICAL_DELETE_FINISHED =
"Finished deleting stale pipe hardlink dir {} by periodical job, result: {}";
public static final String PIPE_HARDLINK_DIR_PERIODICAL_DELETE_PROGRESS =
"Periodically deleted {} paths from stale pipe hardlink dirs, current dir: {}, current round result: {}";
public static final String PIPE_HARDLINK_DIR_PERIODICAL_DELETE_ALL_FINISHED =
"Finished deleting all stale pipe hardlink dirs by periodical job.";
public static final String PIPE_HARDLINK_DIR_MOVE_FAILED_DELETING_SYNC =
"Failed to move pipe hardlink dir {} for periodical deletion, deleting it synchronously.";
public static final String PIPE_SNAPSHOT_DIR_FOUND_DELETING_IT =
"Pipe snapshot dir found, deleting it: {},";
public static final String SHRINK_CALLBACK_IS_NOT_SUPPORTED_IN_PIPEFIXEDMEMORYBLOCK =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1221,6 +1221,18 @@ public final class DataNodePipeMessages {
"PipeTsFileResource's reference count is decreased to below 0.";
public static final String PIPE_HARDLINK_DIR_FOUND_DELETING_IT_RESULT =
"Pipe hardlink dir found, deleting it: {}, result: {}";
public static final String PIPE_HARDLINK_DIR_FOUND_MOVED_TO_PERIODICAL_DELETE =
"Pipe hardlink dir found, moved it from {} to {} for throttled periodical deletion.";
public static final String PIPE_STALE_HARDLINK_DIR_FOUND_REGISTERING_PERIODICAL_DELETE =
"Stale pipe hardlink dir found, registering it for throttled periodical deletion: {}";
public static final String PIPE_HARDLINK_DIR_PERIODICAL_DELETE_FINISHED =
"Finished deleting stale pipe hardlink dir {} by periodical job, result: {}";
public static final String PIPE_HARDLINK_DIR_PERIODICAL_DELETE_PROGRESS =
"Periodically deleted {} paths from stale pipe hardlink dirs, current dir: {}, current round result: {}";
public static final String PIPE_HARDLINK_DIR_PERIODICAL_DELETE_ALL_FINISHED =
"Finished deleting all stale pipe hardlink dirs by periodical job.";
public static final String PIPE_HARDLINK_DIR_MOVE_FAILED_DELETING_SYNC =
"Failed to move pipe hardlink dir {} for periodical deletion, deleting it synchronously.";
public static final String PIPE_SNAPSHOT_DIR_FOUND_DELETING_IT =
"Pipe snapshot dir found, deleting it: {},";
public static final String SHRINK_CALLBACK_IS_NOT_SUPPORTED_IN_PIPEFIXEDMEMORYBLOCK =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ public class PipeDataNodeRuntimeAgent implements IService {
public synchronized void preparePipeResources(
final ResourcesInformationHolder resourcesInformationHolder) throws StartupException {
// Clean sender (connector) hardlink file dir and snapshot dir
PipeDataNodeHardlinkOrCopiedFileDirStartupCleaner.clean();
PipeDataNodeHardlinkOrCopiedFileDirStartupCleaner.clean(this::registerPeriodicalJob);

// Clean receiver file dir
PipeDataNodeAgent.receiver().cleanPipeReceiverDirs();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,33 +29,267 @@
import org.slf4j.LoggerFactory;

import java.io.File;
import java.io.IOException;
import java.nio.file.FileVisitResult;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.SimpleFileVisitor;
import java.nio.file.attribute.BasicFileAttributes;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;

public class PipeDataNodeHardlinkOrCopiedFileDirStartupCleaner {

private static final Logger LOGGER =
LoggerFactory.getLogger(PipeDataNodeHardlinkOrCopiedFileDirStartupCleaner.class);
private static final String STALE_PIPE_DIR_SUFFIX = ".startup-cleaning-";
private static final String PERIODICAL_CLEANUP_JOB_ID =
"PipeDataNodeHardlinkOrCopiedFileDirStartupCleaner#cleanTsFileDir()";
private static final long DELETE_MAX_PATH_COUNT_PER_ROUND = 100_000L;
private static final long DELETE_MAX_TIME_PER_ROUND_MS = 1_000L;

/**
* Delete the data directory and all of its subdirectories that contain the
* PipeConfig.PIPE_TSFILE_DIR_NAME directory.
*/
public static void clean() {
cleanTsFileDir();
public static void clean(final PeriodicalJobRegistrar periodicalJobRegistrar) {
cleanTsFileDir(periodicalJobRegistrar);
cleanSnapshotDir();
}

private static void cleanTsFileDir() {
private static void cleanTsFileDir(final PeriodicalJobRegistrar periodicalJobRegistrar) {
final String pipeHardlinkBaseDirName = PipeConfig.getInstance().getPipeHardlinkBaseDirName();
final List<File> stalePipeDirs = new ArrayList<>();
for (final String dataDir : IoTDBDescriptor.getInstance().getConfig().getDataDirs()) {
final File pipeHardLinkDir =
new File(
dataDir + File.separator + PipeConfig.getInstance().getPipeHardlinkBaseDirName());
final File localDataDir = new File(dataDir);
collectInterruptedStalePipeDirs(localDataDir, pipeHardlinkBaseDirName, stalePipeDirs);

final File pipeHardLinkDir = new File(localDataDir, pipeHardlinkBaseDirName);
if (pipeHardLinkDir.isDirectory()) {
LOGGER.info(
DataNodePipeMessages.PIPE_HARDLINK_DIR_FOUND_DELETING_IT_RESULT,
pipeHardLinkDir,
FileUtils.deleteQuietly(pipeHardLinkDir));
moveAsideAndCollect(pipeHardLinkDir, pipeHardlinkBaseDirName, stalePipeDirs);
}
}
registerPeriodicalCleanupJob(periodicalJobRegistrar, stalePipeDirs);
}

private static void collectInterruptedStalePipeDirs(
final File localDataDir,
final String pipeHardlinkBaseDirName,
final List<File> stalePipeDirs) {
final File[] stalePipeDirFiles =
localDataDir.listFiles(
file ->
file.isDirectory()
&& file.getName().startsWith(pipeHardlinkBaseDirName + STALE_PIPE_DIR_SUFFIX));
if (stalePipeDirFiles == null) {
return;
}

for (final File stalePipeDir : stalePipeDirFiles) {
LOGGER.info(
DataNodePipeMessages.PIPE_STALE_HARDLINK_DIR_FOUND_REGISTERING_PERIODICAL_DELETE,
stalePipeDir);
stalePipeDirs.add(stalePipeDir);
}
}

private static void moveAsideAndCollect(
final File pipeHardLinkDir,
final String pipeHardlinkBaseDirName,
final List<File> stalePipeDirs) {
try {
final File stalePipeDir = moveAside(pipeHardLinkDir, pipeHardlinkBaseDirName);
LOGGER.info(
DataNodePipeMessages.PIPE_HARDLINK_DIR_FOUND_MOVED_TO_PERIODICAL_DELETE,
pipeHardLinkDir,
stalePipeDir);
stalePipeDirs.add(stalePipeDir);
} catch (final IOException e) {
LOGGER.warn(
DataNodePipeMessages.PIPE_HARDLINK_DIR_MOVE_FAILED_DELETING_SYNC, pipeHardLinkDir, e);
LOGGER.info(
DataNodePipeMessages.PIPE_HARDLINK_DIR_FOUND_DELETING_IT_RESULT,
pipeHardLinkDir,
FileUtils.deleteQuietly(pipeHardLinkDir));
}
}

private static File moveAside(final File pipeHardLinkDir, final String pipeHardlinkBaseDirName)
throws IOException {
final File parentDir = pipeHardLinkDir.getParentFile();
if (parentDir == null) {
throw new IOException("Failed to get parent dir of " + pipeHardLinkDir);
}

final long timestamp = System.currentTimeMillis();
for (int i = 0; ; ++i) {
final File stalePipeDir =
new File(
parentDir, pipeHardlinkBaseDirName + STALE_PIPE_DIR_SUFFIX + timestamp + "-" + i);
if (!stalePipeDir.exists()) {
Files.move(pipeHardLinkDir.toPath(), stalePipeDir.toPath());
return stalePipeDir;
}
}
}

private static void registerPeriodicalCleanupJob(
final PeriodicalJobRegistrar periodicalJobRegistrar, final List<File> stalePipeDirs) {
if (stalePipeDirs.isEmpty()) {
return;
}

periodicalJobRegistrar.register(
PERIODICAL_CLEANUP_JOB_ID,
new PeriodicalStalePipeDirCleaner(stalePipeDirs)::cleanOneRound,
PipeConfig.getInstance().getPipeSubtaskExecutorCronHeartbeatEventIntervalSeconds());
}

private static CleanupRoundResult deleteQuietlyWithThrottle(final File stalePipeDir) {
if (!stalePipeDir.exists()) {
return CleanupRoundResult.finished();
}

final AtomicBoolean deleteResult = new AtomicBoolean(true);
final AtomicLong deletedPathCount = new AtomicLong(0);
final long deadlineNanos = System.nanoTime() + DELETE_MAX_TIME_PER_ROUND_MS * 1_000_000L;
try {
Files.walkFileTree(
stalePipeDir.toPath(),
new SimpleFileVisitor<Path>() {
@Override
public FileVisitResult preVisitDirectory(
final Path dir, final BasicFileAttributes attrs) {
return shouldStop(deletedPathCount, deadlineNanos)
? FileVisitResult.TERMINATE
: FileVisitResult.CONTINUE;
}

@Override
public FileVisitResult visitFile(final Path file, final BasicFileAttributes attrs) {
return deletePath(file, deleteResult, deletedPathCount, deadlineNanos);
}

@Override
public FileVisitResult visitFileFailed(final Path file, final IOException exc) {
deleteResult.set(false);
return deletePath(file, deleteResult, deletedPathCount, deadlineNanos);
}

@Override
public FileVisitResult postVisitDirectory(final Path dir, final IOException exc) {
if (exc != null) {
deleteResult.set(false);
}
return deletePath(dir, deleteResult, deletedPathCount, deadlineNanos);
}
});
} catch (final IOException e) {
deleteResult.set(false);
}

return new CleanupRoundResult(
deletedPathCount.get(), deleteResult.get() && !stalePipeDir.exists(), deleteResult.get());
}

private static FileVisitResult deletePath(
final Path path,
final AtomicBoolean deleteResult,
final AtomicLong deletedPathCount,
final long deadlineNanos) {
if (shouldStop(deletedPathCount, deadlineNanos)) {
return FileVisitResult.TERMINATE;
}

try {
if (Files.deleteIfExists(path)) {
deletedPathCount.incrementAndGet();
}
} catch (final IOException e) {
deleteResult.set(false);
}
return shouldStop(deletedPathCount, deadlineNanos)
? FileVisitResult.TERMINATE
: FileVisitResult.CONTINUE;
}

private static boolean shouldStop(final AtomicLong deletedPathCount, final long deadlineNanos) {
return Thread.currentThread().isInterrupted()
|| deletedPathCount.get() >= DELETE_MAX_PATH_COUNT_PER_ROUND
|| System.nanoTime() >= deadlineNanos;
}

@FunctionalInterface
public interface PeriodicalJobRegistrar {

void register(String id, Runnable periodicalJob, long intervalInSeconds);
}

private static class PeriodicalStalePipeDirCleaner {

private final List<File> stalePipeDirs;
private int currentDirIndex;
private boolean finished;

private PeriodicalStalePipeDirCleaner(final List<File> stalePipeDirs) {
this.stalePipeDirs = stalePipeDirs;
currentDirIndex = 0;
finished = false;
}

private void cleanOneRound() {
if (finished) {
return;
}

long deletedPathCount = 0;
while (currentDirIndex < stalePipeDirs.size()) {
final File stalePipeDir = stalePipeDirs.get(currentDirIndex);
final CleanupRoundResult result = deleteQuietlyWithThrottle(stalePipeDir);
deletedPathCount += result.deletedPathCount;

if (result.finished) {
LOGGER.info(
DataNodePipeMessages.PIPE_HARDLINK_DIR_PERIODICAL_DELETE_FINISHED,
stalePipeDir,
result.success);
++currentDirIndex;
continue;
}

if (deletedPathCount > 0 || !result.success) {
LOGGER.info(
DataNodePipeMessages.PIPE_HARDLINK_DIR_PERIODICAL_DELETE_PROGRESS,
deletedPathCount,
stalePipeDir,
result.success);
}
return;
}

finished = true;
LOGGER.info(DataNodePipeMessages.PIPE_HARDLINK_DIR_PERIODICAL_DELETE_ALL_FINISHED);
}
}

private static class CleanupRoundResult {

private final long deletedPathCount;
private final boolean finished;
private final boolean success;

private CleanupRoundResult(
final long deletedPathCount, final boolean finished, final boolean success) {
this.deletedPathCount = deletedPathCount;
this.finished = finished;
this.success = success;
}

private static CleanupRoundResult finished() {
return new CleanupRoundResult(0, true, true);
}
}

private static void cleanSnapshotDir() {
Expand Down
Loading