Article January 23, 2024

kyuubi


Kyuubi addresses shared SQL access in multi-tenant environments and acts as a unified entry point to multiple query engines.

HA is implemented via ZooKeeper.

Why it exists: comparison with Spark Thrift Server

  • Permissions: STS has a single global SparkContext. Even though SQL requests come from different users, they are all executed with the identity of the user that started STS (proxy-user is still just one identity), which makes resource and permission control difficult
  • Single point of failure: with only one instance, the risk of a single-point failure is high

Kyuubi can be split into a user layer, a service discovery layer, a Kyuubi Server layer, and a Kyuubi Engine layer. An overview of each:

  • User layer: users who access Kyuubi in different ways, e.g. via JDBC or beeline (see the JDBC sketch after this list)
  • Service discovery layer: provides load balancing and high availability (HA via ZooKeeper); it is further divided into service discovery for the Server layer and for the Engine layer
  • Server layer (daemon/proxy): composed of multiple KyuubiServer instances. Each KyuubiServer instance is essentially an RPC server built on Apache Thrift; it receives user requests but does not run the SQL itself, it only acts as a proxy and forwards the request to the user's SparkSQLEngine instance in the Kyuubi Engine layer. [The manager of engines]
  • Engine layer: composed of multiple SparkSQLEngine instances. Each SparkSQLEngine instance is essentially an Apache Thrift based RPC server that holds a SparkSession instance; it receives requests from KyuubiServer instances and executes them through that SparkSession. At Kyuubi's USER share level, each SparkSQLEngine instance is per user, i.e. different users get different SparkSQLEngine instances, which provides user-level resource isolation and control.
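
From the user layer's perspective, a Kyuubi connection is just a standard HiveServer2-compatible JDBC session. A minimal Scala sketch, assuming the Hive JDBC driver is on the classpath; the host, port, user and table are placeholders taken from the walkthrough below:

import java.sql.DriverManager

object KyuubiJdbcExample {
  def main(args: Array[String]): Unit = {
    // Endpoint and user are placeholders from the walkthrough below; adjust to your deployment.
    // The Hive JDBC driver (org.apache.hive.jdbc.HiveDriver) is auto-registered via JDBC 4.
    val url = "jdbc:hive2://10.2.10.1:10009/default"
    val conn = DriverManager.getConnection(url, "xpleaf", "")
    try {
      val stmt = conn.createStatement()
      val rs = stmt.executeQuery("select * from teacher limit 10")
      while (rs.next()) {
        println(rs.getString(1))
      }
    } finally {
      conn.close()
    }
  }
}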

Query flow walkthrough

https://blog.51cto.com/xpleaf/2780248

  1. The Kyuubi administrator has started 3 KyuubiServer instances and 1 ZooKeeper cluster in the big data cluster; the 3 KyuubiServer instances listen on 10.2.10.1:10009, 10.2.10.1:10010 and 10.2.10.2:10009;

  2. User xpleaf connects to one of the KyuubiServer instances with the beeline terminal;
    ./beeline -u 'jdbc:hive2://10.2.10.1:10009' -n xpleaf
    Here we assume user xpleaf already knew this KyuubiServer instance's connection info, e.g. from the administrator.

  3. KyuubiServer_instance1 receives xpleaf's connection request, creates a session for the user, and checks in ZooKeeper whether a SparkSQLEngine instance belonging to xpleaf already exists;
    3.1 if it exists, its connection info is fetched;
    3.2 if it does not exist, a Spark application is submitted via spark-submit to launch a SparkSQLEngine instance;

  4. KyuubiServer_instance1 finds no SparkSQLEngine instance belonging to xpleaf in ZooKeeper, so it launches one via spark-submit;

  5. The new SparkSQLEngine_instance1 belonging to xpleaf starts on node 10.2.10.1 and listens on port 52463; after startup it registers its connection info in ZooKeeper as /kyuubi_USER/xpleaf/instance1_10.2.10.1:52463;

  6. Once KyuubiServer_instance1 detects that SparkSQLEngine_instance1 has started successfully, it sends it a request to open a session;

  7. SparkSQLEngine_instance1 receives the open-session request from KyuubiServer_instance1 and creates a new session;

  8. beeline startup finishes and the session is created successfully; the user then runs a SQL query;

jdbc:hive2://10.2.10.1:10009> select * from teacher;

  9. KyuubiServer_instance1 receives xpleaf's SQL query request and first checks whether a SparkSQLEngine instance belonging to xpleaf exists;

  10. KyuubiServer_instance1 finds xpleaf's SparkSQLEngine_instance1 and creates an Operation for this SQL execution;

  11. KyuubiServer_instance1 creates an RPC client from the stored connection info, builds the RPC request for the SQL execution, and sends it to SparkSQLEngine_instance1;

  12. SparkSQLEngine_instance1 receives the request, creates an Operation for the SQL, executes it with its internal SparkSession instance, and finally returns the result to KyuubiServer_instance1;

  13. KyuubiServer_instance1 receives the result from SparkSQLEngine_instance1 and returns it to the user; the SQL query is now complete.

From this end-to-end flow we can see:

From the user layer's point of view, the user is an RPC client and the Kyuubi Server layer provides the RPC service, i.e. Kyuubi Server is the RPC server;

From the Kyuubi Server layer's point of view, it is both the RPC server serving the user layer and an RPC client of the Kyuubi Engine layer;

From the Kyuubi Engine layer's point of view, it is an RPC server serving the Kyuubi Server layer;

Kyuubi's overall split between server and client sides, and the responsibilities of each, are very clearly designed.

Install

http://mirrors.ustc.edu.cn/

Setup

Configure JAVA_HOME and SPARK_HOME in conf/kyuubi-env.sh. Besides Spark, Flink and Trino are also supported.

cp conf/kyuubi-env.sh.template conf/kyuubi-env.sh
echo 'export JAVA_HOME=/data/java' >> conf/kyuubi-env.sh
echo 'export SPARK_HOME=/data/spark' >> conf/kyuubi-env.sh
echo 'export HADOOP_HOME=/data/hadoop' >> conf/kyuubi-env.sh
echo 'export HADOOP_CONF_DIR=/data/hadoop/etc/hadoop' >> conf/kyuubi-env.sh

cp conf/kyuubi-defaults.conf.template conf/kyuubi-defaults.conf
spark.master=yarn
spark.sql.adaptive.enabled=true
bin/kyuubi run   # run in the foreground
bin/kyuubi start
bin/kyuubi stop

bin/beeline -u 'jdbc:hive2://[HOST]:10009/' -n apache

WebUI: http://[HOST]:10099/

The Kyuubi server's JDBC endpoint and port can be found in the startup log:
Starting and exposing JDBC connection at: jdbc:hive2://a51-dg-hcy-bi-cal-005:10009/

Startup details

zk

If no ZooKeeper is configured, an embedded ZooKeeper listening on port 2181 is used; the port can be changed via kyuubi.zookeeper.embedded.client.port in kyuubi-defaults.conf. When trying it out this way, the admin commands and group-level sharing had issues.

For production, configure kyuubi.ha.addresses and kyuubi.ha.namespace; the connection then changes to something like bin/beeline -u 'jdbc:hive2://10.242.189.214:2181/;serviceDiscoveryMode=zooKeeper;zooKeeperNamespace=kyuubi' -n kentyao

[zk: 10.17.41.130:2181(CONNECTED) 0] ls /
[kyuubi, kyuubi_1.8.0_USER_SPARK_SQL, kyuubi_1.8.0_USER_SPARK_SQL_lock, zookeeper]
[zk: 10.17.41.130:2181(CONNECTED) 3] ls -R /
/
/kyuubi => 10.17.41.130
/kyuubi_1.8.0_USER_SPARK_SQL
/kyuubi_1.8.0_USER_SPARK_SQL_lock
/zookeeper
/kyuubi/serviceUri=lg-hcybi-6967-130:10009;version=1.8.0;sequence=0000000000 => lg-hcybi-6967-130:10009
/kyuubi_1.8.0_USER_SPARK_SQL/anonymous (a test user)
/kyuubi_1.8.0_USER_SPARK_SQL/apache (a test user)
/kyuubi_1.8.0_USER_SPARK_SQL/anonymous/default => 10.17.41.130
/kyuubi_1.8.0_USER_SPARK_SQL/apache/default => 10.17.41.130
/kyuubi_1.8.0_USER_SPARK_SQL_lock/anonymous
/kyuubi_1.8.0_USER_SPARK_SQL_lock/apache
/kyuubi_1.8.0_USER_SPARK_SQL_lock/anonymous/default
/kyuubi_1.8.0_USER_SPARK_SQL_lock/anonymous/default/leases
/kyuubi_1.8.0_USER_SPARK_SQL_lock/anonymous/default/locks
/kyuubi_1.8.0_USER_SPARK_SQL_lock/apache/default
/kyuubi_1.8.0_USER_SPARK_SQL_lock/apache/default/leases
/kyuubi_1.8.0_USER_SPARK_SQL_lock/apache/default/locks


kyuubi-ctl

 bin/kyuubi-ctl list server -zk 10.17.41.130:2181
Zookeeper service nodes
╔═══════════╤═══════════════════╤═══════╤═════════╗
║ Namespace │ Host │ Port │ Version ║
╠═══════════╪═══════════════════╪═══════╪═════════╣
║ /kyuubi │ lg-hcybi-6967-130 │ 10009 │ 1.8.0 ║
╚═══════════╧═══════════════════╧═══════╧═════════╝
bin/kyuubi-ctl list engine -u apache -zk 10.17.41.130:2181

Launching an engine

[Question: how is it decided which engine to use, what the engine configuration parameters are, and how users are mapped to groups?]
When engine A is already running, changing the connection parameters (both Spark and Kyuubi parameters) does not start a new engine, because Kyuubi looks up the query engine connection in ZooKeeper by user.

./bin/beeline -u "jdbc:hive2://a51-dg-hcy-bi-cal-005:10009/;\
spark.yarn.queue=default;\
spark.master=yarn;\
spark.dynamicAllocation.enabled=true;\
spark.dynamicAllocation.maxExecutors=50;\
spark.shuffle.service.enabled=true;\
spark.executor.instances=5;\
spark.executor.cores=4;\
spark.executor.memory=6g;\
kyuubi.engine.share.level=CONNECTION;" -n someone

After connecting, a job whose main class is org.apache.kyuubi.engine.spark.SparkSQLEngine is submitted:

2023-10-25 15:07:37.563 INFO org.apache.kyuubi.engine.EngineRef: Launching engine:
/data/soft/spark-3.3.1-bin-hadoop2/bin/spark-submit \
--class org.apache.kyuubi.engine.spark.SparkSQLEngine \
--conf spark.hive.server2.thrift.resultset.default.fetch.size=1000 \
--conf spark.kyuubi.client.ipAddress=10.27.48.3 \
--conf spark.kyuubi.client.version=1.7.1 \
--conf spark.kyuubi.engine.submit.time=1698217657450 \
--conf spark.kyuubi.ha.addresses=a51-dg-hcy-bi-cal-005:2181 \
--conf spark.kyuubi.ha.engine.ref.id=cd5f0546-ebc1-4736-96c7-4bfea78e7053 \
--conf spark.kyuubi.ha.namespace=/kyuubi_1.7.1_USER_SPARK_SQL/anonymous/default \
--conf spark.kyuubi.ha.zookeeper.auth.type=NONE \
--conf spark.kyuubi.server.ipAddress=10.27.48.3 \
--conf spark.kyuubi.session.connection.url=a51-dg-hcy-bi-cal-005:10009 \
--conf spark.kyuubi.session.real.user=anonymous \
--conf spark.app.name=kyuubi_USER_SPARK_SQL_anonymous_default_cd5f0546-ebc1-4736-96c7-4bfea78e7053 \
--conf spark.kubernetes.driver.label.kyuubi-unique-tag=cd5f0546-ebc1-4736-96c7-4bfea78e7053 \
--conf spark.yarn.tags=KYUUBI,cd5f0546-ebc1-4736-96c7-4bfea78e7053 \
--proxy-user anonymous /data/soft/apache-kyuubi-1.7.1-bin/externals/engines/spark/kyuubi-spark-sql-engine_2.12-1.7.1.jar
23/10/25 15:08:09 INFO SparkSQLEngine:
Spark application name: kyuubi_USER_SPARK_SQL_anonymous_default_cd5f0546-ebc1-4736-96c7-4bfea78e7053
application ID: local-1698217666816
application tags: KYUUBI,cd5f0546-ebc1-4736-96c7-4bfea78e7053
application web UI: http://a51-dg-hcy-bi-cal-005:46498
master: local[*]
version: 3.3.1
driver: [cpu: 1, mem: 1g]
executor: [cpu: 2, mem: 1g, maxNum: 2]
Start time: Wed Oct 25 15:07:44 CST 2023

User: anonymous (shared mode: USER)
State: STARTED

Managing engines without stopping the server

Engine startup configuration

Spark

  • spark-defaults.conf: default values
  • kyuubi-defaults.conf: higher priority
  • JDBC URL: highest priority, e.g. jdbc:hive2://localhost:10009/default;#spark.sql.shuffle.partitions=2;spark.executor.memory=5g
  • SET command for dynamic parameters

Common configuration

spark.master        yarn
spark.serializer org.apache.spark.serializer.KryoSerializer

kyuubi.ha.addresses xx:2181,xx:2181
kyuubi.engine.share.level SERVER

spark.driver.cores 2
spark.driver.memory 3G
spark.executor.cores 6
spark.executor.memory 12g
spark.executor.instances 3
spark.dynamicAllocation.enabled true
spark.dynamicAllocation.minExecutors 3
spark.dynamicAllocation.maxExecutors 40
spark.kerberos.keytab xxx
spark.kerberos.principal xxx

kyuubi.engine.doAs.enabled false
kyuubi.kinit.keytab xxx
kyuubi.kinit.principal xxx

Data lake support

jdbc:hive2://10.27.48.14:10009/ydy_bi_db01;#
kyuubi.engine.share.level.subdomain=delta;
spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension;
spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog;
spark.jars=/data/bdoc/kyuubi/jars/delta-spark_2.12-3.2.1.jar,/data/bdoc/kyuubi/jars/delta-storage-3.2.1.jar;
kyuubi.session.engine.idle.timeout=PT10M;

Will a new engine be started?

org.apache.kyuubi.engine.EngineRef#getOrCreate
decides by looking up the engineSpace in ZooKeeper; see the zk listing printed earlier.
So each user/group/server is pinned to one engine per configuration. The subdomain defaults to default; by configuring multiple subdomains you can get several engines and then pick among them (round-robin, random, etc.) for HA.

The code that actually launches the engine is in org.apache.kyuubi.engine.spark.SparkSQLEngine$#startEngine, i.e. the spark-submit command shown above.
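
As a rough sketch (not Kyuubi's actual code), the engine space that getOrCreate looks up appears to be assembled from the namespace, version, share level, engine type, user and subdomain, matching the zk listing shown earlier:

// Hypothetical helper mirroring the path layout observed in the zk listing above.
def engineSpace(namespace: String, version: String, shareLevel: String,
                engineType: String, user: String, subdomain: String = "default"): String =
  s"/${namespace}_${version}_${shareLevel}_${engineType}/$user/$subdomain"

// engineSpace("kyuubi", "1.8.0", "USER", "SPARK_SQL", "apache")
// => /kyuubi_1.8.0_USER_SPARK_SQL/apache/default   (the node seen in the zk listing earlier)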

Will a new session be created?
org.apache.kyuubi.engine.spark.session.SparkSQLSessionManager#getOrNewSparkSession

The decision is based on the userName and the shareLevel.
SparkSession.newSession: has a separate SQLConf, registered temporary views and UDFs, but shares the SparkContext and table cache.
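
A small spark-shell style sketch of what SparkSession.newSession provides (plain Spark API, not Kyuubi code):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").appName("newSession-demo").getOrCreate()
val s1 = spark.newSession()
val s2 = spark.newSession()

s1.range(3).createOrReplaceTempView("t")          // temp view visible only in s1
s1.conf.set("spark.sql.shuffle.partitions", "4")  // SQLConf is isolated per session

assert(!s2.catalog.tableExists("t"))              // s2 does not see s1's temp view
assert(s1.sparkContext eq s2.sparkContext)        // but the SparkContext is shared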

Flink

  • flink-conf.yaml: default values
  • kyuubi-defaults.conf: higher priority
  • JDBC URL: highest priority, e.g. jdbc:hive2://localhost:10009/default;#kyuubi.engine.type=FLINK_SQL;parallelism.default=2;taskmanager.memory.process.size=5g
  • session mode: bin/beeline -u 'jdbc:hive2://localhost:10009/;#kyuubi.engine.type=FLINK_SQL;yarn.application.id=application_1679304354185_0001;execution.target=yarn-session' -n xcchen
  • application mode: bin/beeline -u 'jdbc:hive2://localhost:10009/;#kyuubi.engine.type=FLINK_SQL;flink.execution.target=yarn-application' -n xcchen
  • SET command for dynamic parameters

User-level default configuration

Priority is higher than kyuubi-defaults.conf (takes effect after restarting the server) but lower than the JDBC URL. The format is ___{username}___.{config key}

# For system defaults
spark.master=local
spark.sql.adaptive.enabled=true
spark.driver.memory=2G
spark.executor.instances=1
spark.executor.cores=3
# For a user named apache
___apache___.spark.executor.instances=3
___apache___.spark.executor.cores=5
___apache___.spark.executor.memory=4g
# For a user named bob
___bob___.spark.master=spark://master:7077
___bob___.spark.executor.memory=8g

Refreshing configuration

bin/kyuubi-admin refresh config userDefaultsConf --hostUrl http://10.17.41.130:10099

More on JDBC URL parameters

HiveServer2 Clients - Apache Hive - Apache Software Foundation

The HiveServer2 URL syntax:
jdbc:hive2://<host1>:<port1>,<host2>:<port2>/dbName;initFile=<file>;sess_var_list?hive_conf_list#hive_var_list

where

  • <host1>:<port1>,<host2>:<port2> is a server instance or a comma separated list of server instances to connect to (if dynamic service discovery is enabled). If empty, the embedded server will be used.
  • dbName is the name of the initial database.
  • <file> is the path of init script file executed automatically after connection.
  • sess_var_list is a semicolon separated list of key=value pairs of session variables (e.g., user=foo;password=bar).
  • hive_conf_list is a semicolon separated list of key=value pairs of Hive configuration variables for this session
  • hive_var_list is a semicolon separated list of key=value pairs of Hive variables for this session.

Special characters in sess_var_list, hive_conf_list, hive_var_list parameter values should be encoded with URL encoding if needed.

Conf

kyuubi-defaults.conf

kyuubi.frontend.bind.host
kyuubi.engine.share.level
kyuubi.session.engine.spark.max.lifetime

Authentication

kyuubi.authentication=JDBC
kyuubi.authentication.jdbc.driver.class = com.mysql.jdbc.Driver
kyuubi.authentication.jdbc.url = jdbc:mysql://127.0.0.1:3306/auth_db
kyuubi.authentication.jdbc.user = bowenliang123
kyuubi.authentication.jdbc.password = bowenliang123@kyuubi
kyuubi.authentication.jdbc.query = SELECT 1 FROM auth_table WHERE user=${user} AND passwd=MD5(CONCAT(salt,${password}))

create database test default character set utf8mb4;
create table kyuubi_user (`user` varchar(50) primary key, password varchar(50));
insert into kyuubi_user values ('xcchen',md5(concat('1qazZSE$','xcchen')));
sqlite3 user.db
create table kyuubi_user (u text, p text);
insert into kyuubi_user values ('apache',hex('15642'||hex('1W2e$'||'apache123')));
select * from kyuubi_user;
update kyuubi_user set p=hex('15642'||hex('1W2e$'||'apache123')) where u='apache';
SELECT 1 FROM kyuubi_user WHERE u='${user}' AND p=hex('15642'||hex('1W2e$'||'${password}'))

kyuubi.authentication=JDBC
kyuubi.authentication.jdbc.driver.class=org.sqlite.JDBC
kyuubi.authentication.jdbc.url=jdbc:sqlite:/data/kyuubi/user.db
kyuubi.authentication.jdbc.query=SELECT 1 FROM kyuubi_user WHERE u=${user} AND p=hex('15642'||hex('1W2e$'||${password}))
kyuubi.server.administrators=apache

The password must be supplied via -n and -p on the command line, or by using !connect 'jdbc:hive2://10.17.41.130:10009/'.
Leaving -n/-p empty on the command line and entering the password interactively fails.

insert into kyuubi_user values ('xxx',hex('15642'||hex('1W2e$'||'Lcj-X_FQ4@Rs+[L')));

Admin commands

 > specify the zookeeper address(--zk-quorum) and namespace(--namespace), version(--version) parameters to query a specific kyuubi server cluster.

bin/kyuubi-ctl --help

bin/kyuubi-ctl list server -zk a51-dg-hcy-bi-cal-005:10009
bin/kyuubi-ctl list server -zk a51-dg-hcy-bi-cal-003:2181
bin/kyuubi-ctl list engine -u anonymous
bin/kyuubi-ctl delete engine -s a51-dg-hcy-bi-cal-004 -p 39008 -u anonymous

Engine share levels

Guess: so the engine is registered in ZooKeeper based on the URL info and reused later? Then what exactly makes an engine reusable? The parameter order in the URL may differ.

Connection

One engine per session

Suited to large queries. The engine shuts down as soon as the connection is closed.

User

One engine per user

Isolation per user: multiple connections from the same logged-in user all use the same engine. The config kyuubi.engine.single.spark.session controls whether those connections share a single SparkSession, so that temporary views and similar state are shared.

Connecting to the server as a different user launches a new STS-like engine when the query starts.

After the connection is closed, the engine only shuts down after its idle timeout.

Group

One engine per primary group
[[_posts/CheatSheet/storage/hdfs#GroupMapping配置|GroupMapping配置]]
At the Group level, queries run under the group identity by default. If you want execution attributed to the connecting user, read the real session user via SparkContext.getLocalProperty("kyuubi.session.user") and pass it to a security service such as Ranger.
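
For example, inside the engine the real session user can be read back from that SparkContext local property and handed to the authorization layer. A minimal sketch (it must run on the thread executing the operation, since local properties are thread-local):

import org.apache.spark.sql.SparkSession

// Read the connecting user's name that Kyuubi attaches as a SparkContext local property.
val spark = SparkSession.active
val sessionUser = spark.sparkContext.getLocalProperty("kyuubi.session.user")
// pass sessionUser on to an external authorization service such as Ranger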

Configure kyuubi.session.group.provider with a custom implementation of org.apache.kyuubi.plugin.GroupProvider.
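
A minimal sketch of a custom provider, assuming the GroupProvider interface exposes primaryGroup/groups(user, sessionConf) as in recent Kyuubi versions (check the plugin API of your version before relying on this):

import java.util.{Map => JMap}
import org.apache.kyuubi.plugin.GroupProvider

// Hypothetical static mapping; a real provider would query LDAP, a database, etc.
class StaticGroupProvider extends GroupProvider {
  private val mapping = Map("xpleaf" -> "bi", "apache" -> "infra")

  override def primaryGroup(user: String, sessionConf: JMap[String, String]): String =
    mapping.getOrElse(user, "default")

  override def groups(user: String, sessionConf: JMap[String, String]): Array[String] =
    Array(primaryGroup(user, sessionConf))
}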

Good for small queries, or for capping resource usage per group of people.

Server

One engine per cluster

This mode is similar to Spark Thrift Server: everything runs with the identity of the user who started the server.

subdomain

Under the User/Group/Server levels, each level can have multiple engines via subdomains; set kyuubi.engine.share.level.subdomain in the JDBC connection URL to pick one.

How to configure

kyuubi.engine.share.level

Features

Incremental collection of large result sets: kyuubi.operation.incremental.collect

Switches result fetching from collect to toLocalIterator.
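
In Spark terms the difference is roughly the following (plain Spark API, shown only to illustrate why incremental collect keeps driver memory bounded):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").getOrCreate()
val df = spark.range(0, 10000000L).toDF("id")

// collect() materializes the entire result set in the driver at once:
// val all = df.collect()

// toLocalIterator() pulls one partition at a time, so the driver only ever holds
// a single partition of the result -- this is what incremental collect switches to.
val it = df.toLocalIterator()
while (it.hasNext) {
  val row = it.next()
  // stream the row out to the client
}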

Option 1: SET command
set kyuubi.operation.incremental.collect=true;

Option 2: JDBC URL (beeline must also be given --incremental=true, otherwise the result set is pulled into the beeline client and can cause an OOM)
beeline -u 'jdbc:hive2://kyuubi:10009/?spark.driver.maxResultSize=8g;spark.driver.memory=12g#kyuubi.engine.share.level=CONNECTION;kyuubi.operation.incremental.collect=true' \
--incremental=true \
-f big_result_query.sql