install
| 12
 3
 
 | export version="2.3.3"
 wget "https://archive.apache.org/dist/incubator/seatunnel/${version}/apache-seatunnel-incubating-${version}-bin.tar.gz"
 tar -xzvf "apache-seatunnel-incubating-${version}-bin.tar.gz"
 
 | 
Install connectors plugin
脚本安装
配置plugin_config指定所需插件
插件列表见${SEATUNNEL_HOME}/connectors/plugin-mapping.properties
如有不需要的插件,可到config/plugin_config修改自动安装的插件
sh bin/install_plugin.sh
| 12
 3
 4
 5
 6
 7
 8
 
 | |
 ├─lib
 │  └─seatunnel-hadoop3-3.1.4-uber.jar
 ├─connectors
 │  ├─都放这里
 └─plugins
    └─jdbc(connector name)
       └─lib √  mysql-jdbc
 
 | 
手动安装
- 到 https://developer.aliyun.com/mvn/search 搜索下载jar包放入对应目录
- 准确的artifact名称见connectors/plugin-mapping.properties,对应目录见开头
 
- hadoop基础包放到lib目录: seatunnel-hadoop3-3.1.4-uber-2.3.3-optional.jar
- mvn dependency:get  -Dtransitive=false -DgroupId=org.apache.seatunnel -Dclassifier=optional -DartifactId=seatunnel-hadoop3-3.1.4-uber -Dversion=2.3.3 -Ddest=./
 
- 常用的connectors-v2 放到 connectors/seatunnel 目录:
- connector-clickhouse-2.3.0.jar
- connector-file-hadoop-2.3.0.jar
- connector-file-sftp-2.3.0.jar  
- connector-file-ftp-2.3.0.jar
- connector-file-local-2.3.0.jar
- connector-jdbc-2.3.0.jar
- mvn dependency:get  -Dtransitive=false -DgroupId=org.apache.seatunnel -DartifactId=connector-jdbc -Dversion=2.3.3 -Ddest=./
 
- 依赖jar,如mysql/oracle的jar包(mysql-connector-java-xxx.jar、ojdbc6-xxx.jar)放到 plugins/jdbc/lib/
 
| 12
 3
 
 | group = org.apache.seatunnel
 artifact = seatunnel-connector-*
 version = 2.3.3*
 
 | 
配置运行
配置文件
Hocon格式
| 12
 3
 4
 5
 6
 7
 8
 9
 10
 11
 12
 13
 14
 15
 
 | env {}
 
 source {
 // 支持多个源
 FakeSource {
 }
 }
 
 transform {
 }
 
 sink {
 Console {}
 }
 
 | 
When there are multiple sources and multiple sinks, use the result_table_name and source_table_name options in each step to indicate which data is its input and output.
SQL-config
| 12
 3
 4
 5
 6
 7
 8
 9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 
 |   CREATE TABLE test1 WITH (
 'connector'='FakeSource',
 'schema' = '{
 fields {
 id = "int",
 name = "string",
 c_time = "timestamp"
 }
 }',
 'rows' = '[
 { fields = [21, "Eric", null], kind = INSERT },
 { fields = [22, "Andy", null], kind = INSERT }
 ]',
 'type'='source'
 );
 
 CREATE TABLE test09 AS SELECT id,name, CAST(null AS TIMESTAMP) AS c_time FROM test1;
 
 INSERT INTO test11 SELECT * FROM test09;
 
 CREATE TABLE test11
 WITH (
 'connector'='jdbc',
 'url' = 'jdbc:mysql://mysql-e2e:3306/seatunnel',
 'driver' = 'com.mysql.cj.jdbc.Driver',
 'user' = 'root',
 'password' = 'Abc!@#135_seatunnel',
 'generate_sink_sql' = 'true',
 'database' = 'seatunnel',
 'table' = 't_user',
 'type'='sink'
 );
 
 | 
配置spark环境变量
config/seatunnel-env.sh
SPARK_HOME=/data/soft/seatunnel/spark
创建任务文件
https://spark.apache.org/docs/latest/configuration.html#available-properties
| 12
 3
 4
 5
 6
 7
 8
 9
 10
 11
 12
 13
 14
 15
 
 | env {
 job.mode = "BATCH"
 job.name = ${jobName}
 parallelism = 2
 }
 
 source {
 }
 
 transform {
 }
 
 sink {
 }
 
 
 | 
./bin/seatunnel.sh -c <this_config_file>  -i jobName='startjob'
执行测试任务
SeaTunnel Engine单机
org.apache.seatunnel.core.starter.seatunnel.SeaTunnelClient
./bin/seatunnel.sh  -m local --config  config/v2.batch.config.template
SeaTunnel Cluster daemon启动
org.apache.seatunnel.core.starter.seatunnel.SeaTunnelServer
SeaTunnel Engine分布式
./bin/seatunnel-cluster.sh
./bin/seatunnel-cluster.sh -d
./bin/stop-seatunnel-cluster.sh
./bin/seatunnel.sh --config  管理命令
spark引擎
启动类 org.apache.seatunnel.core.starter.spark.SparkStarter
| 12
 3
 4
 5
 6
 7
 
 | # 此类最后转化为spark-submit命令  主类SeatunnelSpark.class 执行类SparkTaskExecuteCommand,每个环节分别交由[Source/Sink/Transform]ExecuteProcessor完成
 ./bin/start-seatunnel-spark-3-connector-v2.sh \
 --master yarn \
 --deploy-mode client \
 --config  config/v2.batch.config.template \
 -i date=20230307
 
 | 
配置中待替换的参数还必须要有双引号,如 "${date}"
常用任务模板
hdfs到本地
| 12
 3
 4
 5
 6
 7
 8
 9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 
 | env {
 job.name = "hdfs2local"
 job.mode = "BATCH"
 parallelism = 2
 }
 
 source {
 
 HdfsFile {
 fs.defaultFS = "hdfs://bicluster"
 path = "/hive/warehouse/ods.db/bmp_t_bms_service_prov/year=2023/month=04/day=01"
 file_format_type = "orc"
 read_columns = ["serviceid","provcode"]
 hdfs_site_path = "/data/soft/hadoop/etc/hadoop/hdfs-site.xml"
 parse_partition_from_path = yes
 result_table_name = "input_t"
 }
 }
 
 transform {
 Sql {
 source_table_name = "input_t"
 result_table_name = "input_t2"
 query = "select serviceid, concat(provcode, '_') as provcode, day from input_t"
 }
 }
 
 sink {
 LocalFile {
 source_table_name = "input_t2"
 path = "/data/tmp/test2"
 file_format  = "csv"
 sink_columns = ["serviceid","provcode","day"]
 field_delimiter = ","
 }
 }
 
 | 
要点:
- parse_partition_from_path 将分区字段添加到数据
- transform加工数据
jdbc到hdfs
| 12
 3
 4
 5
 6
 7
 8
 9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 
 | env {
 job.mode = "BATCH"
 spark.yarn.queue = spark
 spark.app.name = "SeaTunnel"
 spark.driver.cores = 1
 spark.driver.memory = 1g
 spark.executor.instances = 2
 spark.executor.cores = 5
 spark.executor.memory = 5g
 }
 
 source {
 JDBC {
 url = "jdbc:mysql://ip:3306/dolphinscheduler?useUnicode=true&characterEncoding=UTF-8"
 driver = "com.mysql.jdbc.Driver"
 user = "xxx"
 password = "xxx"
 query = "select * from t_ds_user"
 }
 }
 
 transform {
 
 }
 
 sink {
 Console {}
 
 HdfsFile {
 fs.defaultFS = "hdfs://cluster"
 hdfs_site_path = "/data/soft/hadoop/etc/hadoop/hdfs-site.xml"
 path = "/tmp/test2"
 file_format = "orc"
 file_name_expression = "${transactionId}_${now}"
 filename_time_format = "yyyyMMdd"
 is_enable_transaction = true
 sink_columns = ["xx","xxx"]
 partition_by = ["xx"]
 partition_dir_expression = "${k0}=${v0}"
 is_partition_field_write_in_file = false
 }
 }
 
 
 | 
jdbc到hdfs 多源
注意 目前仅spark&flink支持参数替换,seatunnel引擎不支持
只能串行执行,会启动parallelism个task,但实际工作是1个task
因此 多源数据合并需要使用transform模块,将不同源进行union
| 12
 3
 4
 5
 6
 7
 8
 9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 
 | basic_sql = "select userid,TO_TIMESTAMP(TO_CHAR(EFECTIVETIME,'yyyy-MM-dd HH24:mi:ss')...from ndmc.t_ai_user "
 
 env {
 job.mode = "BATCH"
 parallelism = 4
 spark.app.name = "user"
 spark.driver.cores = 1
 spark.driver.memory = 1g
 spark.executor.instances = 2
 spark.executor.cores = 5
 spark.executor.memory = 5g
 }
 
 source {
 JDBC {
 result_table_name = "t1"
 url = "jdbc:oracle:thin:@"
 driver = "oracle.jdbc.OracleDriver"
 user = ""
 password = ""
 query = ${basic_sql}
 }
 JDBC {
 result_table_name = "t16"
 url = "jdbc:oracle:thin:@"
 driver = "oracle.jdbc.OracleDriver"
 user = ""
 password = ""
 query = ${basic_sql}" where regexp_like(USERID,'^1[B-C]')"
 }
 }
 
 select_columns = " select userid,... from "
 
 transform {
 sql {
 result_table_name = "merged"
 sql = ${select_columns}"t1 union all"${select_columns}"t16 "
 }
 }
 
 
 sink {
 HdfsFile {
 source_table_name = "merged"
 file_name_expression = "${transactionId}_${now}"
 fs.defaultFS = "hdfs://bicluster"
 hdfs_site_path = "/data/soft/hadoop/etc/hadoop/hdfs-site.xml"
 path = ${out_parent_dir}
 file_format = "orc"
 filename_time_format = "yyyyMMdd"
 is_enable_transaction = true
 batch_size = 5000000
 }
 HdfsFile {
 source_table_name = "t1"
 file_name_expression = "01_${transactionId}_${now}"
 path = "/tmp/ndmc_t_ai_user/tp="${date}""
 fs.defaultFS = "hdfs://bicluster"
 hdfs_site_path = "/data/soft/hadoop/etc/hadoop/hdfs-site.xml"
 file_format = "orc"
 filename_time_format = "yyyyMMdd"
 is_enable_transaction = true
 }
 HdfsFile {
 source_table_name = "t16"
 file_name_expression = "16_${transactionId}_${now}"
 path = "/tmp/ndmc_t_ai_user/tp="${date}""
 fs.defaultFS = "hdfs://bicluster"
 hdfs_site_path = "/data/soft/hadoop/etc/hadoop/hdfs-site.xml"
 file_format = "orc"
 filename_time_format = "yyyyMMdd"
 is_enable_transaction = true
 }
 }
 
 | 
ftp到console和hdfs
| 12
 3
 4
 5
 6
 7
 8
 9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 
 | env {
 execution.parallelism = 2
 job.mode = "BATCH"
 checkpoint.interval = 10000
 }
 
 source {
 SftpFile {
 host = 10.27.48.17
 port = 22
 user = hcycp
 password = "xxx"
 path = "caiyunbi/test"
 file_format_type = csv
 schema = {
 fields {
 phone = "string"
 }
 }
 result_table_name = "test"
 }
 
 }
 
 sink {
 HDFSFile {
 source_table_name = "test"
 fs.defaultFS = "hdfs://bicluster"
 hdfs_site_path = /data/soft/hadoop/etc/hadoop/hdfs-site.xml
 path = /tmp/SeaTunnelTest
 file_format_type = orc
 compress_codec = snappy
 }
 Console {
 source_table_name = "test"
 }
 }
 
 
 | 
SeaTunnel Engine 常驻server服务
修改bin/seatunnel-cluster.sh 增加一行 JAVA_OPTS="-Xms2G -Xmx2G"
./bin/seatunnel-cluster.sh -d
./bin/stop-seatunnel-cluster.sh 
Web
需要先启动 SeaTunnel Engine/Zeta Server
source
- org.apache.seatunnel.api.source.SeaTunnelSource 
- org.apache.seatunnel.api.sink.SeaTunnelSink
- org.apache.seatunnel.api.table.type.SeaTunnelRow
- SparkTaskExecuteCommand
- org.apache.seatunnel.engine.server.rest.RestJobExecutionEnvironment#getLogicalDag