/**
 * Read a text file from HDFS, a local file system (available on all nodes), or any
 * Hadoop-supported file system URI, and return it as an RDD of Strings.
 */
def textFile(
    path: String,
    minPartitions: Int = defaultMinPartitions): RDD[String] = withScope {
  assertNotStopped()
  hadoopFile(path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text],
    minPartitions).map(pair => pair._2.toString).setName(path)
}
The various ways textFile can read data:
Read a single file from the current directory
val path = "hello.txt" val rdd = sc.textFile(path)
Read multiple files from the current directory
val path = "hello1.txt,hello2.txt" val rdd = sc.textFile(path)
Read a file from the local file system
val path = "file:///usr/local/spark/hello.txt"//local file val rdd1 = sc.textFile(path)
Read an HDFS directory
val path = "hdfs://xxx/xxx/traindata/" val rdd = sc.textFile(path)
Read files matching a wildcard pattern
val path = "hdfs://xxx/xxx/traindata/2019051120*" val rdd = sc.textFile(path)
/**
 * Get an RDD for a Hadoop file with an arbitrary InputFormat
 *
 * '''Note:''' Because Hadoop's RecordReader class re-uses the same Writable object for each
 * record, directly caching the returned RDD or directly passing it to an aggregation or shuffle
 * operation will create many references to the same object.
 * If you plan to directly cache, sort, or aggregate Hadoop writable objects, you should first
 * copy them using a `map` function.
 */
def hadoopFile[K, V](
    path: String,
    inputFormatClass: Class[_ <: InputFormat[K, V]],
    keyClass: Class[K],
    valueClass: Class[V],
    minPartitions: Int = defaultMinPartitions): RDD[(K, V)] = withScope {
  assertNotStopped()
  // A Hadoop configuration can be about 10 KB, which is pretty big, so broadcast it.
  val confBroadcast = broadcast(new SerializableConfiguration(hadoopConfiguration))
  val setInputPathsFunc = (jobConf: JobConf) => FileInputFormat.setInputPaths(jobConf, path)
  new HadoopRDD(
    this,
    confBroadcast,
    Some(setInputPathsFunc),
    inputFormatClass,
    keyClass,
    valueClass,
    minPartitions).setName(path)
}
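The note in the doc comment matters in practice: the RecordReader hands back the same Writable instances for every record. A minimal sketch of the recommended workaround (the path is a placeholder):

import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapred.TextInputFormat

// Copy the values out of the reused Writable objects (here via toString)
// before caching or shuffling; the path is a placeholder.
val raw = sc.hadoopFile("hdfs://xxx/xxx/traindata/",
  classOf[TextInputFormat], classOf[LongWritable], classOf[Text])
val lines = raw.map { case (_, text) => text.toString }  // materialize a copy per record
lines.cache()

This is exactly what textFile does for us with its pair._2.toString map.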
abstract class RDD[T: ClassTag](
    @transient private var _sc: SparkContext,
    @transient private var deps: Seq[Dependency[_]]
  ) extends Serializable with Logging {
/**
 * Return a new RDD by applying a function to all elements of this RDD.
 */
def map[U: ClassTag](f: T => U): RDD[U] = withScope {
  // clean delegates to ClosureCleaner.clean, which strips variables that cannot be
  // serialized out of the closure, so the task does not fail to deserialize when the
  // RDD is shipped over the network [1]
  val cleanF = sc.clean(f)
  new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.map(cleanF))
}
}
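To make the role of sc.clean concrete, here is a minimal, hypothetical sketch (the Tagger class is illustrative, not Spark code) of the accidental capture ClosureCleaner deals with, and the usual workaround:

import org.apache.spark.rdd.RDD

// A hypothetical driver-side class (not part of Spark), deliberately not Serializable.
class Tagger {
  val suffix = "_done"

  def tag(rdd: RDD[String]): RDD[String] = {
    // rdd.map(_ + suffix) would compile to a closure that references `this`, i.e. the
    // whole non-serializable Tagger. ClosureCleaner.clean nulls out outer references it
    // can prove unused; copying the field to a local val is the standard way to make
    // the closure depend only on a small, serializable value.
    val localSuffix = suffix
    rdd.map(_ + localSuffix)
  }
}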
/**
 * An RDD that applies the provided function to every partition of the parent RDD.
 */
private[spark] class MapPartitionsRDD[U: ClassTag, T: ClassTag](
    var prev: RDD[T],
    f: (TaskContext, Int, Iterator[T]) => Iterator[U],  // (TaskContext, partition index, iterator)
    preservesPartitioning: Boolean = false)
  extends RDD[U](prev) {
  override val partitioner = if (preservesPartitioning) firstParent[T].partitioner else None
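For context, MapPartitionsRDD.compute simply applies f to firstParent[T].iterator(split, context), which is the iterator method quoted next. The preservesPartitioning flag matters when a per-partition transformation leaves keys untouched; a minimal sketch:

import org.apache.spark.HashPartitioner

// When the keys are unchanged, keeping the parent's partitioner lets later
// key-based operations (e.g. reduceByKey with the same partitioner) skip a shuffle.
val pairs = sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3)))
  .partitionBy(new HashPartitioner(4))

val scaled = pairs.mapPartitions(
  iter => iter.map { case (k, v) => (k, v * 10) },
  preservesPartitioning = true)   // safe to claim: keys are not modified

println(scaled.partitioner)       // Some(HashPartitioner) instead of None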
/**
 * Internal method to this RDD; will read from cache if applicable, or otherwise compute it.
 * This should ''not'' be called by users directly, but is available for implementors of custom
 * subclasses of RDD.
 */
final def iterator(split: Partition, context: TaskContext): Iterator[T] = {
  if (storageLevel != StorageLevel.NONE) {
    SparkEnv.get.cacheManager.getOrCompute(this, split, context, storageLevel)
  } else {
    computeOrReadCheckpoint(split, context)
  }
}
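The storageLevel branch is what persist/cache feeds into; a minimal sketch (the path is a placeholder):

import org.apache.spark.storage.StorageLevel

// Setting a storage level makes iterator() go through the cache manager instead of
// recomputing the partition on every action.
val lines = sc.textFile("hdfs://xxx/xxx/traindata/")
lines.persist(StorageLevel.MEMORY_ONLY)   // cache() is shorthand for this level

println(lines.count())   // first action computes and caches the partitions
println(lines.count())   // second action reads them back from the cache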
/**
 * Compute an RDD partition or read it from a checkpoint if the RDD is checkpointing.
 */
private[spark] def computeOrReadCheckpoint(split: Partition, context: TaskContext): Iterator[T] = {
  if (isCheckpointedAndMaterialized) {
    firstParent[T].iterator(split, context)
  } else {
    compute(split, context)
  }
}
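A minimal sketch of how the checkpoint branch gets taken (directory and path are placeholders):

// After the checkpoint is materialized, the RDD's parent becomes the checkpoint RDD,
// so computeOrReadCheckpoint reads the saved files instead of recomputing the lineage.
sc.setCheckpointDir("hdfs://xxx/xxx/checkpoints")

val lines = sc.textFile("hdfs://xxx/xxx/traindata/")
lines.checkpoint()   // mark for checkpointing; materialized by the next action
lines.count()        // subsequent computations read from the checkpoint files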
override def compute(theSplit: Partition, context: TaskContext): InterruptibleIterator[(K, V)] = {
  val iter = new NextIterator[(K, V)] {
    val split = theSplit.asInstanceOf[HadoopPartition]
    logInfo("Input split: " + split.inputSplit)
    val jobConf = getJobConf()
    val inputMetrics = context.taskMetrics.getInputMetricsForReadMethod(DataReadMethod.Hadoop)
    // Sets the thread local variable for the file's name
    split.inputSplit.value match {
      case fs: FileSplit =>
        SqlNewHadoopRDDState.setInputFileName(fs.getPath.toString)
      case _ =>
        SqlNewHadoopRDDState.unsetInputFileName()
    }
    // Find a function that will return the FileSystem bytes read by this thread. Do this before
    // creating RecordReader, because RecordReader's constructor might read some bytes
    val bytesReadCallback = inputMetrics.bytesReadCallback.orElse {
      split.inputSplit.value match {
        case _: FileSplit | _: CombineFileSplit =>
          SparkHadoopUtil.get.getFSBytesReadOnThreadCallback()
        case _ => None
      }
    }
    inputMetrics.setBytesReadCallback(bytesReadCallback)
    var reader: RecordReader[K, V] = null
    val inputFormat = getInputFormat(jobConf)
    HadoopRDD.addLocalConfiguration(new SimpleDateFormat("yyyyMMddHHmm").format(createTime),
      context.stageId, theSplit.index, context.attemptNumber, jobConf)
    reader = inputFormat.getRecordReader(split.inputSplit.value, jobConf, Reporter.NULL)
    // Register an on-task-completion callback to close the input stream.
    context.addTaskCompletionListener { context => closeIfNeeded() }
    val key: K = reader.createKey()
    val value: V = reader.createValue()
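    // (getNext() fills these two objects on every call to reader.next(key, value); because the
    // same Writable instances are reused for each record, hadoopFile's doc comment above warns
    // against caching or shuffling the raw pairs without copying them first.)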
    override def close() {
      if (reader != null) {
        SqlNewHadoopRDDState.unsetInputFileName()
        // Close the reader and release it. Note: it's very important that we don't close the
        // reader more than once, since that exposes us to MAPREDUCE-5918 when running against
        // Hadoop 1.x and older Hadoop 2.x releases. That bug can lead to non-deterministic
        // corruption issues when reading compressed input.
        try {
          reader.close()
        } catch {
          case e: Exception =>
            if (!ShutdownHookManager.inShutdown()) {
              logWarning("Exception in RecordReader.close()", e)
            }
        } finally {
          reader = null
        }
        if (bytesReadCallback.isDefined) {
          inputMetrics.updateBytesRead()
        } else if (split.inputSplit.value.isInstanceOf[FileSplit] ||
                   split.inputSplit.value.isInstanceOf[CombineFileSplit]) {
          // If we can't get the bytes read from the FS stats, fall back to the split size,
          // which may be inaccurate.
          try {
            inputMetrics.incBytesRead(split.inputSplit.value.getLength)
          } catch {
            case e: java.io.IOException =>
              logWarning("Unable to get input size to set InputMetrics for task", e)
          }
        }
      }
    }
  }
  new InterruptibleIterator[(K, V)](context, iter)
}