CHEATSHEET January 20, 2022

dolphinscheduler


Is: a distributed workflow scheduling platform with DAG visualization

For: complex task dependencies

Decentralized (dynamic master, elected via ZooKeeper)

DolphinScheduler itself does not depend on Hadoop, Hive or Spark; it only invokes their clients to submit the corresponding tasks

Scheduling is driven by cron expressions

Quartz serves as the distributed scheduler

Scheduling status report

Key fields of t_ds_process_instance

  • name (process name + timestamp)
  • process_definition_id (can be joined to get the process name)
  • state (process instance state: 0 submitted, 1 running, 2 ready to pause, 3 paused, 4 ready to stop, 5 stopped, 6 failed, 7 succeeded, 8 needs fault tolerance, 9 killed, 10 waiting for thread, 11 waiting for dependent tasks)
  • start_time
  • end_time
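As a quick lookup when querying the table directly, the state codes above can be kept as a small Python map (codes transcribed from the field comment; the names are my own paraphrase):

```python
# t_ds_process_instance.state codes, transcribed from the field comment above
PROCESS_STATE = {
    0: "submitted", 1: "running", 2: "ready to pause", 3: "paused",
    4: "ready to stop", 5: "stopped", 6: "failed", 7: "succeeded",
    8: "needs fault tolerance", 9: "killed", 10: "waiting for thread",
    11: "waiting for dependent tasks",
}

def state_name(code):
    """Readable name for a state code, 'unknown' if unmapped."""
    return PROCESS_STATE.get(code, "unknown")

print(state_name(7))  # succeeded
```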

select process_definition_id, count(*) as instance_count
from dolphinscheduler.t_ds_process_instance
group by process_definition_id;

Log inspection

Keyword for online/offline operations from the UI: SchedulerService

grep SchedulerService dolphinscheduler-api.2021-10-26_14.0.log

Upgrade procedure

Stop the services

Database upgrade: create-dolphinscheduler.sh

Unpack and replace

OR modify its deployment scripts
core-site.xml
hdfs-site.xml

Configuration files:

Modify
the database section of install_config.conf
application-mysql.yaml
registry.properties
common.properties

Keep
start.sh
stop.sh
lib/mysql-connector-java-5.1.47.jar
script/alert-script.sh
script/spark-test-1.0-SNAPSHOT.jar
conf/hdfs-site.xml
conf/core-site.xml

Package and distribute:

stop.sh
rm -rf ./pid ./logs

tar czf ds201.tgz dolphinscheduler-2.0.1
mv ds201.tgz /data/tmp
chmod 755 /data/tmp/ds201.tgz
exit
sudo su hadoop
cd /data/tmp
hdfs dfs -put ds201.tgz /
hdfs dfs -chmod 755 /ds201.tgz

Other nodes:

cd /data/soft
rm dolphinscheduler

hdfs dfs -get /ds201.tgz
tar xf ds201.tgz
rm ds201.tgz

ln -s dolphinscheduler-2.0.1 dolphinscheduler
chmod 755 -R dolphinscheduler-2.0.1
[update start.sh/stop.sh]

Alert configuration

{"path":"/data/soft/dolphinscheduler/script/alert-script.sh","userParams":"138xxxxxxx","type":"SHELL"}

Addition to the sample configuration:
#registry.namespace=dolphinscheduler

curl -s "http://10.3.4.191:8321/sms/getReport?content=message&mobilesStr=19802021823"


while getopts t:c:p: opts; do
case $opts in
t) t=$OPTARG ;;
c) c=$OPTARG ;;
p) p=$OPTARG ;;
?) ;;
esac
done


# Write your specific logic here

# Set the exit code according to your execution result, and alert needs to use it to judge the status of this alarm result


if [ "$t" = "error msg title" ]
then
exit 12
fi

echo ''
echo ' title='$t
echo ' content='$c
echo ' mobilesStr='$p

#title=`echo $t |tr -d '\n' |od -An -tx1|tr ' ' %| tr -d '\n'`
#content=`echo $c |tr -d '"' |tr -d '\n' |od -An -tx1|tr ' ' %| tr -d '\n' `
#msg=$title'-'$content
#msgout=${msg:0:250}

msgout=`java -cp /data/soft/dolphinscheduler/script/spark-test-1.0-SNAPSHOT.jar ds.GetAlertMsg "${t}" "${c}"`

echo '>>> http://xxx:8321/sms/getReport?content='$msgout'&mobilesStr='$p

curl -s 'http://xxx:8321/sms/getReport?content='$msgout'&mobilesStr='$p

exit 0

Deployment configuration items, 2.0.1

Copy the MySQL JDBC jar into lib

Data source: install_config

DATABASE_TYPE=mysql
SPRING_DATASOURCE_URL=jdbc:mysql://127.0.0.1:3306/dolphinscheduler
SPRING_DATASOURCE_USERNAME=ds
SPRING_DATASOURCE_PASSWORD=123

Connect to MySQL and create the database and user

Initialize the database:
./script/create-dolphinscheduler.sh

application-mysql.yaml: to be confirmed whether it needs configuration

copy core-site.xml and hdfs-site.xml to conf dir

common.properties

data.basedir.path=/tmp/dolphinscheduler
resource.storage.type=HDFS
resource.upload.path=/dolphinscheduler
hdfs.root.user=hadoop
fs.defaultFS=hdfs://localhost:9000
resource.manager.httpaddress.port=8088

yarn.resourcemanager.ha.rm.ids=
yarn.application.status.address
yarn.job.history.status.address

sudo.enable=false

registry.properties

Configure the ZooKeeper server address and path
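For reference, a minimal sketch of that file (key names as in the 2.0.x default registry.properties; the server address is a placeholder, so verify against the file shipped with your release):

```properties
# registry plugin and ZooKeeper connection (placeholder address)
registry.plugin.name=zookeeper
registry.servers=127.0.0.1:2181
registry.namespace=dolphinscheduler
```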

Deployment-user permission issues

1. Create a deploy user; it must have passwordless sudo (/etc/sudoers) so that tasks can be executed as the tenant user

2. Add umask 022 to the deploy user's ~/.bashrc so the relevant files are accessible to the tenant user; otherwise tasks cannot execute

sudo su dolphinscheduler

vi ~/.bashrc

3. After unpacking, also add umask 022 to ./bin/dolphinscheduler-daemon.sh; that script sources /etc/profile, so the setting must be overridden there

4. conf/env/dolphinscheduler_env.sh must be readable by the tenant user


[INFO] 2021-05-18 15:52:46.655  - [taskAppId=TASK-2-2-2]:[129] -  -> hadoop is not in the sudoers file.  This incident will be reported.

[INFO] 2021-05-19 09:25:40.521  - [taskAppId=TASK-2-4-5]:[129] -  -> root is not in the sudoers file.  This incident will be reported.

The task execution service runs jobs for multiple tenants by switching Linux users via sudo -u {linux-user}, so the deploy user needs sudo privileges, and passwordless ones.

On sudo privileges: https://mp.weixin.qq.com/s/5rRWMoT0DLMcOdDl-mrsRQ

The users it may switch to can be restricted, e.g. allow switching only to the hadoop user for running tasks:

echo 'dolphinscheduler  ALL=(userA,userB,userC)  NOPASSWD:ALL' >> /etc/sudoers

Note: when adding tenants later, this configuration must be updated accordingly

Wildcards and excluding commands

eg:papi ALL=/usr/sbin/,/sbin/,!/usr/sbin/fdisk

A tenant corresponds to a Linux user, the user the worker submits jobs as. If the user does not exist on Linux, the worker creates it when executing the script.

Tenant code: the Linux user name; must be unique, no duplicates

When a task runs, it can be assigned to a specified worker group, and a worker node in that group executes it

One-click deployment analysis

Read ./conf/config/install_config.conf
Modify the configuration
Copy to the remote hosts
Fix permissions

Database upgrade analysis

Modify schema/data:
./sql/sql/upgrade/2.0.1_schema/mysql/
Enter mysql and source the SQL script files

Bump the version number:
update t_ds_version set version = '2.0.1';

Modify ./conf/config/install_config.conf

Manual deployment

1. Unpack. For the deployment directory, follow the other software: /data

chown dolphinscheduler apache-dolphinscheduler-1.3.6-bin.tar.gz
tar -zxvf apache-dolphinscheduler-1.3.6-bin.tar.gz -C /data/
sudo chown -R dolphinscheduler:dolphinscheduler dolphinscheduler
sudo chmod 755 -R  dolphinscheduler

2. Upload mysql-connector-java into DolphinScheduler's lib directory

3. Initialize the MySQL tables (create tables, grant privileges, import data)

Server version: 5.7.30 MySQL Community Server (GPL)


mysql -h192.168.xx.xx -P3306 -uroot -p

CREATE DATABASE dolphinscheduler DEFAULT CHARACTER SET utf8 DEFAULT COLLATE utf8_general_ci;

GRANT ALL PRIVILEGES ON dolphinscheduler.* TO 'dolphinscheduler'@'%' IDENTIFIED BY 'xxx';

GRANT ALL PRIVILEGES ON dolphinscheduler.* TO 'dolphinscheduler'@'localhost' IDENTIFIED BY 'xxx';

flush privileges;

Log in to 126 and switch to the deploy user

Edit conf/datasource.properties: comment out the PostgreSQL settings and configure the MySQL connection:


spring.datasource.driver-class-name=com.mysql.jdbc.Driver

spring.datasource.url=jdbc:mysql://10.3.4.194:3306/dolphinscheduler?useUnicode=true&characterEncoding=UTF-8

spring.datasource.username=xxx

spring.datasource.password=xxx

Run sh script/create-dolphinscheduler.sh to execute the table-creation and base-data import scripts under script/

4. Adjust the runtime parameters and add the paths of the components to be invoked

(they can all be commented out at first)

conf/env/dolphinscheduler_env.sh

 > export PYTHON_HOME=/usr/bin/python

 # fixed after 1.3.6: the python binary path now needs to be set

5. Since there is no SSH between nodes, deploy manually following install.sh

https://www.yiibai.com/zookeeper/zookeeper_installation.html

https://cloud.tencent.com/developer/article/1458839


Edit conf/common.properties

data.basedir.path=/tmp/dolphinscheduler

resource.storage.type=HDFS

resource.upload.path=/dolphinscheduler

hdfs.root.user=hadoop

fs.defaultFS=hdfs://LGJF-ZYC6-HCY-SVR553:9000

yarn.resourcemanager.ha.rm.ids=

yarn.application.status.address=http://10.17.41.126:8088/ws/v1/cluster/apps/%s



Other notes for cluster configuration:

# If the NameNode has HA enabled, hadoop's core-site.xml and hdfs-site.xml must also be placed in the conf directory of the install path

# If the ResourceManager is HA, set this to the active/standby ResourceManager IPs or hostnames, e.g. "192.168.xx.xx,192.168.xx.xx";

# With a single ResourceManager, just set yarnHaIps=""

yarn.resourcemanager.ha.rm.ids="192.168.xx.xx,192.168.xx.xx"

yarn.resourcemanager.ha.rm.ids=""

# With a single ResourceManager, set the real ResourceManager hostname or IP; otherwise keep the default: yarn.application.status.address=http://10.17.41.126:8088/ws/v1/cluster/apps/%s




Because the 191 cluster enforces access authentication and the cluster servers are currently whitelisted for passwordless access, the following parameters are required:

yarn.resourcemanager.ha.rm.ids=10.3.4.191,10.17.41.21

yarn.application.status.address=http://ds1:8089/ws/v1/cluster/apps/%s

resource.manager.httpaddress.port=8089


Test command: `curl http://10.3.4.191:8089/ws/v1/cluster/apps/application_1622165044791_10946`



Edit conf/zookeeper.properties

zookeeper.quorum=localhost:2181

zookeeper.dolphinscheduler.root=/dolphinscheduler



Edit conf/alert.properties [skipped for now]



Edit conf/worker.properties [skipped for now; using the default group]

Under the directory (bin conf lib script sql ui), the group this node belongs to:

worker.groups=default

Copy to the other servers

Each node's identity is recorded by the group in its worker.properties

6. Start/stop commands


sh ./bin/dolphinscheduler-daemon.sh start master-server

sh ./bin/dolphinscheduler-daemon.sh start worker-server

sh ./bin/dolphinscheduler-daemon.sh start api-server

sh ./bin/dolphinscheduler-daemon.sh start logger-server

sh ./bin/dolphinscheduler-daemon.sh start alert-server



sh ./bin/dolphinscheduler-daemon.sh stop master-server

sh ./bin/dolphinscheduler-daemon.sh stop worker-server

sh ./bin/dolphinscheduler-daemon.sh stop api-server

sh ./bin/dolphinscheduler-daemon.sh stop logger-server

sh ./bin/dolphinscheduler-daemon.sh stop alert-server

7. Web UI access

http://10.17.41.126:12345/dolphinscheduler

Administrator: admin / dolphinscheduler123

User: tester / tester123

sudo su dolphinscheduler

8. Adding a worker node

Configure the JDK

Unpack the installation files

Copy the MySQL jar

Copy the config files: datasource, zookeeper, common, dolphinscheduler_env (adjust as needed)

Whether a node is a worker or a master depends on which processes are started.

Per the start-all startup script, a worker only needs the worker and logger services

Start/stop

sh ./bin/dolphinscheduler-daemon.sh stop master-server
sh ./bin/dolphinscheduler-daemon.sh stop worker-server
sh ./bin/dolphinscheduler-daemon.sh stop api-server
sh ./bin/dolphinscheduler-daemon.sh stop logger-server
sh ./bin/dolphinscheduler-daemon.sh stop alert-server

sh ./bin/dolphinscheduler-daemon.sh start master-server
sh ./bin/dolphinscheduler-daemon.sh start worker-server
sh ./bin/dolphinscheduler-daemon.sh start api-server
sh ./bin/dolphinscheduler-daemon.sh start logger-server
sh ./bin/dolphinscheduler-daemon.sh start alert-server

Date parameter configuration

2.0.2: Remove "+1" (days) in complement dates

The date format supports custom DateTimeFormatter patterns, e.g. yyyy-MM-dd

  • N years later: $[add_months(yyyyMMdd,12*N)]
  • N years earlier: $[add_months(yyyyMMdd,-12*N)]
  • N months later: $[add_months(yyyyMMdd,N)]
  • N months earlier: $[add_months(yyyyMMdd,-N)]
  • N weeks later: $[yyyyMMdd+7*N]
  • N weeks earlier: $[yyyyMMdd-7*N]
  • N days later: $[yyyyMMdd+N]
  • N days earlier: $[yyyyMMdd-N]
  • N hours later: $[HHmmss+N/24]
  • N hours earlier: $[HHmmss-N/24]
  • N minutes later: $[HHmmss+N/24/60]
  • N minutes earlier: $[HHmmss-N/24/60]

${system.biz.date}: one day before the scheduled time of a routine instance, formatted yyyyMMdd; for complement (backfill) runs, this date +1
${system.biz.curdate}: the scheduled time of a routine instance, formatted yyyyMMdd; for complement runs, this date +1

${system.datetime}: the scheduled time of a routine instance, formatted yyyyMMddHHmmss; for complement runs, this date +1
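To sanity-check what these expressions resolve to, here is a small Python re-implementation of the day-offset and add_months semantics (my own illustration, not DolphinScheduler code), for a scheduled time of 2022-01-20:

```python
import calendar
from datetime import date, timedelta

def add_months(d, n):
    """Month arithmetic as in $[add_months(yyyyMMdd, N)], clamped to month end."""
    m = d.month - 1 + n
    y, m = d.year + m // 12, m % 12 + 1
    return date(y, m, min(d.day, calendar.monthrange(y, m)[1]))

sched = date(2022, 1, 20)  # the instance's scheduled time
fmt = "%Y%m%d"

print((sched - timedelta(days=1)).strftime(fmt))   # $[yyyyMMdd-1]   -> 20220119
print((sched + timedelta(weeks=2)).strftime(fmt))  # $[yyyyMMdd+7*2] -> 20220203
print(add_months(sched, -1).strftime(fmt))         # $[add_months(yyyyMMdd,-1)] -> 20211220
```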

Reference scheduling example: http://10.17.41.126:12345/dolphinscheduler/ui/#/projects/definition/list/14

For example, azkaban's $(new("org.joda.time.DateTime").minusDays(1).toString("yyyy-MM-dd")) becomes $[yyyy-MM-dd-1]

Parameter passing

The mechanism: the task prints a string in a specific pattern, which sets the parameter

Upstream (must configure the output):
tmppath=/data/bdoc/ob_data
tmporcpath=${tmppath}/data/ABLUM_SAAS_PRO/TABLE
echo '${setValue(tmp='$tmporcpath')}'

Downstream:
echo ${tmp}
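Under the hood the worker scans the task's output for that `${setValue(key=value)}` pattern; a simplified sketch of such a scan (my own illustration, not the actual parser):

```python
import re

# matches ${setValue(key=value)} in task output
SET_VALUE_RE = re.compile(r"\$\{setValue\(([^)=]+)=([^)]*)\)\}")

def extract_output_params(log_text):
    """Collect key/value pairs emitted via ${setValue(...)}."""
    return {m.group(1): m.group(2) for m in SET_VALUE_RE.finditer(log_text)}

log = "${setValue(tmp=/data/bdoc/ob_data/data/ABLUM_SAAS_PRO/TABLE)}"
print(extract_output_params(log))  # {'tmp': '/data/bdoc/ob_data/data/ABLUM_SAAS_PRO/TABLE'}
```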

Specifying the bound IP

common.properties

dolphin.scheduler.network.interface.preferred=<network interface name>

Source code

Startup

org.apache.dolphinscheduler.server.master.MasterServer

MasterServer

Main members:
ZKMasterClient

MasterSchedulerService, the core service

  • Loops checking the master node's memory and CPU; if overloaded or memory is low, skip scheduling and wait 1s
  • If the ZK state is healthy, start scheduling: scheduleProcess
    • Take a ZK lock, then fetch a command: getOneToRun
    • Create a ProcessInstance and start a MasterExecThread to run it (bounded by a maximum thread count)
    • Once the thread is created, persist with saveProcessInstance and delete the command

MasterExecThread#executeProcess

  • prepareProcess
    • Build the DAG
    • Initialize the task queue
  • runProcess
    • Loop until finish/stop
    • Timeout checks
    • Iterate runnable nodes via MasterTaskExecThread (submit to the DB and poll status); once a task finishes on the worker, handle its state and the queue
    • Submit new tasks: submitStandByTask
    • Update the workflow state: updateProcessInstanceState
  • endProcess
    • Set the end time
    • Update the metadata database
    • Alert on success/failure
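Condensed, the runProcess loop amounts to executing a DAG level by level; a heavily simplified sketch of that control flow (my own illustration, not the actual implementation):

```python
def run_process(dag):
    """dag maps task -> list of upstream tasks; run whatever has all deps done."""
    done, pending, order = set(), set(dag), []
    while pending:                                   # loop until finished
        ready = [t for t in sorted(pending) if all(u in done for u in dag[t])]
        if not ready:                                # unmet deps would mean failure/stop
            raise RuntimeError("no runnable task")
        for t in ready:                              # submitStandByTask: hand off ready tasks
            order.append(t)                          # (real code submits to DB, polls workers)
            done.add(t)
        pending -= set(ready)
    return order                                     # endProcess: update state, alert

print(run_process({"a": [], "b": ["a"], "c": ["a"], "d": ["b", "c"]}))
```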

Netty-registered handler classes:

  • TaskResponseProcessor
  • TaskAckProcessor
  • TaskKillResponseProcessor

WorkerServer

TaskExecuteProcessor

The user directory on HDFS

HDFS path: /dolphinscheduler/hadoop/home

Related source function: getHdfsUserDir

Issue: this directory is only used when creating, updating and deleting users; it has no real purpose

Starting a task via the API with variable parameters

The variable-substitution script is written as follows; a variable is spliced into the JSON body using the quoting pattern "'"variable"'":

date="20231008"
site="gz"
server="10.3.68.166"
tablesuffix=1

curl -i -X POST "http://10.27.48.1:12345/dolphinscheduler/projects/8653695427776/executors/start-process-instance" -H "Request-Origion:SwaggerBootstrapUi" -H "accept:*/*" -H "token:xxx" -H "Content-Type:application/x-www-form-urlencoded;charset=UTF-8" -d "failureStrategy=END" -d "processDefinitionCode=11174492918720" -d "processInstancePriority=MEDIUM" -d "scheduleTime= " -d "warningGroupId=100" -d "warningType=NONE" -d "execType=START_PROCESS" -d "runMode=RUN_MODE_SERIAL" -d 'startParams={"date":"'"${date}"'","server":"'"${server}"'","site":"'"${site}"'","tablesuffix":"'"${tablesuffix}"'"}'

for i in {2..500}; do
tablesuffix=$i
echo $site $tablesuffix
# re-issue the curl above with the new tablesuffix here
sleep 60
done
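The fiddly part of the curl call above is quoting startParams; in Python the same form body can be built with json.dumps instead of the shell quoting dance (URL, codes and token are the placeholders from the curl above):

```python
import json

date, site, server, tablesuffix = "20231008", "gz", "10.3.68.166", 1

# form fields for POST .../executors/start-process-instance
data = {
    "failureStrategy": "END",
    "processDefinitionCode": "11174492918720",
    "processInstancePriority": "MEDIUM",
    "scheduleTime": " ",
    "warningGroupId": "100",
    "warningType": "NONE",
    "execType": "START_PROCESS",
    "runMode": "RUN_MODE_SERIAL",
    # json.dumps produces the same JSON the shell builds by hand
    "startParams": json.dumps({"date": date, "server": server,
                               "site": site, "tablesuffix": str(tablesuffix)}),
}
print(data["startParams"])
# then e.g. requests.post(url, headers={"token": "xxx"}, data=data)
```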

Querying tasks by worker node

SELECT 
p.name AS project_name,p.code,
pro.name AS process_name,pro.code,
t.name AS task_name,t.code
FROM (
SELECT
code,
name,
project_code
FROM dolphinscheduler.t_ds_task_definition
WHERE task_params LIKE '%10.17.41.127%'
) t
LEFT JOIN dolphinscheduler.t_ds_project p
ON p.code = t.project_code
LEFT JOIN dolphinscheduler.t_ds_process_definition pro
ON pro.locations like concat('%',t.code,'%')

Getting node status via the API

str=$(curl "http://10.17.41.129:12345/dolphinscheduler/monitor/master/list" -H 'token:xxx')


Return value:
str='
{"code":0,"msg":"success","data":[{"id":130615,"host":"10.17.41.129","port":5678,"zkDirectory":"/dolphinscheduler/nodes/master/10.17.41.129:5678","resInfo":"{\"cpuUsage\":0.53,\"loadAverage\":6.53,\"memoryUsage\":0.7}","createTime":"2021-06-21T17:38:20.000+0800","lastHeartbeatTime":"2021-07-19T10:49:51.000+0800"},{"id":10348,"host":"10.17.41.130","port":5678,"zkDirectory":"/dolphinscheduler/nodes/master/10.17.41.130:5678","resInfo":"{\"cpuUsage\":0.06,\"loadAverage\":0.76,\"memoryUsage\":0.48}","createTime":"2021-07-07T17:11:24.000+0800","lastHeartbeatTime":"2021-07-19T10:49:54.000+0800"}]}
'

{"code":0,"msg":"success","data":[{"id":125264,"host":"10.17.41.129","port":1234,"zkDirectories":["/dolphinscheduler/nodes/worker/default/10.17.41.129:1234"],"resInfo":"{\"cpuUsage\":0.24,\"loadAverage\":4.08,\"memoryUsage\":0.74}","createTime":"2021-07-14 14:50:53","lastHeartbeatTime":"2021-07-19 11:22:33"},{"id":86514,"host":"10.3.4.194","port":1234,"zkDirectories":["/dolphinscheduler/nodes/worker/default/10.3.4.194:1234"],"resInfo":"{\"cpuUsage\":0.05,\"loadAverage\":1.77,\"memoryUsage\":0.55}","createTime":"2021-07-14 14:11:30","lastHeartbeatTime":"2021-07-19 11:22:31"},{"id":55682,"host":"10.17.41.130","port":1234,"zkDirectories":["/dolphinscheduler/nodes/worker/default/10.17.41.130:1234"],"resInfo":"{\"cpuUsage\":0.03,\"loadAverage\":1.01,\"memoryUsage\":0.47}","createTime":"2021-07-14 14:51:32","lastHeartbeatTime":"2021-07-19 11:22:33"},{"id":53055,"host":"10.3.4.191","port":1234,"zkDirectories":["/dolphinscheduler/nodes/worker/default/10.3.4.191:1234"],"resInfo":"{\"cpuUsage\":0.02,\"loadAverage\":0.18,\"memoryUsage\":0.42}","createTime":"2021-07-15 16:59:24","lastHeartbeatTime":"2021-07-19 11:22:35"},{"id":105568,"host":"10.17.41.132","port":1234,"zkDirectories":["/dolphinscheduler/nodes/worker/default/10.17.41.132:1234"],"resInfo":"{\"cpuUsage\":0.44,\"loadAverage\":8.29,\"memoryUsage\":0.46}","createTime":"2021-07-19 10:30:21","lastHeartbeatTime":"2021-07-19 11:22:31"},{"id":34498,"host":"10.17.41.133","port":1234,"zkDirectories":["/dolphinscheduler/nodes/worker/default/10.17.41.133:1234"],"resInfo":"{\"cpuUsage\":0.43,\"loadAverage\":8.21,\"memoryUsage\":0.63}","createTime":"2021-07-14 14:52:25","lastHeartbeatTime":"2021-07-19 11:22:36"},{"id":9901,"host":"10.17.41.131","port":1234,"zkDirectories":["/dolphinscheduler/nodes/worker/default/10.17.41.131:1234"],"resInfo":"{\"cpuUsage\":0.44,\"loadAverage\":8.3,\"memoryUsage\":0.61}","createTime":"2021-07-19 10:29:24","lastHeartbeatTime":"2021-07-19 11:22:34"}]}


if [[ $str =~ '10.17.41.129' ]]
then
echo "contained"
else
echo "not contained"
fi

Cleaning up historical definition versions

import requests
import json

# Configuration: change the ip, port and token as needed
base_url = 'http://xxx:12345'
token = 'xxx'
keep_version = 3
payload = {}
headers = {
    'Connection': 'keep-alive',
    'Accept': 'application/json, text/plain, */*',
    'language': 'zh_CN',
    'token': token
}

# Fetch the project list
def get_project_list():
    url = "{base_url}/dolphinscheduler/projects?pageSize=100&pageNo=1&searchVal=".format(base_url=base_url)
    response = requests.request("GET", url, headers=headers, data=payload)
    response_data = json.loads(response.text)
    projects = response_data['data']['totalList']
    for project in projects:
        print(project['name'], project['code'])
    return projects

# Fetch the process definition list
def get_definition_detail(project_code):
    all_dags = []
    pageNo = 1
    while True:
        url = "{base_url}/dolphinscheduler/projects/{project_code}/process-definition?searchVal=&pageSize=50&pageNo={pageNo}".format(project_code=project_code, pageNo=pageNo, base_url=base_url)
        response = requests.request("GET", url, headers=headers, data=payload)
        response_data = json.loads(response.text)
        page_data = response_data['data']['totalList']
        totalPage = response_data['data']['totalPage']
        if len(page_data) == 0:
            print('Definition list is empty, leaving the loop...')
            break
        all_dags.extend(page_data)
        if pageNo >= totalPage:
            break
        pageNo += 1
    return all_dags

def get_version_detail(project_code, dag_code, current_version):
    del_version = []
    pageNo = 1
    while True:
        url = "{base_url}/dolphinscheduler/projects/{project_code}/process-definition/{dag_code}/versions?searchVal=&pageSize=20&pageNo={pageNo}".format(project_code=project_code, dag_code=dag_code, pageNo=pageNo, base_url=base_url)
        response = requests.request("GET", url, headers=headers, data=payload)
        response_data = json.loads(response.text)
        page_data = response_data['data']['totalList']
        totalPage = response_data['data']['totalPage']
        if len(page_data) == 0:
            print('Version list is empty, leaving the loop...')
            break
        for page in page_data:
            version = int(page['version'])
            # keep the most recent keep_version versions
            if version + keep_version <= current_version:
                del_version.append(version)
        if pageNo >= totalPage:
            break
        pageNo += 1
    return del_version

def delete(project_code, dag_code, version):
    print(' Now deleting: ', project_code, dag_code, version)
    url = "{base_url}/dolphinscheduler/projects/{project_code}/process-definition/{dag_code}/versions/{version}".format(project_code=project_code, dag_code=dag_code, version=version, base_url=base_url)
    response = requests.request("DELETE", url, headers=headers, data=payload)
    print(' ' + response.text)

if __name__ == '__main__':
    projects = get_project_list()
    for project in projects:
        project_code = project['code']
        print('Project:' + str(project_code) + project['name'])
        all_dags = get_definition_detail(project_code)
        for dag in all_dags:
            dag_code = dag['code']
            current_version = dag['version']
            if current_version > keep_version:
                print(str(dag_code) + ' ' + str(current_version) + ' ' + dag['name'])
                del_version = get_version_detail(project_code, dag_code, current_version)
                print(del_version)
                if len(del_version) > 0:
                    input()  # pause for manual confirmation before deleting
                    for v in del_version:
                        delete(project_code, dag_code, v)

Cleaning up historical workflow instances

http://host:12345/dolphinscheduler/doc.html?language=zh_CN&lang=cn

79
import requests
import json

# Configuration: change the ip, port and token as needed
base_url = 'http://xxx:12345'
token = 'xxx'
payload = {}
headers = {
    'Connection': 'keep-alive',
    'Accept': 'application/json, text/plain, */*',
    'language': 'zh_CN',
    'token': token
}

# Fetch the project list
def get_project_list():
    url = "{base_url}/dolphinscheduler/projects?pageSize=100&pageNo=1&searchVal=".format(base_url=base_url)
    response = requests.request("GET", url, headers=headers, data=payload)
    response_data = json.loads(response.text)
    projects = response_data['data']['totalList']
    for project in projects:
        print(project['name'], project['code'])
    return projects

def get_process_instances(project_code, date_before):
    del_id = []
    pageNo = 1
    while True:
        url = "{base_url}/dolphinscheduler/projects/{project_code}/process-instances?pageSize=200&pageNo={pageNo}&searchVal=&endDate={date_before}%2000:00:00&startDate=2022-01-01%2000:00:00".format(base_url=base_url, project_code=project_code, pageNo=pageNo, date_before=date_before)
        response = requests.request("GET", url, headers=headers, data=payload)
        response_data = json.loads(response.text)
        page_data = response_data['data']['totalList']
        totalPage = response_data['data']['totalPage']
        if len(page_data) == 0:
            print('List is empty, leaving the loop...')
            break
        for item in page_data:
            # print(item['endTime'], item['id'])
            del_id.append(str(item['id']))
        if pageNo >= totalPage:
            break
        pageNo += 1
    return del_id

def delete_process_instances(project_code, id):
    print(' Now deleting: ', project_code, id)
    url = "{base_url}/dolphinscheduler/projects/{project_code}/process-instances/{id}".format(project_code=project_code, id=id, base_url=base_url)
    response = requests.request("DELETE", url, headers=headers, data=payload)
    print(' ' + response.text)

# one by one, manually
# project_code = ''
# del_id = get_process_instances(project_code, '2024-03-01')
# print(len(del_id))
# for id in del_id:
#     delete_process_instances(project_code, id)


# batch
def batch_delete(project_code, del_id):
    url = "{base_url}/dolphinscheduler/projects/{project_code}/process-instances/batch-delete".format(project_code=project_code, base_url=base_url)
    response = requests.request("POST", url, headers=headers, data={'processInstanceIds': ",".join(del_id)})  # comma-separated
    print(' ' + response.text)


if __name__ == '__main__':
    projects = get_project_list()
    for project in projects:
        project_code = project['code']
        print(str(project_code) + project['name'])
        del_id = get_process_instances(project_code, '2024-01-01')
        print(len(del_id))
        if len(del_id) > 0:
            input()  # pause for manual confirmation before deleting
            batch_delete(project_code, del_id)

3.2.2 deployment

Initialize the database

cp mysql-connector-java-8.0.16.jar alert-server/libs/
cp mysql-connector-java-8.0.16.jar api-server/libs/
cp mysql-connector-java-8.0.16.jar master-server/libs/
cp mysql-connector-java-8.0.16.jar worker-server/libs/
cp mysql-connector-java-8.0.16.jar tools/libs/
mysql -uroot -p

CREATE DATABASE dolphinscheduler DEFAULT CHARACTER SET utf8 DEFAULT COLLATE utf8_general_ci;

GRANT ALL PRIVILEGES ON dolphinscheduler.* TO '{user}'@'%' IDENTIFIED BY '{password}';
GRANT ALL PRIVILEGES ON dolphinscheduler.* TO '{user}'@'localhost' IDENTIFIED BY '{password}';

flush privileges;

Configuration

export DATABASE=${DATABASE:-mysql}
export SPRING_PROFILES_ACTIVE=${DATABASE}
export SPRING_DATASOURCE_URL="jdbc:mysql://xxx:3306,xxx:3306/ds?useUnicode=true&characterEncoding=UTF-8&useSSL=false"
export SPRING_DATASOURCE_USERNAME=xx
export SPRING_DATASOURCE_PASSWORD=xx


#zk
export REGISTRY_TYPE=${REGISTRY_TYPE:-zookeeper}
export REGISTRY_ZOOKEEPER_CONNECT_STRING="xx:2181,x:2181"
export REGISTRY_ZOOKEEPER_NAMESPACE=ds
resource.storage.type=HDFS
resource.storage.upload.base.path=/dolphinscheduler2
resource.hdfs.root.user=hadoop
resource.hdfs.fs.defaultFS=hdfs://bicluster
## copy core-site.xml and hdfs-site.xml to conf dir
resource.manager.httpaddress.port=8088
yarn.resourcemanager.ha.rm.ids=10.27.48.1,10.27.48.7
sudo.enable=false
cp common.properties alert-server/conf/common.properties
cp common.properties api-server/conf/common.properties
cp common.properties master-server/conf/common.properties
cp common.properties standalone-server/conf/common.properties
cp common.properties tools/conf/common.properties
cp common.properties worker-server/conf/common.properties

cp /data/soft/hadoop/etc/hadoop/hdfs-site.xml alert-server/conf/
cp /data/soft/hadoop/etc/hadoop/hdfs-site.xml api-server/conf/
cp /data/soft/hadoop/etc/hadoop/hdfs-site.xml master-server/conf/
cp /data/soft/hadoop/etc/hadoop/hdfs-site.xml standalone-server/conf/
cp /data/soft/hadoop/etc/hadoop/hdfs-site.xml tools/conf/
cp /data/soft/hadoop/etc/hadoop/hdfs-site.xml worker-server/conf/

cp /data/soft/hadoop/etc/hadoop/core-site.xml alert-server/conf/
cp /data/soft/hadoop/etc/hadoop/core-site.xml api-server/conf/
cp /data/soft/hadoop/etc/hadoop/core-site.xml master-server/conf/
cp /data/soft/hadoop/etc/hadoop/core-site.xml standalone-server/conf/
cp /data/soft/hadoop/etc/hadoop/core-site.xml tools/conf/
cp /data/soft/hadoop/etc/hadoop/core-site.xml worker-server/conf/

worker

worker.tenant-config.auto-create-tenant-enabled = false
worker.tenant-config.default-tenant-enabled = true

TRACE vulnerability on port 1235

Modify the worker's application.yaml:

server:
port: -1

Historical data table sizes

use dolphinscheduler;
SELECT
TABLE_NAME AS 'table_name',
TABLE_ROWS AS 'row_count'
FROM
information_schema.TABLES
WHERE
TABLE_SCHEMA = DATABASE();