DolphinScheduler Site
Is: a distributed workflow scheduling platform with DAG visualization
For: complex task dependencies
Decentralized (dynamic masters, elected via ZooKeeper)
DolphinScheduler itself does not depend on Hadoop, Hive, or Spark; it only calls their clients to submit the corresponding tasks.
Scheduling is driven by cron expressions
Quartz is used as the distributed scheduler
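For reference, the crontab here is Quartz syntax (seven fields, seconds first); a couple of illustrative expressions:
# Quartz cron: second minute hour day-of-month month day-of-week [year]
0 0 2 * * ? *      # every day at 02:00:00
0 0/30 * * * ? *   # every 30 minutes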
Scheduling report: key fields of t_ds_process_instance
name (process name + timestamp)
process_definition_id (can be joined to get the process name)
state (process instance state: 0 submitted, 1 running, 2 ready to pause, 3 paused, 4 ready to stop, 5 stopped, 6 failed, 7 succeeded, 8 needs fault tolerance, 9 killed, 10 waiting for thread, 11 waiting for dependency)
start_time
end_time
SELECT process_definition_id, COUNT(*)
FROM dolphinscheduler.t_ds_process_instance
GROUP BY process_definition_id;
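A slightly fuller report, as a sketch (state codes as listed above; credentials are placeholders), counting instances per definition and state:
mysql -udolphinscheduler -p -e "
SELECT process_definition_id, state, COUNT(*) AS cnt
FROM dolphinscheduler.t_ds_process_instance
GROUP BY process_definition_id, state;"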
Log check: online/offline operations from the UI can be located by the keyword SchedulerService
grep SchedulerService dolphinscheduler-api.2021-10-26_14.0.log
Upgrade procedure: stop the services
Database upgrade: create-dolphinscheduler.sh
Extract the new package and replace the old one
Or modify its deployment script; also carry over core-site.xml and hdfs-site.xml
Configuration, etc.:
Modify:
install_config.conf (database section)
application-mysql.yaml
registry.properties
common.properties
Keep:
start.sh, stop.sh
lib/mysql-connector-java-5.1.47.jar
script/alert-script.sh
script/spark-test-1.0-SNAPSHOT.jar
conf/hdfs-site.xml, conf/core-site.xml
Package and distribute:
stop.sh
rm -rf ./pid ./logs
tar czf ds201.tgz dolphinscheduler-2.0.1
mv ds201.tgz /data/tmp
chmod 755 /data/tmp/ds201.tgz
exit
sudo su hadoop
cd /data/tmp
hdfs dfs -put ds201.tgz /
hdfs dfs -chmod 755 /ds201.tgz
On the other nodes:
cd /data/soft
rm dolphinscheduler
hdfs dfs -get /ds201.tgz
tar xf ds201.tgz
rm ds201.tgz
ln -s dolphinscheduler-2.0.1 dolphinscheduler
chmod 755 -R dolphinscheduler-2.0.1
[update start/stop scripts]
Alert configuration: {"path":"/data/soft/dolphinscheduler/script/alert-script.sh","userParams":"138xxxxxxx","type":"SHELL"}
Add to the sample configuration: #registry.namespace=dolphinscheduler
curl -s "http://10.3.4.191:8321/sms/getReport?content=message&mobilesStr=19802021823"
while getopts t:c:p: opts; do
    case $opts in
        t) t=$OPTARG ;;
        c) c=$OPTARG ;;
        p) p=$OPTARG ;;
        ?) ;;
    esac
done
# Write your specific logic here
# Set the exit code according to your execution result, and alert needs to use it to judge the status of this alarm result
if [ "$t" = "error msg title" ]
then
    exit 12
fi
echo ''
echo ' title='$t
echo ' content='$c
echo ' mobilesStr='$p
#title=`echo $t |tr -d '\n' |od -An -tx1|tr ' ' %| tr -d '\n'`
#content=`echo $c |tr -d '"' |tr -d '\n' |od -An -tx1|tr ' ' %| tr -d '\n' `
#msg=$title'-'$content
#msgout=${msg:0:250}
msgout=`java -cp /data/soft/dolphinscheduler/script/spark-test-1.0-SNAPSHOT.jar ds.GetAlertMsg "${t}" "${c}"`
echo '>>> http://xxx:8321/sms/getReport?content='$msgout'&mobilesStr='$p
curl -s 'http://xxx:8321/sms/getReport?content='$msgout'&mobilesStr='$p
exit 0
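To sanity-check the script outside DolphinScheduler, it can be called directly with the same flags the getopts loop reads (t=title, c=content, p=userParams):
sh /data/soft/dolphinscheduler/script/alert-script.sh -t "test title" -c "test content" -p "138xxxxxxx"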
Deployment configuration for 2.0.1. Copy the MySQL jar into lib.
Data source (install_config.conf):
DATABASE_TYPE=mysql
SPRING_DATASOURCE_URL=jdbc:mysql://127.0.0.1:3306/dolphinscheduler
SPRING_DATASOURCE_USERNAME=ds
SPRING_DATASOURCE_PASSWORD=123
Connect to MySQL and create the database and user
Initialize the database: ./script/create-dolphinscheduler.sh
application-mysql.yaml: to be confirmed whether it needs configuring
Copy core-site.xml and hdfs-site.xml to the conf dir
common.properties:
data.basedir.path=/tmp/dolphinscheduler
resource.storage.type=HDFS
resource.upload.path=/dolphinscheduler
hdfs.root.user=hadoop
fs.defaultFS=hdfs://localhost:9000
resource.manager.httpaddress.port=8088
yarn.resourcemanager.ha.rm.ids=
yarn.application.status.address
yarn.job.history.status.address
sudo.enable=false
registry.properties
Deployment user permissions:
1. Create the deployment user; it must have passwordless sudo in /etc/sudoers -> so that tasks can be executed under tenant identities
2. Add umask 022 to the deployment user's ~/.bashrc -> so that the generated files are readable by tenants; otherwise tasks cannot execute
sudo su dolphinscheduler
vi ~/.bashrc
3. After extracting the package, also add umask 022 in ./bin/dolphinscheduler-daemon.sh -> that script sources /etc/profile, so the setting has to be re-applied there
4. conf/env/dolphinscheduler_env.sh must be readable by tenants (a quick check follows below)
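A quick sanity check for items 1 and 4, assuming hadoop is the tenant user:
# passwordless sudo for the deployment user (-n fails instead of prompting)
sudo -n -u hadoop whoami
# the tenant can read the env file
sudo -u hadoop cat /data/soft/dolphinscheduler/conf/env/dolphinscheduler_env.sh >/dev/null && echo readable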
[INFO] 2021-05-18 15:52:46.655 - [taskAppId=TASK-2-2-2]:[129] - -> hadoop is not in the sudoers file. This incident will be reported.
[INFO] 2021-05-19 09:25:40.521 - [taskAppId=TASK-2-4-5]:[129] - -> root is not in the sudoers file. This incident will be reported.
The task execution service runs jobs multi-tenant by switching Linux users via sudo -u {linux-user}, so the deployment user needs sudo privileges, and passwordless ones at that.
sudo privileges: https://mp.weixin.qq.com/s/5rRWMoT0DLMcOdDl-mrsRQ
The users that can be switched to can be restricted, e.g. allow switching only to the hadoop user to execute tasks:
echo 'dolphinscheduler ALL=(userA,userB,userC) NOPASSWD:ALL' >> /etc/sudoers
Note: when a tenant is added later, this entry must be updated accordingly.
Wildcards and command negation,
e.g.: papi ALL=/usr/sbin/*,/sbin/*,!/usr/sbin/fdisk
A tenant corresponds to a Linux user, the user the worker submits jobs as. If that user does not exist on Linux, the worker creates it when executing the script.
Tenant code: the tenant code is the Linux user name; it must be unique.
At execution time a task can be assigned to a specific worker group, and a worker node in that group will run it.
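Equivalent, roughly, to what the worker does when a tenant user is missing (hadoop as the example tenant):
id hadoop >/dev/null 2>&1 || sudo useradd hadoop   # create the tenant's Linux user if absent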
One-click deployment analysis: install.sh reads ./conf/config/install_config.conf, rewrites the configuration, copies it to the remote hosts, and fixes permissions. See the sketch below.
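In other words, install.sh boils down to roughly this (script names per the 1.3.x layout; treat it as a sketch, not the literal contents):
source ./conf/config/install_config.conf   # read hosts, install path, DB settings
sh ./bin/stop-all.sh                       # stop the old services
sh ./script/scp-hosts.sh                   # copy the deployment directory to every host
sh ./bin/start-all.sh                      # start services on all hosts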
Database upgrade analysis: the schema/data changes live under ./sql/sql/upgrade/2.0.1_schema/mysql/; enter mysql and source the SQL script files, roughly as follows.
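A sketch of sourcing the upgrade scripts (the ddl/dml file names are assumed from the usual upgrade directory layout, not verified here):
mysql -udolphinscheduler -p dolphinscheduler
mysql> source ./sql/sql/upgrade/2.0.1_schema/mysql/dolphinscheduler_ddl.sql
mysql> source ./sql/sql/upgrade/2.0.1_schema/mysql/dolphinscheduler_dml.sql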
Bump the version number: update t_ds_version set version = '2.0.1';
Modify ./conf/config/install_config.conf
Manual deployment. 1. Extract. For the deployment directory, follow the other software: /data
chown dolphinscheduler apache-dolphinscheduler-1.3.6-bin.tar.gz
tar -zxvf apache-dolphinscheduler-1.3.6-bin.tar.gz -C /data/
sudo chown -R dolphinscheduler:dolphinscheduler dolphinscheduler
sudo chmod 755 -R dolphinscheduler
2. Upload mysql-connector-java into DolphinScheduler's lib directory
3. Initialize the MySQL tables (create tables, grant privileges, import data)
Server version: 5.7.30 MySQL Community Server (GPL)
mysql -h192.168.xx.xx -P3306 -uroot -p
CREATE DATABASE dolphinscheduler DEFAULT CHARACTER SET utf8 DEFAULT COLLATE utf8_general_ci;
GRANT ALL PRIVILEGES ON dolphinscheduler.* TO 'dolphinscheduler'@'%' IDENTIFIED BY 'xxx';
GRANT ALL PRIVILEGES ON dolphinscheduler.* TO 'dolphinscheduler'@'localhost' IDENTIFIED BY 'xxx';
flush privileges;
Log in to 126 and switch to the deployment user
vi conf/datasource.properties
Comment out the PostgreSQL settings and fill in the MySQL connection details:
spring.datasource.driver-class-name=com.mysql.jdbc.Driver
spring.datasource.url=jdbc:mysql://10.3.4.194:3306/dolphinscheduler?useUnicode=true&characterEncoding=UTF-8
spring.datasource.username=xxx
spring.datasource.password=xxx
sh script/create-dolphinscheduler.sh
This runs the table-creation and base-data import scripts under the script directory.
4. Modify the runtime parameters, adding the path of each invoked component
(they can all be commented out at first)
conf/env/dolphinscheduler_env.sh
export PYTHON_HOME=/usr/bin/python
# fixed after 1.3.6: the Python binary path now needs to be set
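For reference, a typical dolphinscheduler_env.sh ends up looking roughly like this (all paths here are placeholders, not the actual values; keep only what tasks actually invoke):
export JAVA_HOME=/usr/java/jdk1.8.0_xxx
export HADOOP_HOME=/data/soft/hadoop
export HADOOP_CONF_DIR=$HADOOP_HOME/etc/hadoop
export SPARK_HOME2=/data/soft/spark
export HIVE_HOME=/data/soft/hive
export PYTHON_HOME=/usr/bin/python
export PATH=$HADOOP_HOME/bin:$SPARK_HOME2/bin:$HIVE_HOME/bin:$PATH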
5. Since SSH is not available, the deployment must be carried out manually, following install.sh
References:
https://www.yiibai.com/zookeeper/zookeeper_installation.html
https://cloud.tencent.com/developer/article/1458839
Modify conf/common.properties:
data.basedir.path=/tmp/dolphinscheduler
resource.storage.type=HDFS
resource.upload.path=/dolphinscheduler
hdfs.root.user=hadoop
fs.defaultFS=hdfs://LGJF-ZYC6-HCY-SVR553:9000
yarn.resourcemanager.ha.rm.ids=
yarn.application.status.address=http://10.17.41.126:8088/ws/v1/cluster/apps/%s
Other notes for cluster configuration:
# If the NameNode has HA enabled, Hadoop's core-site.xml and hdfs-site.xml must also be copied into the conf directory under the install path
# If the ResourceManager is HA, set this to the active/standby IPs or hostnames of the ResourceManager nodes, e.g. "192.168.xx.xx,192.168.xx.xx";
# for a single ResourceManager, just set yarnHaIps=""
yarn.resourcemanager.ha.rm.ids="192.168.xx.xx,192.168.xx.xx"
yarn.resourcemanager.ha.rm.ids=""
# For a single ResourceManager, set the real ResourceManager hostname or IP; otherwise keep the default: yarn.application.status.address=http://10.17.41.126:8088/ws/v1/cluster/apps/%s
Because the 191 cluster has access verification (currently handled by whitelisting the cluster servers), the following parameters are required:
yarn.resourcemanager.ha.rm.ids=10.3.4.191,10.17.41.21
yarn.application.status.address=http://ds1:8089/ws/v1/cluster/apps/%s
resource.manager.httpaddress.port=8089
Test command: curl http://10.3.4.191:8089/ws/v1/cluster/apps/application_1622165044791_10946
Modify conf/zookeeper.properties:
zookeeper.quorum=localhost:2181
zookeeper.dolphinscheduler.root=/dolphinscheduler
Modify conf/alert.properties [skipped for now]
Modify conf/worker.properties [skipped for now, using the default group]; it records the group this node belongs to:
worker.groups=default
Copy the directories bin conf lib script sql ui to the other servers.
Each DolphinScheduler node's identity is recorded via the group entry in worker.properties.
6. Start/stop commands:
sh ./bin/dolphinscheduler-daemon.sh start master-server
sh ./bin/dolphinscheduler-daemon.sh start worker-server
sh ./bin/dolphinscheduler-daemon.sh start api-server
sh ./bin/dolphinscheduler-daemon.sh start logger-server
sh ./bin/dolphinscheduler-daemon.sh start alert-server
sh ./bin/dolphinscheduler-daemon.sh stop master-server
sh ./bin/dolphinscheduler-daemon.sh stop worker-server
sh ./bin/dolphinscheduler-daemon.sh stop api-server
sh ./bin/dolphinscheduler-daemon.sh stop logger-server
sh ./bin/dolphinscheduler-daemon.sh stop alert-server
7. Web UI access
http://10.17.41.126:12345/dolphinscheduler
Administrator: admin / dolphinscheduler123
User: tester / tester123
sudo su dolphinscheduler
8. Adding a worker node:
Configure the JDK
Extract the installation package
Copy the MySQL jar
Copy the config files: datasource, zookeeper, common, dolphinscheduler_env (adjust as needed)
Whether a node is a worker or a master depends on which processes it runs.
From the start-all script, a worker only needs the worker and logger processes, as shown below.
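So a pure worker node only needs:
sh ./bin/dolphinscheduler-daemon.sh start worker-server
sh ./bin/dolphinscheduler-daemon.sh start logger-server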
Start/stop:
sh ./bin/dolphinscheduler-daemon.sh stop master-server
sh ./bin/dolphinscheduler-daemon.sh stop worker-server
sh ./bin/dolphinscheduler-daemon.sh stop api-server
sh ./bin/dolphinscheduler-daemon.sh stop logger-server
sh ./bin/dolphinscheduler-daemon.sh stop alert-server
sh ./bin/dolphinscheduler-daemon.sh start master-server
sh ./bin/dolphinscheduler-daemon.sh start worker-server
sh ./bin/dolphinscheduler-daemon.sh start api-server
sh ./bin/dolphinscheduler-daemon.sh start logger-server
sh ./bin/dolphinscheduler-daemon.sh start alert-server
Date parameter configuration. 2.0.2: removed the "+1" (days) in complement (backfill) dates.
The date format supports custom DateTimeFormatter patterns, e.g. yyyy-MM-dd
N years later: $[add_months(yyyyMMdd,12*N)]
N years earlier: $[add_months(yyyyMMdd,-12*N)]
N months later: $[add_months(yyyyMMdd,N)]
N months earlier: $[add_months(yyyyMMdd,-N)]
N weeks later: $[yyyyMMdd+7*N]
N weeks earlier: $[yyyyMMdd-7*N]
N days later: $[yyyyMMdd+N]
N days earlier: $[yyyyMMdd-N]
N hours later: $[HHmmss+N/24]
N hours earlier: $[HHmmss-N/24]
N minutes later: $[HHmmss+N/24/60]
N minutes earlier: $[HHmmss-N/24/60]
${system.biz.date}: the day before the scheduled time of a routine instance, format yyyyMMdd; when backfilling, this date is +1
${system.biz.curdate}: the scheduled time of a routine instance, format yyyyMMdd; when backfilling, +1
${system.datetime}: the scheduled time of a routine instance, format yyyyMMddHHmmss; when backfilling, +1
Reference scheduling example: http://10.17.41.126:12345/dolphinscheduler/ui/#/projects/definition/list/14
For example, Azkaban's $(new("org.joda.time.DateTime").minusDays(1).toString("yyyy-MM-dd")) can be written here as $[yyyy-MM-dd-1]
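A worked example, assuming the instance's scheduled time is 2021-10-26 01:30:00:
$[yyyy-MM-dd-1]            -> 2021-10-25
$[yyyyMMdd+7*1]            -> 20211102
$[add_months(yyyyMMdd,-1)] -> 20210926
$[HHmmss-1/24]             -> 003000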
Parameter passing. The mechanism: the upstream task prints a string in a specific pattern, which sets the parameter.
Upstream (must declare the output parameter):
tmppath=/data/bdoc/ob_data
tmporcpath=${tmppath}/data/ABLUM_SAAS_PRO/TABLE
echo '${setValue(tmp='$tmporcpath')}'
Downstream:
echo ${tmp}
Specifying the IP to bind: in common.properties,
dolphin.scheduler.network.interface.preferred=<network interface name>
Source code. Entry point: org.apache.dolphinscheduler.server.master.MasterServer
MasterServer main members: ZKMasterClient
MasterSchedulerService is the core service:
It loops checking the master node's memory and CPU; if overloaded or memory is low, it skips scheduling for 1s.
If the ZK state is healthy, scheduling proceeds via scheduleProcess:
After taking a ZK lock, it fetches a command: getOneToRun
Creates a ProcessInstance and starts a MasterExecThread to execute it (bounded by a maximum thread count)
Once the thread is created, it persists the instance via saveProcessInstance and deletes the command
MasterExecThread#executeProcess
prepareProcess
runProcess
Loop until finish/stop
Timeout detection
Iterate over running nodes via MasterTaskExecThread [submit to the DB and poll the status]; after a task finishes on the worker, handle the task state and queues
Submit standby tasks: submitStandByTask
Update the workflow state: updateProcessInstanceState
endProcess
Netty registered handler classes:
TaskResponseProcessor
TaskAckProcessor
TaskKillResponseProcessor
WorkerServer TaskExecuteProcessor
User directories on HDFS. HDFS path: /dolphinscheduler/hadoop/home
Related source function: getHdfsUserDir
Issue: this directory is only touched when creating, updating, and deleting users; it has no substantive use.
Starting a task via the API with variable parameters. The variable-substitution script is written as below; inside the single-quoted JSON, a variable is spliced in with the "'"${var}"'" quoting pattern.
date="20231008"
site="gz"
server="10.3.68.166"
tablesuffix=1
curl -i -X POST "http://10.27.48.1:12345/dolphinscheduler/projects/8653695427776/executors/start-process-instance" \
  -H "Request-Origion:SwaggerBootstrapUi" \
  -H "accept:*/*" \
  -H "token:xxx" \
  -H "Content-Type:application/x-www-form-urlencoded;charset=UTF-8" \
  -d "failureStrategy=END" \
  -d "processDefinitionCode=11174492918720" \
  -d "processInstancePriority=MEDIUM" \
  -d "scheduleTime= " \
  -d "warningGroupId=100" \
  -d "warningType=NONE" \
  -d "execType=START_PROCESS" \
  -d "runMode=RUN_MODE_SERIAL" \
  -d 'startParams={"date":"'"${date}"'","server":"'"${server}"'","site":"'"${site}"'","tablesuffix":"'"${tablesuffix}"'"}'
for i in {2..500}; do
    tablesuffix=$i
    echo $site $tablesuffix
    sleep 60
done
Find which tasks reference a given node:
SELECT p.name AS project_name, p.code,
       pro.name AS process_name, pro.code,
       t.name AS task_name, t.code
FROM (
    SELECT code, name, project_code
    FROM dolphinscheduler.t_ds_task_definition
    WHERE task_params LIKE '%10.17.41.127%'
) t
LEFT JOIN dolphinscheduler.t_ds_project p ON p.code = t.project_code
LEFT JOIN dolphinscheduler.t_ds_process_definition pro ON pro.locations LIKE concat('%', t.code, '%')
Get node status via the API:
str=$(curl "http://10.17.41.129:12345/dolphinscheduler/monitor/master/list" -H 'token:xxx')
Return value:
str='{"code":0,"msg":"success","data":[{"id":130615,"host":"10.17.41.129","port":5678,"zkDirectory":"/dolphinscheduler/nodes/master/10.17.41.129:5678","resInfo":"{\"cpuUsage\":0.53,\"loadAverage\":6.53,\"memoryUsage\":0.7}","createTime":"2021-06-21T17:38:20.000+0800","lastHeartbeatTime":"2021-07-19T10:49:51.000+0800"},{"id":10348,"host":"10.17.41.130","port":5678,"zkDirectory":"/dolphinscheduler/nodes/master/10.17.41.130:5678","resInfo":"{\"cpuUsage\":0.06,\"loadAverage\":0.76,\"memoryUsage\":0.48}","createTime":"2021-07-07T17:11:24.000+0800","lastHeartbeatTime":"2021-07-19T10:49:54.000+0800"}]}'
The worker list response has the same shape:
{"code":0,"msg":"success","data":[{"id":125264,"host":"10.17.41.129","port":1234,"zkDirectories":["/dolphinscheduler/nodes/worker/default/10.17.41.129:1234"],"resInfo":"{\"cpuUsage\":0.24,\"loadAverage\":4.08,\"memoryUsage\":0.74}","createTime":"2021-07-14 14:50:53","lastHeartbeatTime":"2021-07-19 11:22:33"},{"id":86514,"host":"10.3.4.194","port":1234,"zkDirectories":["/dolphinscheduler/nodes/worker/default/10.3.4.194:1234"],"resInfo":"{\"cpuUsage\":0.05,\"loadAverage\":1.77,\"memoryUsage\":0.55}","createTime":"2021-07-14 14:11:30","lastHeartbeatTime":"2021-07-19 11:22:31"},{"id":55682,"host":"10.17.41.130","port":1234,"zkDirectories":["/dolphinscheduler/nodes/worker/default/10.17.41.130:1234"],"resInfo":"{\"cpuUsage\":0.03,\"loadAverage\":1.01,\"memoryUsage\":0.47}","createTime":"2021-07-14 14:51:32","lastHeartbeatTime":"2021-07-19 11:22:33"},{"id":53055,"host":"10.3.4.191","port":1234,"zkDirectories":["/dolphinscheduler/nodes/worker/default/10.3.4.191:1234"],"resInfo":"{\"cpuUsage\":0.02,\"loadAverage\":0.18,\"memoryUsage\":0.42}","createTime":"2021-07-15 16:59:24","lastHeartbeatTime":"2021-07-19 11:22:35"},{"id":105568,"host":"10.17.41.132","port":1234,"zkDirectories":["/dolphinscheduler/nodes/worker/default/10.17.41.132:1234"],"resInfo":"{\"cpuUsage\":0.44,\"loadAverage\":8.29,\"memoryUsage\":0.46}","createTime":"2021-07-19 10:30:21","lastHeartbeatTime":"2021-07-19 11:22:31"},{"id":34498,"host":"10.17.41.133","port":1234,"zkDirectories":["/dolphinscheduler/nodes/worker/default/10.17.41.133:1234"],"resInfo":"{\"cpuUsage\":0.43,\"loadAverage\":8.21,\"memoryUsage\":0.63}","createTime":"2021-07-14 14:52:25","lastHeartbeatTime":"2021-07-19 11:22:36"},{"id":9901,"host":"10.17.41.131","port":1234,"zkDirectories":["/dolphinscheduler/nodes/worker/default/10.17.41.131:1234"],"resInfo":"{\"cpuUsage\":0.44,\"loadAverage\":8.3,\"memoryUsage\":0.61}","createTime":"2021-07-19 10:29:24","lastHeartbeatTime":"2021-07-19 11:22:34"}]}
if [[ $str =~ '10.17.41.129' ]]
then
    echo "contains it"
else
    echo "does not contain it"
fi
Clean up historical workflow definition versions:
import io
import subprocess
import requests
import json
import time
import datetime

base_url = 'http://xxx:12345'
token = 'xxx'
keep_version = 3
payload = {}
headers = {
    'Connection': 'keep-alive',
    'Accept': 'application/json, text/plain, */*',
    'language': 'zh_CN',
    'token': token
}

def get_project_list():
    url = "{base_url}/dolphinscheduler/projects?pageSize=100&pageNo=1&searchVal=".format(base_url=base_url)
    response = requests.request("GET", url, headers=headers, data=payload)
    response_data = json.loads(response.text)
    projects = response_data['data']['totalList']
    for project in projects:
        print(project['name'], project['code'])
    return projects

def get_definition_detail(project_code):
    all_dags = []
    pageNo = 1
    while True:
        url = "{base_url}/dolphinscheduler/projects/{project_code}/process-definition?searchVal=&pageSize=50&pageNo={pageNo}".format(project_code=project_code, pageNo=pageNo, base_url=base_url)
        response = requests.request("GET", url, headers=headers, data=payload)
        response_data = json.loads(response.text)
        page_data = response_data['data']['totalList']
        totalPage = response_data['data']['totalPage']
        if len(page_data) == 0:
            print('Definition list is empty, exiting the loop...')
            break
        all_dags.extend(page_data)
        if pageNo >= totalPage:
            break
        pageNo += 1
    return all_dags

def get_version_detail(project_code, dag_code, current_version):
    del_version = []
    pageNo = 1
    while True:
        url = "{base_url}/dolphinscheduler/projects/{project_code}/process-definition/{dag_code}/versions?searchVal=&pageSize=20&pageNo={pageNo}".format(project_code=project_code, dag_code=dag_code, pageNo=pageNo, base_url=base_url)
        response = requests.request("GET", url, headers=headers, data=payload)
        response_data = json.loads(response.text)
        page_data = response_data['data']['totalList']
        totalPage = response_data['data']['totalPage']
        if len(page_data) == 0:
            print('Version list is empty, exiting the loop...')
            break
        for page in page_data:
            version = int(page['version'])
            if version + keep_version <= current_version:
                del_version.append(version)
        if pageNo >= totalPage:
            break
        pageNo += 1
    return del_version

def delete(project_code, dag_code, version):
    print(' Now deleting: ', project_code, dag_code, version)
    url = "{base_url}/dolphinscheduler/projects/{project_code}/process-definition/{dag_code}/versions/{version}".format(project_code=project_code, dag_code=dag_code, version=version, base_url=base_url)
    response = requests.request("DELETE", url, headers=headers, data=payload)
    print(' ' + response.text)

if __name__ == '__main__':
    projects = get_project_list()
    for project in projects:
        project_code = project['code']
        print('Project:' + str(project_code) + project['name'])
        all_dags = get_definition_detail(project_code)
        for dag in all_dags:
            dag_code = dag['code']
            current_version = dag['version']
            if current_version > keep_version:
                print(str(dag_code) + ' ' + str(current_version) + ' ' + dag['name'])
                del_version = get_version_detail(project_code, dag_code, current_version)
                print(del_version)
                if len(del_version) > 0:
                    a = input()
                    for v in del_version:
                        delete(project_code, dag_code, v)
Clean up historical workflow instances: http://host:12345/dolphinscheduler/doc.html?language=zh_CN&lang=cn
import io
import subprocess
import requests
import json
import time
import datetime

base_url = 'http://xxx:12345'
token = 'xxx'
payload = {}
headers = {
    'Connection': 'keep-alive',
    'Accept': 'application/json, text/plain, */*',
    'language': 'zh_CN',
    'token': token
}

def get_project_list():
    url = "{base_url}/dolphinscheduler/projects?pageSize=100&pageNo=1&searchVal=".format(base_url=base_url)
    response = requests.request("GET", url, headers=headers, data=payload)
    response_data = json.loads(response.text)
    projects = response_data['data']['totalList']
    for project in projects:
        print(project['name'], project['code'])
    return projects

def get_process_instances(project_code, date_before):
    del_id = []
    pageNo = 1
    while True:
        url = "{base_url}/dolphinscheduler/projects/{project_code}/process-instances?pageSize=200&pageNo={pageNo}&searchVal=&endDate={date_before}%2000:00:00&startDate=2022-01-01%2000:00:00".format(
            base_url=base_url, project_code=project_code, pageNo=pageNo, date_before=date_before)
        response = requests.request("GET", url, headers=headers, data=payload)
        response_data = json.loads(response.text)
        page_data = response_data['data']['totalList']
        totalPage = response_data['data']['totalPage']
        if len(page_data) == 0:
            print('List is empty, exiting the loop...')
            break
        for item in page_data:
            del_id.append(str(item['id']))
        if pageNo >= totalPage:
            break
        pageNo += 1
    return del_id

def delete_process_instances(project_code, id):
    print(' Now deleting: ', project_code, id)
    url = "{base_url}/dolphinscheduler/projects/{project_code}/process-instances/{id}".format(project_code=project_code, id=id, base_url=base_url)
    response = requests.request("DELETE", url, headers=headers, data=payload)
    print(' ' + response.text)

def batch_delete(del_id):
    url = "{base_url}/dolphinscheduler/projects/{project_code}/process-instances/batch-delete".format(project_code=project_code, base_url=base_url)
    response = requests.request("POST", url, headers=headers, data={'processInstanceIds': ",".join(del_id)})
    print(' ' + response.text)

if __name__ == '__main__':
    projects = get_project_list()
    for project in projects:
        project_code = project['code']
        print(str(project_code) + project['name'])
        del_id = get_process_instances(project_code, '2024-01-01')
        print(len(del_id))
        if len(del_id) > 0:
            a = input()
            batch_delete(del_id)
3.2.2 deployment. Initialize the database:
cp mysql-connector-java-8.0.16.jar alert-server/libs/
cp mysql-connector-java-8.0.16.jar api-server/libs/
cp mysql-connector-java-8.0.16.jar master-server/libs/
cp mysql-connector-java-8.0.16.jar worker-server/libs/
cp mysql-connector-java-8.0.16.jar tools/libs/
mysql -uroot -p
CREATE DATABASE dolphinscheduler DEFAULT CHARACTER SET utf8 DEFAULT COLLATE utf8_general_ci;
GRANT ALL PRIVILEGES ON dolphinscheduler.* TO '{user}'@'%' IDENTIFIED BY '{password}';
GRANT ALL PRIVILEGES ON dolphinscheduler.* TO '{user}'@'localhost' IDENTIFIED BY '{password}';
flush privileges;
Configuration:
export DATABASE=${DATABASE:-mysql}
export SPRING_PROFILES_ACTIVE=${DATABASE}
export SPRING_DATASOURCE_URL="jdbc:mysql://xxx:3306,xxx:3306/ds?useUnicode=true&characterEncoding=UTF-8&useSSL=false"
export SPRING_DATASOURCE_USERNAME=xx
export SPRING_DATASOURCE_PASSWORD=xx
# zk
export REGISTRY_TYPE=${REGISTRY_TYPE:-zookeeper}
export REGISTRY_ZOOKEEPER_CONNECT_STRING="xx:2181,x:2181"
export REGISTRY_ZOOKEEPER_NAMESPACE=ds
resource.storage.type=HDFS
resource.storage.upload.base.path=/dolphinscheduler2
resource.hdfs.root.user=hadoop
resource.hdfs.fs.defaultFS=hdfs://bicluster
## copy core-site.xml and hdfs-site.xml to conf dir
resource.manager.httpaddress.port=8088
yarn.resourcemanager.ha.rm.ids=10.27.48.1,10.27.48.7
sudo.enable=false
cp common.properties alert-server/conf/common.properties
cp common.properties api-server/conf/common.properties
cp common.properties master-server/conf/common.properties
cp common.properties standalone-server/conf/common.properties
cp common.properties tools/conf/common.properties
cp common.properties worker-server/conf/common.properties
cp /data/soft/hadoop/etc/hadoop/hdfs-site.xml alert-server/conf/
cp /data/soft/hadoop/etc/hadoop/hdfs-site.xml api-server/conf/
cp /data/soft/hadoop/etc/hadoop/hdfs-site.xml master-server/conf/
cp /data/soft/hadoop/etc/hadoop/hdfs-site.xml standalone-server/conf/
cp /data/soft/hadoop/etc/hadoop/hdfs-site.xml tools/conf/
cp /data/soft/hadoop/etc/hadoop/hdfs-site.xml worker-server/conf/
cp /data/soft/hadoop/etc/hadoop/core-site.xml alert-server/conf/
cp /data/soft/hadoop/etc/hadoop/core-site.xml api-server/conf/
cp /data/soft/hadoop/etc/hadoop/core-site.xml master-server/conf/
cp /data/soft/hadoop/etc/hadoop/core-site.xml standalone-server/conf/
cp /data/soft/hadoop/etc/hadoop/core-site.xml tools/conf/
cp /data/soft/hadoop/etc/hadoop/core-site.xml worker-server/conf/
worker:
worker.tenant-config.auto-create-tenant-enabled=false
worker.tenant-config.default-tenant-enabled=true
TRACE-method vulnerability on port 1235: modify the worker's application.yaml
Historical data table sizes:
use dolphinscheduler;
SELECT
    TABLE_NAME AS 'table name',
    TABLE_ROWS AS 'row count'
FROM information_schema.TABLES
WHERE TABLE_SCHEMA = DATABASE();
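The same report straight from the shell, sorted by size (credentials are placeholders; note TABLE_ROWS is only an estimate for InnoDB tables):
mysql -udolphinscheduler -p -e "SELECT TABLE_NAME, TABLE_ROWS FROM information_schema.TABLES WHERE TABLE_SCHEMA='dolphinscheduler' ORDER BY TABLE_ROWS DESC;"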