Scheduler

DAGScheduler

DAGScheduler实现了面向stage的调度层. 它将每个job的stage划分成DAG, 追踪RDD和stage输出, 找出一个可以运行作业的最小化调度方法. 接下来就将每个stage以TaskSet提交给TaskScheduler执行.

为了构造出stage的DAG图, 该类还需要根据当前的cache状态, 计算出每个task优先跑在那个节点上, 并将该location传递给低层的TaskScheduler. 此外, 它需要处理shuffule输出文件丢失的异常, 此时前一个stage可能会被重新提交. 不是由shuffle文件丢失引起的错误将会由TaskScheduler处理, 这些task在取消整个stage之前会被重试多次.

THREADING: 该类的所有逻辑操作都是在执行run()方法的单线程完成的, 事件都被提交到一个同步队列(eventQueue)中. public的API方法, 如runJob, taskEnded和executorLost, 都会异步的把事件放入到这个队列中. 所有其他的方法都应该是private的.

先看start方法. start方法启动了eventProcessorActor, 该actor有两个职责: - 等待事件, 如作业提交, 作业结束, 作业失败等. 调用[[org.apache.spark.scheduler.DAGScheduler.processEvent()]]来处理事件. - 调度周期task来重新提交失败的stage.

注意: 该actor不能在构造函数里启动, 因为周期任务参考了一些内部状态的封闭[[org.apache.spark.scheduler.DAGScheduler]]对象, 因此在[[org.apache.spark.scheduler.DAGScheduler]完全构造好之前不能调度.

def start() {
    eventProcessActor = env.actorSystem.actorOf(Props(new Actor {

        def receive = {
            case event: DAGSchedulerEvent =>
                logTrace("Get event of type " + event.getClass.getName)

                /**
                 * All events are fowarded to `processEvent()`, so that the event processing logic can
                 * easily tested without starting a dedicated actor. Please refer to `DAGSchedulerSuite`
                 * for details.
                 * /
                if (!processEvent(event)) {
                    submitWaitingStages()
                } else {
                    context.stop(self)
                }
        }
    }))
}

可见processEvent是所有事件的入口, 需要注意一下这个函数的返回值, 如果是true表示事件循环可以结束了. processEvent主要针对不同的事件做不同的处理:

SparkListenrBus

主要成员变量有

// 一个SparkListener的数组
private val sparkListeners = new ArrayBuffer[SparkListener] with SynchronizedBuffer[SparkListner]
// 一个事件队列
private val eventQueue = new LinkedBlockingQueue[SparkListenerEvents](EVENT_QUEUE_CAPACITY)

主线程(守护)一直尝试从事件队列里取出事件, 然后对每个SparkListener执行该事件.

主要方法: post方法就是往事件队列里面push事件

def post(event: SparkListenerEvents) {
    val eventAdded = eventQueue.offer(event)
    if (!eventAdded && !queueFullErrorMessageLogged) {
        queueFullErrorMessageLoged = true
    }
}

SparkListener

首先定义了一系列SparkListenerEvents:

特质SparkListener主要定义了一堆onCompletion接口.

StatsReportListener类主要记录了当每个stage完成时的一些统计信息, 因此只实现了一个onStageCompleted接口.伴生对象StatReportListener实现了一堆metrics获取的方法.

TaskScheduler && TaskSchedulerImpl

TaskScheduler提供了更底层的task调度接口, 目前是由ClusterScheduler实现的. 这些接口定义允许用户自定义可插拔的task调度器. 每一个TaskScheduler为一个独立的SparkContext调度任务. 这些调度器会从DAGScheduler的每个stage获取一个task集合, 然后将这些task发送给集群, 运行, 在发生错误时进行重试, 并且减轻落后者. 最终以事件的形式将结果反馈给DAGScheduler. 主要成员变量与接口:

TaskSchedulerImpl是TaskScheduler接口的一个实现. 通过SchedulerBackend来为多类cluster调度任务. 也可以通过配置使用LocalBackend. 它主要负责普通逻辑, 如决定作业之间的调度顺序, 唤醒speculative任务等. Clients必须先调用initialize和start, 然后通过runTasks方法提交task集合.

THREADING: SchedulerBackends和提交任务的客户端可以从不同的线程中调用该类, 所以在public API方法中需要加锁. 此外, 一些SchedulerBackends在发送事件时可以在这里进行同步, 它们在这里会获取锁, 所谓我们必须确保在持有锁时不会尝试锁住backend

成员变量和接口:

TaskSchedulerImlp的伴生对象, 仅提供了一个prioritizeContainers方法. 这个方法是用于跨机器均衡containers的. 该方法传入一个host和host拥有资源的map, 返回一个资源分配的顺序列表. 注意传入的host资源已经是有序的, 因此我们在同一台机器上我们优先分配前面的container.

示例: 给定 <h1, [o1, o2, o3]>, <h2, [o4]>, <h1, [o5, o6]>, 返回[o1, o5, o4, o2, o6, o3].

SchedulerBackend

调度系统的后台接口, 允许在ClusterScheduler下挂载不同的调度器. 我们假定Mesos-like的系统, 可以从机器上获取资源并在上面运行任务. 接口: - start

Schedulable

特质, 定义了可调度实体的接口, 实际有两类可调度实体, 分别是Pools和TaskSetManagers.

Pool

后面提供的一些接口就是对这两个数据集的增删改查.

TaskSetManagers(TODO)

SchedulableBuilder

特质, 构建Schedulable tree的接口, 总共就两个接口: - buildPools 用来构建所有树的节点

针对该特质提供了两个实现: - FIFOSchedulableBuilder - buildPools 空

- addTaskSetManager 直接调用rootPool的addSchedulable接口

TaskResultGetter

TaskSet

Task

TaskInfo

Stage

Stage是一个Spark job中运行相同计算逻辑的一组独立task的集合, 这些task的shuffle依赖都相同. 每个task的DAG都被切分成不同的stage, 不同的stage是以shuffle发生为边界的. DAGScheduler会以逻辑顺序运行这些stage.

Stage就分两种, 一种是shuffle map stage, 这类task的结果是另一阶段task的输入; 另一种是result stage, 这类task可以直接完成计算行为如初始化job(e.g. count(), save(), etc). 对于shuffle map stage, 我们还要追踪每个输出partition在哪些节点上.

每个Stage都有一个jobId, 可以和提交该stage的job做区分. 在使用FIFO调度时, 这使得先来job的stage可以先被计算或者快速回复.

StageInfo

保存所有从scheduler发送给SparkListener的stage信息. 其中taskInfos保存了所有已完成的task的metrics信息, 包括冗余的, 特殊的task. 两个元素, stage和taskInfos. 其中taskInfos是mutable.Buffer[(TaskInfo, TaskMetrics)]类型.