diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcPartitionReaderFactory.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcPartitionReaderFactory.scala index c44a5d30cafe6..39764eec58230 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcPartitionReaderFactory.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcPartitionReaderFactory.scala @@ -23,7 +23,7 @@ import org.apache.hadoop.mapreduce.lib.input.FileSplit import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl import org.apache.orc.{OrcConf, OrcFile, Reader, TypeDescription} import org.apache.orc.mapred.OrcStruct -import org.apache.orc.mapreduce.OrcInputFormat +import org.apache.orc.mapreduce.{OrcInputFormat, OrcMapreduceRecordReader} import org.apache.spark.broadcast.Broadcast import org.apache.spark.memory.MemoryMode @@ -89,7 +89,8 @@ case class OrcPartitionReaderFactory( } val filePath = file.toPath - val orcSchema = Utils.tryWithResource(createORCReader(filePath, conf)._1)(_.getSchema) + val (orcSchemaReader, readerOptions) = createORCReader(filePath, conf) + val orcSchema = Utils.tryWithResource(orcSchemaReader)(_.getSchema) val resultedColPruneInfo = OrcUtils.requestedColumnIds( isCaseSensitive, dataSchema, readDataSchema, orcSchema, conf) @@ -104,11 +105,21 @@ case class OrcPartitionReaderFactory( val taskConf = new Configuration(conf) val fileSplit = new FileSplit(filePath, file.start, file.length, Array.empty) - val attemptId = new TaskAttemptID(new TaskID(new JobID(), TaskType.MAP, 0), 0) - val taskAttemptContext = new TaskAttemptContextImpl(taskConf, attemptId) - val orcRecordReader = new OrcInputFormat[OrcStruct] - .createRecordReader(fileSplit, taskAttemptContext) + val orcRecordReader = { + val fs = filePath.getFileSystem(taskConf) + val orcReader = OrcFile.createReader( + filePath, + OrcFile.readerOptions(taskConf) + .maxLength(OrcConf.MAX_FILE_LENGTH.getLong(taskConf)) + .filesystem(fs) + .orcTail(readerOptions.getOrcTail)) + val options = org.apache.orc.mapred.OrcInputFormat + .buildOptions(taskConf, orcReader, fileSplit.getStart, fileSplit.getLength) + .useSelected(true) + new OrcMapreduceRecordReader[OrcStruct](orcReader, options) + } + val deserializer = new OrcDeserializer(readDataSchema, requestedColIds) val fileReader = new PartitionReader[InternalRow] { override def next(): Boolean = orcRecordReader.nextKeyValue()