diff --git a/common/utils/src/main/scala/org/apache/spark/util/SparkFileUtils.scala b/common/utils/src/main/scala/org/apache/spark/util/SparkFileUtils.scala index 3f1f9c1f9df7f..6629c8bf7e7c1 100644 --- a/common/utils/src/main/scala/org/apache/spark/util/SparkFileUtils.scala +++ b/common/utils/src/main/scala/org/apache/spark/util/SparkFileUtils.scala @@ -21,6 +21,8 @@ import java.net.{URI, URISyntaxException, URL} import java.nio.file.{Files, Path, StandardCopyOption} import java.nio.file.attribute.FileTime +import scala.collection.mutable + import org.apache.spark.internal.{Logging, LogKeys} import org.apache.spark.network.util.JavaUtils @@ -62,13 +64,22 @@ private[spark] trait SparkFileUtils extends Logging { */ def recursiveList(f: File): Array[File] = { require(f.isDirectory) - val result = f.listFiles.toBuffer - val dirList = result.filter(_.isDirectory) - while (dirList.nonEmpty) { - val curDir = dirList.remove(0) - val files = curDir.listFiles() - result ++= files - dirList ++= files.filter(_.isDirectory) + val result = mutable.ArrayBuffer[File]() + // Use a queue with O(1) dequeue rather than removing from the head of a buffer (O(n)), so the + // walk stays linear in the number of entries. + val dirs = mutable.Queue[File](f) + while (dirs.nonEmpty) { + val dir = dirs.dequeue() + // `File.listFiles` returns null when the directory cannot be read (an IO error, or it was + // removed during the walk); skip it instead of throwing an NPE, but log it so a partial + // result is traceable. + val entries = dir.listFiles() + if (entries != null) { + result ++= entries + dirs ++= entries.filter(_.isDirectory) + } else { + logWarning(log"Failed to list directory ${MDC(LogKeys.PATH, dir)}; skipping it.") + } } result.toArray } diff --git a/common/utils/src/test/scala/org/apache/spark/util/SparkFileUtilsSuite.scala b/common/utils/src/test/scala/org/apache/spark/util/SparkFileUtilsSuite.scala new file mode 100644 index 0000000000000..45fcb170eb0ed --- /dev/null +++ b/common/utils/src/test/scala/org/apache/spark/util/SparkFileUtilsSuite.scala @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.util + +import java.io.File +import java.nio.file.Files + +import org.scalatest.funsuite.AnyFunSuite // scalastyle:ignore funsuite + +class SparkFileUtilsSuite extends AnyFunSuite { // scalastyle:ignore funsuite + + test("recursiveList lists all nested files and directories") { + val root = Files.createTempDirectory("spark-recursive-list").toFile + try { + val sub = new File(root, "sub") + assert(sub.mkdir()) + val nested = new File(sub, "nested") + assert(nested.mkdir()) + val topFile = new File(root, "top.txt") + assert(topFile.createNewFile()) + val subFile = new File(sub, "sub.txt") + assert(subFile.createNewFile()) + + assert(SparkFileUtils.recursiveList(root).toSet === Set(sub, nested, topFile, subFile)) + } finally { + SparkFileUtils.deleteQuietly(root) + } + } + + test("recursiveList returns empty instead of throwing when a directory cannot be listed") { + // A directory whose listFiles returns null must yield an empty result, not an NPE. + val unreadable = new File("spark-unreadable-dir") { + override def isDirectory: Boolean = true + override def listFiles(): Array[File] = null + } + assert(SparkFileUtils.recursiveList(unreadable).isEmpty) + } + + test("recursiveList skips a subdirectory whose listFiles returns null mid-walk") { + // The null directory is not the root but one discovered during the walk, so the guard must + // hold at depth > 0. `unreadableSub` is a directory whose listFiles returns null; `leaf` is a + // plain file that must be returned but never recursed into (a spy verifies that). + var leafListed = false + val leaf = new File("leaf.txt") { + override def isDirectory: Boolean = false + override def listFiles(): Array[File] = { leafListed = true; super.listFiles() } + } + val unreadableSub = new File("unreadable-sub") { + override def isDirectory: Boolean = true + override def listFiles(): Array[File] = null + } + val root = new File("root") { + override def isDirectory: Boolean = true + override def listFiles(): Array[File] = Array(leaf, unreadableSub) + } + assert(SparkFileUtils.recursiveList(root).toSet === Set(leaf, unreadableSub)) + assert(!leafListed, "a non-directory entry must not be recursed into") + } +} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala index 593b2ef7951fd..ea28e34c48202 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala @@ -1048,7 +1048,7 @@ class RocksDBFileManager( /** Log the files present in a directory. This is useful for debugging. */ private def logFilesInDir(dir: File, msg: MessageWithContext): Unit = { - lazy val files = Option(Utils.recursiveList(dir)).getOrElse(Array.empty).map { f => + lazy val files = Utils.recursiveList(dir).map { f => s"${f.getAbsolutePath} - ${f.length()} bytes" } logDebug(msg + log" - ${MDC(LogKeys.NUM_FILES, files.length)} files\n\t" +