kyuubi
解决多租户的共享查询问题,作为多个查询引擎的统一入口
HA通过zk实现
存在的意义 - 对比 SparkThriftServer
- 权限:STS全局只有一个SparkContext,尽管SQL下发来自不同用户,但实际都是使用启动STS的用户身份执行(proxy-user也只是一个身份),难以对资源和权限做控制
- 单点:单点故障率高
Kyuubi从整体上可以分为用户层、服务发现层、Kyuubi Server层、Kyuubi Engine层,其整体概述如下:
- 用户层: 通过不同方式使用Kyuubi的用户,JDBC或beeline
- 服务发现层:提供负载均衡和高可用,HA通过zk实现,又分为Server层的服务发现和Engine层的服务发现
- Server层(Daemon代理):由多个不同的KyuubiServer实例组成,每个KyuubiServer实例本质上为基于Apache Thrift实现的RPC服务端,其接收来自用户的请求,但并不会真正执行该请求的相关SQL操作,只会作为代理转发该请求到Kyuubi Engine层用户所属的SparkSQLEngine实例上。【Engine的管理者】
- Engine层: 由多个不同的SparkSQLEngine实例组成,每个SparkSQLEngine实例本质上为基于Apache Thrift实现的并且持有一个SparkSession实例的RPC服务端,其接收来自KyuubiServer实例的请求,并通过SparkSession实例来执行。在Kyuubi的USER共享层级上,每个SparkSQLEngine实例都是用户级别的,即不同的用户其会持有不同的SparkSQLEngine实例,以实现用户级别的资源隔离和控制。
查询流程解说实例
https://blog.51cto.com/xpleaf/2780248
Kyuubi系统管理员在大数据集群中启动了3个KyuubiServer实例和1个Zookeeper集群,其中3个KyuubiServer实例的连接信息分别为10.2.10.1:10009、10.2.10.1:10010和10.2.10.2:1009;
用户xpleaf通过beeline终端的方式连接了其中一个KyuubiServer实例;
./beeline -u ‘jdbc:hive2://10.2.10.1:10009’ -n xpleaf
在这里我们假设用户xpleaf事先已经通过管理员告知的方式知道了该KyuubiServer实例的连接信息。KyuubiServer_instance1接收到xpleaf的连接请求,会为该用户创建session会话,同时会去Zookeeper上检查是否已经存在xpleaf所属的SparkSQLEngine实例;
3.1 如果已经存在,则获取其连接信息;
3.2 如果不存在,则通过spark-submit的方式提交一个Spark应用,启动一个SparkSQLEngine实例;KyuubiServer_instance1在Zookeeper上没有找到xpleaf所属的SparkSQLEngine实例信息,其通过spark-submit的方式启动了一个SparkSQLEngine实例;
属于xpleaf用户的新的SparkSQLEngine_instance1实例在10.2.10.1节点上进行启动,并且监听的52463端口,启动后,其向Zookeeper注册自己的连接信息/kyuubi_USER/xpleaf/instance1_10.2.10.1:52463;
KyuubiServer_instance1在检测到SparkSQLEngine_instance1启动成功后,会向其发送创建session会话的连接请求;
SparkSQLEngine_instance1收到KyuubiServer_instance1创建session会话的连接请求,则创建一个新的session会话;
用户启动beeleine完成并成功创建会话,接着用户执行SQL查询;
jdbc:hive2://10.2.10.1:10009> select * from teacher;
KyuubiServer_instance1接收到xpleaf的执行SQL查询的请求,会先检查是否存在xpleaf所属的SparkSQLEngine实例;
KyuubiServer_instance1找到xpleaf所属的SparkSQLEngine_instance1实例,接着会为这次执行SQL的操作创建一个Operation;
KyuubiServer_instance1根据连接信息创建了一个RPC Client,并且构建SQL执行的RPC请求,发到对应的SparkSQLEngine_instance1实例上;
SparkSQLEngine_instance1接收到该请求后,会创建一个该SQL操作的Operation,并且使用其内部的SparkSession实例来进行执行,最后将执行结果返回给KyuubiServer_instance1;
KyuubiServer_instance1接收到SparkSQLEngine_instance1的执行结果,返回给用户,这样一次SQL查询操作就完成了。
透过整体协作流程我们可以看到:
站在用户层视角来看,其为RPC客户端,而为其提供RPC服务的是Kyuubi Server层,在这里,Kyuubi Server是RPC服务端;
站在Kyuubi Server层视角来看,其既是为用户层提供RPC服务的RPC服务端,同时也是使用Kyuubi Engine层RPC服务的RPC客户端;
站在Kyuubi Engine层视角来看,其为RPC服务端,其为Kyuubi Server层提供RPC服务;
Kyuubi在整体Server端和Client端以及其实现功能的设计上,是十分清晰的。
Install
Setup
配置JAVA_HOME,SPARK_HOME 到conf/kyuubi-env.sh
。还支持Flink、Trino
1 | cp conf/kyuubi-env.sh.template conf/kyuubi-env.sh |
1 | bin/kyuubi run # 前台运行 |
从日志可以获得访问kyuubi server的端口为Starting and exposing JDBC connection at: jdbc:hive2://a51-dg-hcy-bi-cal-005:10009/
启动细节
zk
若没有配置zk,则会使用embedded_zookeeper监听2181端口,通过kyuubi-defaults.conf
的kyuubi.zookeeper.embedded.client.port
修改,体验时管理命令和组共享有问题
生产环境需要配置kyuubi.ha.addresses
kyuubi.ha.namespace
,连接需要修改为类似bin/beeline -u 'jdbc:hive2://10.242.189.214:2181/;serviceDiscoveryMode=zooKeeper;zooKeeperNamespace=kyuubi' -n kentyao
1 | [zk: 10.17.41.130:2181(CONNECTED) 0] ls / |
kyuubi-ctl
1 | bin/kyuubi-ctl list server -zk 10.17.41.130:2181 |
唤起引擎
[问题: 这里怎么定义使用什么引擎、引擎配置参数、用户到组的映射配置]
在引擎A已启动的情况下,修改连接参数(包括spark/kyuubi的参数)不会启动新的引擎,因为kyuubi是按照用户从zk获取查询引擎连接
1 | ./bin/beeline -u "jdbc:hive2://a51-dg-hcy-bi-cal-005:10009/;\ |
连接后会提交主类为org.apache.kyuubi.engine.spark.SparkSQLEngine
的任务
1 | 2023-10-25 15:07:37.563 INFO org.apache.kyuubi.engine.EngineRef: Launching engine: |
不停server的情况下管理engine
引擎启动配置
Spark
- spark-defaults.conf,默认值
- kyuubi-defaults.conf, 优先级更高
- JDBC URL,优先级最高 ,例如:
jdbc:hive2://localhost:10009/default;#spark.sql.shuffle.partitions=2;spark.executor.memory=5g
- Set命令修改动态参数
常用配置
1 | spark.master yarn |
是否启动新的engine
org.apache.kyuubi.engine.EngineRef#getOrCreate
根据engineSpace从zk判断,参考前面打印的zk目录信息。
因此每个用户/组/server 都固定一个配置,subdomain默认为default,通过配置多个subdomain来获得多个不同的engine,可以用来轮询/随机等支持HA
实际启动engine的代码在org.apache.kyuubi.engine.spark.SparkSQLEngine$#startEngine
,也就是打印出来的spark-submit命令
是否新的sessionorg.apache.kyuubi.engine.spark.session.SparkSQLSessionManager#getOrNewSparkSession
判断输入为userName和shareLevel
SparkSession.newSession : has separate SQLConf, registered temporary views and UDFs, but shared SparkContext
and table cache
Flink
- flink-conf.yaml,默认值
- kyuubi-defaults.conf, 优先级更高
- JDBC URL,优先级最高 ,例如:
jdbc:hive2://localhost:10009/default;#kyuubi.engine.type=FLINK_SQL;parallelism.default=2;taskmanager.memory.process.size=5g
- session模式
bin/beeline -u 'jdbc:hive2://localhost:10009/;#kyuubi.engine.type=FLINK_SQL;yarn.application.id=application_1679304354185_0001;execution.target=yarn-session' -n xcchen
- application模式
bin/beeline -u 'jdbc:hive2://localhost:10009/;#kyuubi.engine.type=FLINK_SQL;flink.execution.target=yarn-application' -n xcchen
` - Set命令修改动态参数
用户级默认配置
优先级高于kyuubi-defaults.conf(重启server生效),低于JDBC URL. 配置格式为 ___{username}___.{config key}
1 | # For system defaults |
刷新配置参数
1 | bin/kyuubi-admin refresh config userDefaultsConf --hostUrl http://10.17.41.130:10099 |
延申JDBC URL参数
HiveServer2 Clients - Apache Hive - Apache Software Foundation
The HiveServer2 URL syntax:jdbc:hive2://<host1>:<port1>,<host2>:<port2>/dbName;initFile=<file>;sess_var_list?hive_conf_list#hive_var_list
where
<host1>:<port1>,<host2>:<port2>
is a server instance or a comma separated list of server instances to connect to (if dynamic service discovery is enabled). If empty, the embedded server will be used.dbName
is the name of the initial database.<file>
is the path of init script file executed automatically after connection.sess_var_list
is a semicolon separated list of key=value pairs of session variables (e.g.,user=foo;password=bar
).hive_conf_list
is a semicolon separated list of key=value pairs of Hive configuration variables for this sessionhive_var_list
is a semicolon separated list of key=value pairs of Hive variables for this session.
Special characters in sess_var_list, hive_conf_list, hive_var_list
parameter values should be encoded with URL encoding if needed.
Conf
kyuubi-defaults.conf
kyuubi.frontend.bind.host
kyuubi.engine.share.level
kyuubi.session.engine.spark.max.lifetime
Authentication
1 | kyuubi.authentication=JDBC |
1 | sqlite3 user.db |
管理命令
> specify the zookeeper address(--zk-quorum
) and namespace(--namespace
), version(--version
) parameters to query a specific kyuubi server cluster.
bin/kyuubi-ctl --help
bin/kyuubi-ctl list server -zk a51-dg-hcy-bi-cal-005:10009
bin/kyuubi-ctl list server -zk a51-dg-hcy-bi-cal-003:2181
bin/kyuubi-ctl list engine -u anonymous
bin/kyuubi-ctl delete engine -s a51-dg-hcy-bi-cal-004 -p 39008 -u anonymous
引擎共用类型
推断:所以是根据url信息 将engine保存到zk,后续再重复使用?那么判断可复用的条件是?url可能顺序不一样?
Connection
One engine per session
适用于大查询。连接断开后引擎就关闭
User
One engine per user
按用户隔离,同一登录用户的多个连接都使用同一个引擎。通过配置项kyuubi.engine.single.spark.session
决定多个连接是否共用一个SparkSession使得临时表等信息是共用的
使用不同用户连接服务端,启动查询时会启动新的STS
连接断开后引擎等待超时后才关闭
Group
One engine per primary group
[[_posts/CheatSheet/storage/hdfs#GroupMapping配置|GroupMapping配置]]
Group级别的查询默认为组名身份执行,如果想用连接的用户身份执行需要通过SparkContext.getLocalProperty("kyuubi.session.user")
发送给类似Ranger等安全服务
配置kyuubi.session.group.provider
,自定义实现org.apache.kyuubi.plugin.GroupProvider
小查询,或者对人员限制资源使用
Server
One engine per cluster
这种就与Spark Thrift Server类似,执行身份为启动server的用户
subdomain
User/Group/Server级别下,每个层次都可以通过subdomain配置多个engine,然后在jdbc连接url上配置参数kyuubi.engine.share.level.subdomain
指定即可
配置方式
kyuubi.engine.share.level
特性
大结果集增量收集 kyuubi.operation.incremental.collect
change from collect
to toLocalIterator
.
1 | 使用方式1:set命令 |