Contents
  1. 1. node
  2. 2. Spark任务调度模块
    1. 2.1. DAGScheduler
    2. 2.2. TaskScheduler
  3. 3. Spark调度模式:
    1. 3.1. FIFO:FIFOSchedulingAlgorithm (才有pool)
    2. 3.2. FAIR:FairSchedulingAlgorithm

node

  • Application:用户编写的Spark应用程序,由一个或多个Job组成。提交到Spark之后,Spark会为- Application分配资源,将程序进行转换并执行。
  • Job(作业):由Action算子触发生成的由一个或多个Stage组成的计算作业。
  • Stage(调度阶段):每个Job会根据RDD的宽依赖被切分为多个Stage,每个Stage都包含一个TaskSet。
  • TaskSet(任务集):一组关联的,但相互之间没有shuffle依赖关系的Task集合。一个TaskSet对应的调度阶- 段。
  • Task(任务):RDD中的一个分区对应一个Task,Task是单个分区上最小的处理流程单元。

Spark任务调度模块

DAGScheduler和TaskScheduler

DAGScheduler

DAGScheduler:主要负责分析用户提交的应用,并根据计算任务的依赖关系建立DAG,然后将DAG划分为不同的Stage,并以TaskSet的形式把Stage提交给TaskScheduler。其中每个Stage由可以并发执行的一组Task构成,这些Task的执行逻辑完全相同,只是作用于不同的数据。

TaskScheduler

TaskScheduler:负责Application中不同job之间的调度,将TaskSet提交给Worker执行并返回结果,在Task执行失败时启动重试机制,并且为执行速度慢的Task启动备份的任务。

val rootPool: Pool = new Pool(“”, schedulingMode, 0, 0)

FIFO时,

  • rootPool
    • TaskSetManager…

FAIR时,

  • rootPool
    • customPool0
    • customPool1
    • default(如果没有自定义覆盖,则使用以下配置)
      • FIFO,minshare=0,weight=1

Spark调度模式:

调度对象:Pools and TaskSetManagers

Pools管理多个TaskSetManager

Pool:

  • priority = 0
  • stageId = -1
  • name = default(default)
  • weight = 1(default)
  • minShare = 0(default) (as a number of CPU cores)

TaskSetManager:

  • priority = jobID
  • stageId = stageId
  • name = TaskSet_<stageId>.<stageAttemptId>
  • weight = 1
  • minShare = 0

FIFO:FIFOSchedulingAlgorithm (才有pool)

先进先出调度模式(默认)。FIFO调度会根据StageID和JobID的大小来调度,数值较小的任务优先被调度。FIFO调度方式存在一个缺点:当遇到一个耗时较长的任务时,后续任务必须等待这个耗时任务执行完成才能得到可用的计算资源。

  1. 比较priority(实际为jobID)
  2. 比较stageId

FAIR:FairSchedulingAlgorithm

公平调度模式。FAIR模式下每个计算任务具有相等的优先级,Spark以轮询的方式为每个任务分配计算资源。FAIR不像FIFO那样必须等待前面耗时任务完成后后续任务才能执行。在FAIR模式下,无论是耗时短任务还是耗时长任务、无论是先提交的任务还是后提交的任务都可以公平的获得资源执行,这样就提高了耗时短的任务的响应时间。FAIR比FIFO更加灵活,FAIR模式为用户提供了一个调度池的概念,用户可以将重要的计算任务放入一个调度池Pool中,通过设置该调度池的权重来使该调度池中的计算任务获得较高的优先级。

  1. 任一方A运行的任务数<minShare时,且另一方运行的任务≥minShare时,A优先
  2. 双方运行的任务数<minShare时,计算比较值(任务数/minShare),小的优先
  3. 双方运行的任务数≥ minShare时,计算比较值(任务数/weight),小的优先
  4. 若比较值相同,则比较调度对象名称字符串,小的优先

sh start-thriftserver.sh –master yarn –deploy-mode client –conf spark.scheduler.allocation.file=”file:///data/soft/spark/conf/fairscheduler_test.xml”

fairscheduler_test.xml
1
2
3
4
5
6
7
8
9
10
11
12
13
<?xml version="1.0"?>
<allocations>
<pool name="default">
<schedulingMode>FAIR</schedulingMode>
<weight>3</weight>
<minShare>5</minShare>
</pool>
<pool name="quick">
<schedulingMode>FIFO</schedulingMode>
<weight>1</weight>
<minShare>5</minShare>
</pool>
</allocations>

SET spark.sql.thriftserver.scheduler.pool=quick;