Contents
  1. 1. 入口
  2. 2. 查询
  3. 3. CarbonScanRDD
    1. 3.1. internalGetPartitions
    2. 3.2. internalCompute

入口

CarbonSession继承SparkSession,查询时我们使用的是CarbonSession。

1
2
3
4
5
6
7
8
override def sql(sqlText: String): DataFrame = {
withProfiler(
sqlText,
(qe, sse) => {
new Dataset[Row](self, qe, RowEncoder(qe.analyzed.schema))
}
)
}

这里除了增加withProfiler用于统计耗时外与SparkSQL原来的处理实质是一样的,只是为了获取时间调整了一下代码。

在withProfiler中有一行代码实现string解析成LogicalPlan:

val logicalPlan = sessionState.sqlParser.parsePlan(sqlText)

关键的地方在于CarbonSessionStateBuilder修改了sqlParser,使用Carbondata自定义的实现,具体的语法定义在CarbonSpark2SqlParser(处理carbon独有的命令)。所以要修改、增减命令的话都是要在这里处理,另外还有一部分在DDLStrategy(这是因为部分命令与spark原有的冲突,这里判断是carbon表后进行处理实现的替换)。

Carbon2.0开始通过CarbonExtensions使用Extension的方式注入

查询

对查询计划的优化调整可以看QueryExecution的实现,最终就是转换成RDD执行获取结果。

和sqlParser类似,在CarbonSessionStateBuilder修改了extraStrategies使用Carbondata自定义的CarbonLateDecodeStrategy,CarbonLateDecodeStrategy中通过CarbonDatasourceHadoopRelation.buildScan得到CarbonScanRDD。

CarbonLateDecodeStrategy包含一些优化的预处理,从类名上可知还涉及到字典编码列的延迟解码。延迟解码的好处在于只对最终结果的数据解码,减少了不必要的计算。

CarbonScanRDD

CarbonScanRDD是carbon自定义的RDD实现,所以关注点主要也在RDD的getPartitions和compute两个方法。

由于CarbonScanRDD继承CarbonRDD,且CarbonRDD做了简单的封装,所以我们要看的internalGetPartitions和internalCompute。

需要说明的是,到了CarbonScanRDD说明正在读carbon表的数据,而sql的其余逻辑是不涉及的。

internalGetPartitions

决定了多少个查询任务从carbon表中获取数据。

重点函数:distributeColumnarSplits,它基于本地性和策略实现对任务的合并调整,策略包括custom、block(文件)、blocklet、merge_small_files

internalCompute

单个任务的数据读取,常用的是VectorizedCarbonRecordReader,一条或者一批的记录返回。

记录的获取通过QueryExecutor实现,使用迭代器获取数据。具体的读取数据就结合前面提及的format定义相关类。

BlockletFilterScanner、BlockletFullScanner中则是真正涉及读操作的部分,还包含了filter处理跳过不必要数据的实现。