Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import org.apache.hadoop.mapreduce.lib.input.FileSplit
import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
import org.apache.orc.{OrcConf, OrcFile, Reader, TypeDescription}
import org.apache.orc.mapred.OrcStruct
import org.apache.orc.mapreduce.OrcInputFormat
import org.apache.orc.mapreduce.{OrcInputFormat, OrcMapreduceRecordReader}

import org.apache.spark.broadcast.Broadcast
import org.apache.spark.memory.MemoryMode
Expand Down Expand Up @@ -89,7 +89,8 @@ case class OrcPartitionReaderFactory(
}
val filePath = file.toPath

val orcSchema = Utils.tryWithResource(createORCReader(filePath, conf)._1)(_.getSchema)
val (orcSchemaReader, readerOptions) = createORCReader(filePath, conf)
val orcSchema = Utils.tryWithResource(orcSchemaReader)(_.getSchema)
val resultedColPruneInfo = OrcUtils.requestedColumnIds(
isCaseSensitive, dataSchema, readDataSchema, orcSchema, conf)

Expand All @@ -104,11 +105,21 @@ case class OrcPartitionReaderFactory(
val taskConf = new Configuration(conf)

val fileSplit = new FileSplit(filePath, file.start, file.length, Array.empty)
val attemptId = new TaskAttemptID(new TaskID(new JobID(), TaskType.MAP, 0), 0)
val taskAttemptContext = new TaskAttemptContextImpl(taskConf, attemptId)

val orcRecordReader = new OrcInputFormat[OrcStruct]
.createRecordReader(fileSplit, taskAttemptContext)
val orcRecordReader = {
val fs = filePath.getFileSystem(taskConf)
val orcReader = OrcFile.createReader(
filePath,
OrcFile.readerOptions(taskConf)
.maxLength(OrcConf.MAX_FILE_LENGTH.getLong(taskConf))
.filesystem(fs)
.orcTail(readerOptions.getOrcTail))
val options = org.apache.orc.mapred.OrcInputFormat
.buildOptions(taskConf, orcReader, fileSplit.getStart, fileSplit.getLength)
.useSelected(true)
new OrcMapreduceRecordReader[OrcStruct](orcReader, options)
}

val deserializer = new OrcDeserializer(readDataSchema, requestedColIds)
val fileReader = new PartitionReader[InternalRow] {
override def next(): Boolean = orcRecordReader.nextKeyValue()
Expand Down