最近因为工作需要,打算阅读一下spark的源码。首先研究一下spark是如何读入数据的。关于spark读取数据的机制,其实知乎上已经有大牛回答了,见@连城(内存有限的情况下 Spark 如何处理 T 级别的数据?),如果对spark源码已经有一定的了解,该回答还是非常形象易懂的。

一 阅读spark源码的姿势

因为我并非系统地研究spark,只是想在需要的时候查询一下某个函数的实现,所以可以直接在IDEA中,选中某个函数,使用快捷键ctrl+b(mac下是cmd+b),查看该函数的实现,这时候会提示 download source code,按照提示下载之后,再使用该命令就可以查看函数实现。

二 spark数据读取机制

2.1 textFile函数

textFile函数是spark中的数据读取函数,其path参数可以是HDFS,本地文件,或者其它hadoop支持的文件系统地URL,其返回类型是 RDD[String]。

minPartitions= math.min(defaultParallelism, 2) 是指定数据的分区,如果不指定分区,当你的核数大于2的时候,不指定分区数那么就是 2

1
2
3
4
5
6
7
8
9
10
11
/**
* Read a text file from HDFS, a local file system (available on all nodes), or any
* Hadoop-supported file system URI, and return it as an RDD of Strings.
*/
def textFile(
path: String,
minPartitions: Int = defaultMinPartitions): RDD[String] = withScope {
assertNotStopped()
hadoopFile(path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text],
minPartitions).map(pair => pair._2.toString).setName(path)
}

textFile函数读取数据的n种姿势:

  1. 读取当前目录下的一个文件
1
2
val path = "hello.txt"
val rdd = sc.textFile(path)
  1. 读取当前目录下的多个文件
1
2
val path = "hello1.txt,hello2.txt"
val rdd = sc.textFile(path)
  1. 从本地文件系统读取一个文件
1
2
val path = "file:///usr/local/spark/hello.txt"  //local file
val rdd1 = sc.textFile(path)
  1. 读取hdfs的一个目录
1
2
val path = "hdfs://xxx/xxx/traindata/"
val rdd = sc.textFile(path)
  1. 通配符
1
2
val path = "hdfs://xxx/xxx/traindata/2019051120*"
val rdd = sc.textFile(path)

2.2 RDD 的转换

之所以要讲spark rdd的转换和计算流程(章节2.3),是因为spark数据读取机制是基于spark rdd转换和计算的。众所周知,spark是惰性计算的,在spark中有两种操作:transform 和 action。spark会将所有的transform操作连接成图,如果遇到action操作就计算该图。也就是说,使用textFile函数并非立即读取数据的,而是等到执行action操作的时候才会真正地读取数据。

首先给出结论:在spark内部,单个executor进程内rdd的分片数据是流式访问的。以下面代码为例:

1
2
3
var rdd = sc.textFile("hdfs://xxx/hello.txt");  
var rdd_new = rdd.map(_.split(","));
print(rdd_new.count());

上面代码比较简单,不再介绍。我们再看一下textFile函数的实现:

1
2
3
4
5
6
7
8
def textFile(
path: String,
minPartitions: Int = defaultMinPartitions): RDD[String]
= withScope {
assertNotStopped()
hadoopFile(path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text],
minPartitions).map(pair => pair._2.toString).setName(path)
}

可以看出,textFile函数的核心是,调用了一个hadoopFile函数,然后进行了一个map操作,获取了第二个元素。hadoopFile的参数除了path之外,还有 classOf[TextInputFormat], classOf[LongWritable], classOf[Text]。如果有过hadoop的开发经验,肯定对这三个参数比较熟悉。最后还有一个minPartitions参数。

然后我们看一下hadoopFile对象是什么样子的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
/** Get an RDD for a Hadoop file with an arbitrary InputFormat
*
* '''Note:''' Because Hadoop's RecordReader class re-uses the same Writable object for each
* record, directly caching the returned RDD or directly passing it to an aggregation or shuffle
* operation will create many references to the same object.
* If you plan to directly cache, sort, or aggregate Hadoop writable objects, you should first
* copy them using a `map` function.
*/
def hadoopFile[K, V](
path: String,
inputFormatClass: Class[_ <: InputFormat[K, V]],
keyClass: Class[K],
valueClass: Class[V],
minPartitions: Int = defaultMinPartitions): RDD[(K, V)] = withScope {
assertNotStopped()
// A Hadoop configuration can be about 10 KB, which is pretty big, so broadcast it.
val confBroadcast = broadcast(new SerializableConfiguration(hadoopConfiguration))
val setInputPathsFunc = (jobConf: JobConf) => FileInputFormat.setInputPaths(jobConf, path)
new HadoopRDD(
this,
confBroadcast,
Some(setInputPathsFunc),
inputFormatClass,
keyClass,
valueClass,
minPartitions).setName(path)
}

可以看到,hadoopFile返回了k-v格式的rdd。在hadoopFile内部,首先对hadoopConfiguration进行了广播,然后设置了一个setInputPathsFunc 函数。最重要的,在hadoopFile函数内部,新建了一个HadoopRDD对象并返回。也就是说,textFile函数中实际上是调用了HadoopRDD对象的map函数。而通过查看HadoopRDD的源码可以发现,class HadoopRDD是继承自抽象类RDD的。此外,在类HadoopRDD中,并没有重写类RDD中的map,reduce函数,也就是说,所有继承了RDD类的对象都是直接调用抽象类RDD中的map,reduce等方法。

在抽象类中的map方法长这个样子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
abstract class RDD[T: ClassTag](
@transient private var _sc: SparkContext,
@transient private var deps: Seq[Dependency[_]]
) extends Serializable with Logging {

/**
* Return a new RDD by applying a function to all elements of this RDD.
*/
def map[U: ClassTag](f: T => U): RDD[U] = withScope {
// clean方法实际上调用了ClosureCleaner的clean方法,旨在清除闭包中的不能序列化的变量,防止RDD在网络传输过程中反序列化失败[1]
val cleanF = sc.clean(f)
new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.map(cleanF))
}
}

可以看出,map方法首先对穿传进来的函数进行clean操作,然后构造了一个MapPartitionsRDD对象,它的参数我们一会再分析。这里可以确定一点,即到目前为止并没有进行数据读取,计算操作。

再看第二行代码:

1
var rdd_new = rdd.map(_.split(","));

通过上面的分析可知,这里的rdd其实是一个MapPartitionsRDD对象,同样地,在执行了map操作之后,也返回了一个MapPartitionsRDD对象。

再看最后一行代码:

1
print(rdd_new.count());

这里的count是一个action操作。前面提到,action操作会触发spark的图计算操作。我们可以看一下count的实现:

1
2
3
4
/**
* Return the number of elements in the RDD.
*/
def count(): Long = sc.runJob(this, Utils.getIteratorSize _).sum

可以看到,在count内部执行了sc.runJob()操作。runJob函数会触发DagScheduler去分解任务并提交到集群执行。

2.3 RDD的计算

在action操作触发了spark的图计算之后,该任务将会被分解执行。具体地,task首先会执行最后一个rdd的compute方法。在上述代码中,最后一个rdd是一个MapPartitionsRDD对象。我们看一下MapPartitionsRDD的compute函数。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
/**
* An RDD that applies the provided function to every partition of the parent RDD.
*/
private[spark] class MapPartitionsRDD[U: ClassTag, T: ClassTag](
var prev: RDD[T],
f: (TaskContext, Int, Iterator[T]) => Iterator[U], // (TaskContext, partition index, iterator)
preservesPartitioning: Boolean = false)
extends RDD[U](prev) {

override val partitioner = if (preservesPartitioning) firstParent[T].partitioner else None

override def getPartitions: Array[Partition] = firstParent[T].partitions

override def compute(split: Partition, context: TaskContext): Iterator[U] =
f(context, split.index, firstParent[T].iterator(split, context))

override def clearDependencies() {
super.clearDependencies()
prev = null
}
}

可以看到,compute函数的参数有两个:分区 split 和 Task上下文,在compute函数内部实际上调用了 f 函数。f函数是怎么来的呢?它其实是类MapPartitionsRDD的构造函数的参数,在构造MapPartitionsRDD对象的时候被传递进来的。上面分析提到,MapPartitionsRDD对象是在抽象类RDD的map函数内部构造的,f 函数也是在该地方实现的。f函数的参数有三个:第一个是task上下文,第二个是分区索引,第三个是父rdd的迭代器。

1
2
3
4
def map[U: ClassTag](f: T => U): RDD[U] = withScope {
val cleanF = sc.clean(f);
new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.map(cleanF));
}

可以看到,f 函数的实现就是 (this, (context, pid, iter) => iter.map(cleanF) 部分,f函数实际上调用了第三个函数参数的map方法,map方法的参数是用户调用map函数时传入的函数,本例中是split()。刚刚已经提到,f函数的第三个参数是父rdd(因为该MapPartitionsRDD的父rdd也是MapPartitionsRDD,而MapPartitionsRDD中没有iterator函数的实现,所以MapPartitionsRDD实际上调用了抽象类RDD中的)的迭代器方法iterator,我们可以看一下它是什么样子。

1
2
3
4
5
6
7
8
9
10
11
12
13
/**
* Internal method to this RDD; will read from cache if applicable, or otherwise compute it.
* This should ''not'' be called by users directly, but is available for implementors of custom
* subclasses of RDD.
*/

final def iterator(split: Partition, context: TaskContext): Iterator[T] = {
if (storageLevel != StorageLevel.NONE) {
SparkEnv.get.cacheManager.getOrCompute(this, split, context, storageLevel)
} else {
computeOrReadCheckpoint(split, context)
}
}

可以看到,iterator函数首先判断,该rdd的storageLevel是否为NONE,如果不为NONE,则尝试从缓存中读取数据,如果缓存中没有,则通过计算获取对应分区数据的迭代器。如果该rdd的storageLevel为NONE,则尝试从checkpoint获取对应分区数据的迭代器,如果checkpoint不存在则通过计算获取。

iterator会返回一个迭代器,可以通过该迭代器访问父rdd的某个分区中的每个元素。如果内存在没有父rdd的数据,则调用父rdd的compute方法进行计算。

我们可以再看一下computeOrReadCheckpoint这个方法 ( 这个方法比较简单,好分析 ) 。

1
2
3
4
5
6
7
8
9
10
11
/**
* Compute an RDD partition or read it from a checkpoint if the RDD is checkpointing.
*/
private[spark] def computeOrReadCheckpoint(split: Partition, context: TaskContext): Iterator[T] =
{
if (isCheckpointedAndMaterialized) {
firstParent[T].iterator(split, context)
} else {
compute(split, context)
}
}

可以看到,在computeOrReadCheckpoint内部,通过调用父rdd的compute方法获取父rdd的split分区的迭代器。

可以想象,经过一层层transform操作溯源之后,最终会调用类HadoopRDD中的compute函数,它长这个样子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
override def compute(theSplit: Partition, context: TaskContext): InterruptibleIterator[(K, V)] = {
val iter = new NextIterator[(K, V)] {

val split = theSplit.asInstanceOf[HadoopPartition]
logInfo("Input split: " + split.inputSplit)
val jobConf = getJobConf()

val inputMetrics = context.taskMetrics.getInputMetricsForReadMethod(DataReadMethod.Hadoop)

// Sets the thread local variable for the file's name
split.inputSplit.value match {
case fs: FileSplit => SqlNewHadoopRDDState.setInputFileName(fs.getPath.toString)
case _ => SqlNewHadoopRDDState.unsetInputFileName()
}

// Find a function that will return the FileSystem bytes read by this thread. Do this before
// creating RecordReader, because RecordReader's constructor might read some bytes
val bytesReadCallback = inputMetrics.bytesReadCallback.orElse {
split.inputSplit.value match {
case _: FileSplit | _: CombineFileSplit =>
SparkHadoopUtil.get.getFSBytesReadOnThreadCallback()
case _ => None
}
}
inputMetrics.setBytesReadCallback(bytesReadCallback)

var reader: RecordReader[K, V] = null
val inputFormat = getInputFormat(jobConf)
HadoopRDD.addLocalConfiguration(new SimpleDateFormat("yyyyMMddHHmm").format(createTime),
context.stageId, theSplit.index, context.attemptNumber, jobConf)
reader = inputFormat.getRecordReader(split.inputSplit.value, jobConf, Reporter.NULL)

// Register an on-task-completion callback to close the input stream.
context.addTaskCompletionListener{ context => closeIfNeeded() }
val key: K = reader.createKey()
val value: V = reader.createValue()

override def getNext(): (K, V) = {
try {
finished = !reader.next(key, value)
} catch {
case eof: EOFException =>
finished = true
}
if (!finished) {
inputMetrics.incRecordsRead(1)
}
(key, value)
}

override def close() {
if (reader != null) {
SqlNewHadoopRDDState.unsetInputFileName()
// Close the reader and release it. Note: it's very important that we don't close the
// reader more than once, since that exposes us to MAPREDUCE-5918 when running against
// Hadoop 1.x and older Hadoop 2.x releases. That bug can lead to non-deterministic
// corruption issues when reading compressed input.
try {
reader.close()
} catch {
case e: Exception =>
if (!ShutdownHookManager.inShutdown()) {
logWarning("Exception in RecordReader.close()", e)
}
} finally {
reader = null
}
if (bytesReadCallback.isDefined) {
inputMetrics.updateBytesRead()
} else if (split.inputSplit.value.isInstanceOf[FileSplit] ||
split.inputSplit.value.isInstanceOf[CombineFileSplit]) {
// If we can't get the bytes read from the FS stats, fall back to the split size,
// which may be inaccurate.
try {
inputMetrics.incBytesRead(split.inputSplit.value.getLength)
} catch {
case e: java.io.IOException =>
logWarning("Unable to get input size to set InputMetrics for task", e)
}
}
}
}
}
new InterruptibleIterator[(K, V)](context, iter)
}

三 Spark在内存有限情况下的执行机制

正如知乎连城所说,spark内部使用迭代器流式访问数据,用这个Iterator访问整个数据集,空间复杂度是O(1)。可见,Spark RDD的immutable语义并不会造成大数据内存计算任务的庞大内存开销。然而,如果spark任务被划分为多个stage,那么在 stage 0 中,通过流式访问的机制,如果内存足够大,可以容纳经过各种transform操作之后的数据,那么 stage 0 并不需要考虑内存的问题,但是如果 stage 0 和 stage 1 是通过reduce操作划分的,那么就会涉及shuffle。

在 shuffle 过程中,前一个 stage 的 ShuffleMapTask 进行 shuffle write, 把数据存储在 blockManager 上面,并且把数据位置元信息上报到 driver 的 mapOutTrack 组件中,下一个 stage 根据数据位置元信息,进行 shuffle read, 拉取上个 stage 的输出数据。例如:在原始数据中有10个字段(大小100g),在 stage 0 中经过各种transform后只留下4个字段(大小40g),那么stage 0 的ShuffleMapTask 进行 shuffle write,将40g的数据写到blockManager中,供 stage 1 进行shuffle读。

引用 知乎连城的回答:

在Spark内部,单个executor进程内RDD的分片数据是用Iterator流式访问的,Iterator的hasNext方法和next方法是由RDD lineage上各个transformation携带的闭包函数复合而成的。该复合Iterator每访问一个元素,就对该元素应用相应的复合函数,得到的结果再流式地落地(对于shuffle stage是落地到本地文件系统留待后续stage访问,对于result stage是落地到HDFS或送回driver端等等,视选用的action而定)。如果用户没有要求Spark cache该RDD的结果,那么这个过程占用的内存是很小的,一个元素处理完毕后就落地或扔掉了(概念上如此,实现上有buffer),并不会长久地占用内存。只有在用户要求Spark cache该RDD,且storage level要求在内存中cache时,Iterator计算出的结果才会被保留,通过cache manager放入内存池。

参考文献

  1. Spark从外部读取数据之textFile
  2. spark中ClosureClean中的clean方法
  3. Spark RDD深度解析-RDD计算流程
  4. 内存有限的情况下 Spark 如何处理 T 级别的数据?
  5. 彻底搞懂 Spark 的 shuffle 过程(shuffle write)