Carbondata查询逻辑
入口
CarbonSession继承SparkSession,查询时我们使用的是CarbonSession。
1 | override def sql(sqlText: String): DataFrame = { |
这里除了增加withProfiler用于统计耗时外与SparkSQL原来的处理实质是一样的,只是为了获取时间调整了一下代码。
在withProfiler中有一行代码实现string解析成LogicalPlan:
val logicalPlan = sessionState.sqlParser.parsePlan(sqlText)
关键的地方在于CarbonSessionStateBuilder修改了sqlParser,使用Carbondata自定义的实现,具体的语法定义在CarbonSpark2SqlParser(处理carbon独有的命令)。所以要修改、增减命令的话都是要在这里处理,另外还有一部分在DDLStrategy(这是因为部分命令与spark原有的冲突,这里判断是carbon表后进行处理实现的替换)。
Carbon2.0开始通过CarbonExtensions使用Extension的方式注入
查询
对查询计划的优化调整可以看QueryExecution的实现,最终就是转换成RDD执行获取结果。
和sqlParser类似,在CarbonSessionStateBuilder修改了extraStrategies使用Carbondata自定义的CarbonLateDecodeStrategy,CarbonLateDecodeStrategy中通过CarbonDatasourceHadoopRelation.buildScan得到CarbonScanRDD。
CarbonLateDecodeStrategy包含一些优化的预处理,从类名上可知还涉及到字典编码列的延迟解码。延迟解码的好处在于只对最终结果的数据解码,减少了不必要的计算。
CarbonScanRDD
CarbonScanRDD是carbon自定义的RDD实现,所以关注点主要也在RDD的getPartitions和compute两个方法。
由于CarbonScanRDD继承CarbonRDD,且CarbonRDD做了简单的封装,所以我们要看的internalGetPartitions和internalCompute。
需要说明的是,到了CarbonScanRDD说明正在读carbon表的数据,而sql的其余逻辑是不涉及的。
internalGetPartitions
决定了多少个查询任务从carbon表中获取数据。
重点函数:distributeColumnarSplits,它基于本地性和策略实现对任务的合并调整,策略包括custom、block(文件)、blocklet、merge_small_files
internalCompute
单个任务的数据读取,常用的是VectorizedCarbonRecordReader,一条或者一批的记录返回。
记录的获取通过QueryExecutor实现,使用迭代器获取数据。具体的读取数据就结合前面提及的format定义相关类。
BlockletFilterScanner、BlockletFullScanner中则是真正涉及读操作的部分,还包含了filter处理跳过不必要数据的实现。