Contents
  1. 1. 存在的意义 - 对比 SparkThriftServer
    1. 1.1. 查询流程解说实例
      1. 1.1.1. 透过整体协作流程我们可以看到:
  2. 2. Install
  3. 3. Setup
    1. 3.1. 启动细节
      1. 3.1.1. zk
      2. 3.1.2. kyuubi-ctl
      3. 3.1.3. 唤起引擎
  4. 4. 引擎启动配置
    1. 4.1. Spark
      1. 4.1.1. 常用配置
      2. 4.1.2. 是否启动新的engine
    2. 4.2. Flink
    3. 4.3. 用户级默认配置
      1. 4.3.1. 延申JDBC URL参数
  5. 5. Conf
    1. 5.1. kyuubi-defaults.conf
    2. 5.2. Authentication
  6. 6. 管理命令
  7. 7. 引擎共用类型
    1. 7.1. Connection
    2. 7.2. User
    3. 7.3. Group
    4. 7.4. Server
    5. 7.5. subdomain
    6. 7.6. 配置方式
  8. 8. 特性
    1. 8.1. 大结果集增量收集 kyuubi.operation.incremental.collect

解决多租户的共享查询问题,作为多个查询引擎的统一入口

HA通过zk实现

存在的意义 - 对比 SparkThriftServer

  • 权限:STS全局只有一个SparkContext,尽管SQL下发来自不同用户,但实际都是使用启动STS的用户身份执行(proxy-user也只是一个身份),难以对资源和权限做控制
  • 单点:单点故障率高

Kyuubi从整体上可以分为用户层、服务发现层、Kyuubi Server层、Kyuubi Engine层,其整体概述如下:

  • 用户层: 通过不同方式使用Kyuubi的用户,JDBC或beeline
  • 服务发现层:提供负载均衡和高可用,HA通过zk实现,又分为Server层的服务发现和Engine层的服务发现
  • Server层(Daemon代理):由多个不同的KyuubiServer实例组成,每个KyuubiServer实例本质上为基于Apache Thrift实现的RPC服务端,其接收来自用户的请求,但并不会真正执行该请求的相关SQL操作,只会作为代理转发该请求到Kyuubi Engine层用户所属的SparkSQLEngine实例上。【Engine的管理者】
  • Engine层: 由多个不同的SparkSQLEngine实例组成,每个SparkSQLEngine实例本质上为基于Apache Thrift实现的并且持有一个SparkSession实例的RPC服务端,其接收来自KyuubiServer实例的请求,并通过SparkSession实例来执行。在Kyuubi的USER共享层级上,每个SparkSQLEngine实例都是用户级别的,即不同的用户其会持有不同的SparkSQLEngine实例,以实现用户级别的资源隔离和控制。

查询流程解说实例

https://blog.51cto.com/xpleaf/2780248

  1. Kyuubi系统管理员在大数据集群中启动了3个KyuubiServer实例和1个Zookeeper集群,其中3个KyuubiServer实例的连接信息分别为10.2.10.1:10009、10.2.10.1:10010和10.2.10.2:1009;

  2. 用户xpleaf通过beeline终端的方式连接了其中一个KyuubiServer实例;
    ./beeline -u ‘jdbc:hive2://10.2.10.1:10009’ -n xpleaf
    在这里我们假设用户xpleaf事先已经通过管理员告知的方式知道了该KyuubiServer实例的连接信息。

  3. KyuubiServer_instance1接收到xpleaf的连接请求,会为该用户创建session会话,同时会去Zookeeper上检查是否已经存在xpleaf所属的SparkSQLEngine实例;
    3.1 如果已经存在,则获取其连接信息;
    3.2 如果不存在,则通过spark-submit的方式提交一个Spark应用,启动一个SparkSQLEngine实例;

  4. KyuubiServer_instance1在Zookeeper上没有找到xpleaf所属的SparkSQLEngine实例信息,其通过spark-submit的方式启动了一个SparkSQLEngine实例;

  5. 属于xpleaf用户的新的SparkSQLEngine_instance1实例在10.2.10.1节点上进行启动,并且监听的52463端口,启动后,其向Zookeeper注册自己的连接信息/kyuubi_USER/xpleaf/instance1_10.2.10.1:52463;

  6. KyuubiServer_instance1在检测到SparkSQLEngine_instance1启动成功后,会向其发送创建session会话的连接请求;

  7. SparkSQLEngine_instance1收到KyuubiServer_instance1创建session会话的连接请求,则创建一个新的session会话;

  8. 用户启动beeleine完成并成功创建会话,接着用户执行SQL查询;

jdbc:hive2://10.2.10.1:10009> select * from teacher;

  1. KyuubiServer_instance1接收到xpleaf的执行SQL查询的请求,会先检查是否存在xpleaf所属的SparkSQLEngine实例;

  2. KyuubiServer_instance1找到xpleaf所属的SparkSQLEngine_instance1实例,接着会为这次执行SQL的操作创建一个Operation;

  3. KyuubiServer_instance1根据连接信息创建了一个RPC Client,并且构建SQL执行的RPC请求,发到对应的SparkSQLEngine_instance1实例上;

  4. SparkSQLEngine_instance1接收到该请求后,会创建一个该SQL操作的Operation,并且使用其内部的SparkSession实例来进行执行,最后将执行结果返回给KyuubiServer_instance1;

  5. KyuubiServer_instance1接收到SparkSQLEngine_instance1的执行结果,返回给用户,这样一次SQL查询操作就完成了。

透过整体协作流程我们可以看到:

站在用户层视角来看,其为RPC客户端,而为其提供RPC服务的是Kyuubi Server层,在这里,Kyuubi Server是RPC服务端;

站在Kyuubi Server层视角来看,其既是为用户层提供RPC服务的RPC服务端,同时也是使用Kyuubi Engine层RPC服务的RPC客户端;

站在Kyuubi Engine层视角来看,其为RPC服务端,其为Kyuubi Server层提供RPC服务;

Kyuubi在整体Server端和Client端以及其实现功能的设计上,是十分清晰的。

Install

http://mirrors.ustc.edu.cn/

Setup

配置JAVA_HOME,SPARK_HOME 到conf/kyuubi-env.sh。还支持Flink、Trino

1
2
3
4
5
6
7
8
9
cp conf/kyuubi-env.sh.template conf/kyuubi-env.sh
echo 'export JAVA_HOME=/data/java' >> conf/kyuubi-env.sh
echo 'export SPARK_HOME=/data/spark' >> conf/kyuubi-env.sh
echo 'export HADOOP_HOME=/data/hadoop' >> conf/kyuubi-env.sh
echo 'export HADOOP_CONF_DIR=/data/hadoop/etc/hadoop' >> conf/kyuubi-env.sh

cp conf/kyuubi-defaults.conf.template conf/kyuubi-defaults.conf
spark.master=yarn
spark.sql.adaptive.enabled=true
1
2
3
4
5
6
7
bin/kyuubi run # 前台运行
bin/kyuubi start
bin/kyuubi stop

bin/beeline -u 'jdbc:hive2://[HOST]:10009/' -n apache

WebUI: http://[HOST]:10099/

从日志可以获得访问kyuubi server的端口为
Starting and exposing JDBC connection at: jdbc:hive2://a51-dg-hcy-bi-cal-005:10009/

启动细节

zk

若没有配置zk,则会使用embedded_zookeeper监听2181端口,通过kyuubi-defaults.confkyuubi.zookeeper.embedded.client.port修改,体验时管理命令和组共享有问题

生产环境需要配置kyuubi.ha.addresses kyuubi.ha.namespace ,连接需要修改为类似bin/beeline -u 'jdbc:hive2://10.242.189.214:2181/;serviceDiscoveryMode=zooKeeper;zooKeeperNamespace=kyuubi' -n kentyao

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
[zk: 10.17.41.130:2181(CONNECTED) 0] ls /
[kyuubi, kyuubi_1.8.0_USER_SPARK_SQL, kyuubi_1.8.0_USER_SPARK_SQL_lock, zookeeper]
[zk: 10.17.41.130:2181(CONNECTED) 3] ls -R /
/
/kyuubi => 10.17.41.130
/kyuubi_1.8.0_USER_SPARK_SQL
/kyuubi_1.8.0_USER_SPARK_SQL_lock
/zookeeper
/kyuubi/serviceUri=lg-hcybi-6967-130:10009;version=1.8.0;sequence=0000000000 =>lg-hcybi-6967-130:10009
/kyuubi_1.8.0_USER_SPARK_SQL/anonymous 测试的一个用户
/kyuubi_1.8.0_USER_SPARK_SQL/apache 测试的一个用户
/kyuubi_1.8.0_USER_SPARK_SQL/anonymous/default => 10.17.41.130
/kyuubi_1.8.0_USER_SPARK_SQL/apache/default => 10.17.41.130
/kyuubi_1.8.0_USER_SPARK_SQL_lock/anonymous
/kyuubi_1.8.0_USER_SPARK_SQL_lock/apache
/kyuubi_1.8.0_USER_SPARK_SQL_lock/anonymous/default
/kyuubi_1.8.0_USER_SPARK_SQL_lock/anonymous/default/leases
/kyuubi_1.8.0_USER_SPARK_SQL_lock/anonymous/default/locks
/kyuubi_1.8.0_USER_SPARK_SQL_lock/apache/default
/kyuubi_1.8.0_USER_SPARK_SQL_lock/apache/default/leases
/kyuubi_1.8.0_USER_SPARK_SQL_lock/apache/default/locks


kyuubi-ctl

1
2
3
4
5
6
7
8
9
 bin/kyuubi-ctl list server -zk 10.17.41.130:2181
Zookeeper service nodes
╔═══════════╤═══════════════════╤═══════╤═════════╗
║ Namespace │ Host │ Port │ Version ║
╠═══════════╪═══════════════════╪═══════╪═════════╣
║ /kyuubi │ lg-hcybi-6967-130 │ 10009 │ 1.8.0 ║
╚═══════════╧═══════════════════╧═══════╧═════════╝
bin/kyuubi-ctl list engine -u apache -zk 10.17.41.130:2181

唤起引擎

[问题: 这里怎么定义使用什么引擎、引擎配置参数、用户到组的映射配置]
在引擎A已启动的情况下,修改连接参数(包括spark/kyuubi的参数)不会启动新的引擎,因为kyuubi是按照用户从zk获取查询引擎连接

1
2
3
4
5
6
7
8
9
10
./bin/beeline -u "jdbc:hive2://a51-dg-hcy-bi-cal-005:10009/;\
spark.yarn.queue=default;\
spark.master=yarn;\
spark.dynamicAllocation.enabled=true;\
spark.dynamicAllocation.maxExecutors=50;\
spark.shuffle.service.enabled=true;\
spark.executor.instances=5;\
spark.executor.cores=4;\
spark.executor.memory=6g;
kyuubi.engine.share.level=CONNECTION;" -n someone

连接后会提交主类为org.apache.kyuubi.engine.spark.SparkSQLEngine的任务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
2023-10-25 15:07:37.563 INFO org.apache.kyuubi.engine.EngineRef: Launching engine:
/data/soft/spark-3.3.1-bin-hadoop2/bin/spark-submit \
--class org.apache.kyuubi.engine.spark.SparkSQLEngine \
--conf spark.hive.server2.thrift.resultset.default.fetch.size=1000 \
--conf spark.kyuubi.client.ipAddress=10.27.48.3 \
--conf spark.kyuubi.client.version=1.7.1 \
--conf spark.kyuubi.engine.submit.time=1698217657450 \
--conf spark.kyuubi.ha.addresses=a51-dg-hcy-bi-cal-005:2181 \
--conf spark.kyuubi.ha.engine.ref.id=cd5f0546-ebc1-4736-96c7-4bfea78e7053 \
--conf spark.kyuubi.ha.namespace=/kyuubi_1.7.1_USER_SPARK_SQL/anonymous/default \
--conf spark.kyuubi.ha.zookeeper.auth.type=NONE \
--conf spark.kyuubi.server.ipAddress=10.27.48.3 \
--conf spark.kyuubi.session.connection.url=a51-dg-hcy-bi-cal-005:10009 \
--conf spark.kyuubi.session.real.user=anonymous \
--conf spark.app.name=kyuubi_USER_SPARK_SQL_anonymous_default_cd5f0546-ebc1-4736-96c7-4bfea78e7053 \
--conf spark.kubernetes.driver.label.kyuubi-unique-tag=cd5f0546-ebc1-4736-96c7-4bfea78e7053 \
--conf spark.yarn.tags=KYUUBI,cd5f0546-ebc1-4736-96c7-4bfea78e7053 \
--proxy-user anonymous /data/soft/apache-kyuubi-1.7.1-bin/externals/engines/spark/kyuubi-spark-sql-engine_2.12-1.7.1.jar
23/10/25 15:08:09 INFO SparkSQLEngine:
Spark application name: kyuubi_USER_SPARK_SQL_anonymous_default_cd5f0546-ebc1-4736-96c7-4bfea78e7053
application ID: local-1698217666816
application tags: KYUUBI,cd5f0546-ebc1-4736-96c7-4bfea78e7053
application web UI: http://a51-dg-hcy-bi-cal-005:46498
master: local[*]
version: 3.3.1
driver: [cpu: 1, mem: 1g]
executor: [cpu: 2, mem: 1g, maxNum: 2]
Start time: Wed Oct 25 15:07:44 CST 2023

User: anonymous (shared mode: USER)
State: STARTED

不停server的情况下管理engine

引擎启动配置

Spark

  • spark-defaults.conf,默认值
  • kyuubi-defaults.conf, 优先级更高
  • JDBC URL,优先级最高 ,例如:jdbc:hive2://localhost:10009/default;#spark.sql.shuffle.partitions=2;spark.executor.memory=5g
  • Set命令修改动态参数

常用配置

1
2
3
spark.master        yarn
spark.serializer org.apache.spark.serializer.KryoSerializer

是否启动新的engine

org.apache.kyuubi.engine.EngineRef#getOrCreate
根据engineSpace从zk判断,参考前面打印的zk目录信息。
因此每个用户/组/server 都固定一个配置,subdomain默认为default,通过配置多个subdomain来获得多个不同的engine,可以用来轮询/随机等支持HA

实际启动engine的代码在org.apache.kyuubi.engine.spark.SparkSQLEngine$#startEngine,也就是打印出来的spark-submit命令

是否新的session
org.apache.kyuubi.engine.spark.session.SparkSQLSessionManager#getOrNewSparkSession

判断输入为userName和shareLevel
SparkSession.newSession : has separate SQLConf, registered temporary views and UDFs, but shared SparkContext and table cache

  • flink-conf.yaml,默认值
  • kyuubi-defaults.conf, 优先级更高
  • JDBC URL,优先级最高 ,例如:jdbc:hive2://localhost:10009/default;#kyuubi.engine.type=FLINK_SQL;parallelism.default=2;taskmanager.memory.process.size=5g
  • session模式bin/beeline -u 'jdbc:hive2://localhost:10009/;#kyuubi.engine.type=FLINK_SQL;yarn.application.id=application_1679304354185_0001;execution.target=yarn-session' -n xcchen
  • application模式 bin/beeline -u 'jdbc:hive2://localhost:10009/;#kyuubi.engine.type=FLINK_SQL;flink.execution.target=yarn-application' -n xcchen
    `
  • Set命令修改动态参数

用户级默认配置

优先级高于kyuubi-defaults.conf(重启server生效),低于JDBC URL. 配置格式为 ___{username}___.{config key}

1
2
3
4
5
6
7
8
9
10
11
12
13
# For system defaults
spark.master=local
spark.sql.adaptive.enabled=true
spark.driver.memory=2G
spark.executor.instances=1
spark.executor.cores=3
# For a user named apache
___apache___.spark.executor.instances=3
___apache___.spark.executor.cores=5
___apache___.spark.executor.memory=4g
# For a user named bob
___bob___.spark.master=spark://master:7077
___bob___.spark.executor.memory=8g

刷新配置参数

1
bin/kyuubi-admin refresh config userDefaultsConf --hostUrl http://10.17.41.130:10099

延申JDBC URL参数

HiveServer2 Clients - Apache Hive - Apache Software Foundation

The HiveServer2 URL syntax:
jdbc:hive2://<host1>:<port1>,<host2>:<port2>/dbName;initFile=<file>;sess_var_list?hive_conf_list#hive_var_list

where

  • <host1>:<port1>,<host2>:<port2> is a server instance or a comma separated list of server instances to connect to (if dynamic service discovery is enabled). If empty, the embedded server will be used.
  • dbName is the name of the initial database.
  • <file> is the path of init script file executed automatically after connection.
  • sess_var_list is a semicolon separated list of key=value pairs of session variables (e.g., user=foo;password=bar).
  • hive_conf_list is a semicolon separated list of key=value pairs of Hive configuration variables for this session
  • hive_var_list is a semicolon separated list of key=value pairs of Hive variables for this session.

Special characters in sess_var_list, hive_conf_list, hive_var_list parameter values should be encoded with URL encoding if needed.

Conf

kyuubi-defaults.conf

kyuubi.frontend.bind.host
kyuubi.engine.share.level
kyuubi.session.engine.spark.max.lifetime

Authentication

1
2
3
4
5
6
7
8
9
10
kyuubi.authentication=JDBC
kyuubi.authentication.jdbc.driver.class = com.mysql.jdbc.Driver
kyuubi.authentication.jdbc.url = jdbc:mysql://127.0.0.1:3306/auth_db
kyuubi.authentication.jdbc.user = bowenliang123
kyuubi.authentication.jdbc.password = bowenliang123@kyuubi
kyuubi.authentication.jdbc.query = SELECT 1 FROM auth_table WHERE user=${user} AND passwd=MD5(CONCAT(salt,${password}))

create database test default character set utf8mb4;
create table kyuubi_user (`user` varchar(50) primary key, password varchar(50));
insert into kyuubi_user values ('xcchen',md5(concat('1qazZSE$','xcchen')));
1
2
3
4
5
6
7
8
9
10
11
12
13
14
sqlite3 user.db
create table kyuubi_user (u text, p text);
insert into kyuubi_user values ('apache',hex('15642'||hex('1W2e$'||'apache123')));
select * from kyuubi_user;
update kyuubi_user set p=hex('15642'||hex('1W2e$'||'apache123')) where u='apache';
SELECT 1 FROM kyuubi_user WHERE u='${user}' AND p=hex('15642'||hex('1W2e$'||'${password}'))

kyuubi.authentication=JDBC
kyuubi.authentication.jdbc.driver.class=org.sqlite.JDBC
kyuubi.authentication.jdbc.url=jdbc:sqlite:/data/kyuubi/user.db
kyuubi.authentication.jdbc.query=SELECT 1 FROM kyuubi_user WHERE u=${user} AND p=hex('15642'||hex('1W2e$'||${password}))

需要在命令行 -n -p输入密码 或者 !connect 'jdbc:hive2://10.17.41.130:10009/' 方式
在命令行使用-n -p为空 交互输入会失败

管理命令

 > specify the zookeeper address(--zk-quorum) and namespace(--namespace), version(--version) parameters to query a specific kyuubi server cluster.

bin/kyuubi-ctl --help

bin/kyuubi-ctl list server -zk a51-dg-hcy-bi-cal-005:10009
bin/kyuubi-ctl list server -zk a51-dg-hcy-bi-cal-003:2181
bin/kyuubi-ctl list engine -u anonymous
bin/kyuubi-ctl delete engine -s a51-dg-hcy-bi-cal-004 -p 39008 -u anonymous

引擎共用类型

推断:所以是根据url信息 将engine保存到zk,后续再重复使用?那么判断可复用的条件是?url可能顺序不一样?

Connection

One engine per session

适用于大查询。连接断开后引擎就关闭

User

One engine per user

按用户隔离,同一登录用户的多个连接都使用同一个引擎。通过配置项kyuubi.engine.single.spark.session决定多个连接是否共用一个SparkSession使得临时表等信息是共用的

使用不同用户连接服务端,启动查询时会启动新的STS

连接断开后引擎等待超时后才关闭

Group

One engine per primary group
[[_posts/CheatSheet/storage/hdfs#GroupMapping配置|GroupMapping配置]]
Group级别的查询默认为组名身份执行,如果想用连接的用户身份执行需要通过SparkContext.getLocalProperty("kyuubi.session.user") 发送给类似Ranger等安全服务

配置kyuubi.session.group.provider,自定义实现org.apache.kyuubi.plugin.GroupProvider

小查询,或者对人员限制资源使用

Server

One engine per cluster

这种就与Spark Thrift Server类似,执行身份为启动server的用户

subdomain

User/Group/Server级别下,每个层次都可以通过subdomain配置多个engine,然后在jdbc连接url上配置参数kyuubi.engine.share.level.subdomain指定即可

配置方式

kyuubi.engine.share.level

特性

大结果集增量收集 kyuubi.operation.incremental.collect

change from collect to toLocalIterator.

1
2
3
4
5
6
7
使用方式1:set命令
set kyuubi.operation.incremental.collect=true;

使用方式2:JDBC URL(必须配incremental=true,否则结果集会拉到beeline客户端造成oom)
beeline -u 'jdbc:hive2://kyuubi:10009/?spark.driver.maxResultSize=8g;spark.driver.memory=12g#kyuubi.engine.share.level=CONNECTION;kyuubi.operation.incremental.collect=true' \
--incremental=true \
-f big_result_query.sql