Install
export version="2.3.3"
wget "https://archive.apache.org/dist/incubator/seatunnel/${version}/apache-seatunnel-incubating-${version}-bin.tar.gz"
tar -xzvf "apache-seatunnel-incubating-${version}-bin.tar.gz"
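The steps below refer to the unpacked directory as ${SEATUNNEL_HOME}; a minimal sketch of setting it (the path is illustrative, assuming the tarball unpacks to a directory named after the release):

# assumption: the archive extracts to apache-seatunnel-incubating-2.3.3
export SEATUNNEL_HOME="$PWD/apache-seatunnel-incubating-2.3.3"
cd "$SEATUNNEL_HOME"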
Install connectors plugin
Script install
Edit config/plugin_config to specify the plugins you need; the full plugin list is in ${SEATUNNEL_HOME}/connectors/plugin-mapping.properties. Remove any plugins you don't need from config/plugin_config to control which plugins are installed automatically.
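For example, a trimmed config/plugin_config might keep only a few connectors (names come from plugin-mapping.properties; leave any header/delimiter lines that ship in the file in place):

connector-jdbc
connector-file-hadoop
connector-file-sftp
connector-clickhouse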
sh bin/install-plugin.sh 2.3.3
├─lib
│  └─seatunnel-hadoop3-3.1.4-uber.jar
├─connectors
│  └─(all connector jars go here)
└─plugins
   └─jdbc          (connector name)
      └─lib
         └─mysql-jdbc driver jar
Manual install
Search for and download the jars at https://developer.aliyun.com/mvn/search, then place them in the matching directories.
- Exact artifact names are listed in connectors/plugin-mapping.properties; the target directories are shown in the tree above.
Put the base Hadoop jar into the lib directory: seatunnel-hadoop3-3.1.4-uber-2.3.3-optional.jar
mvn dependency:get -Dtransitive=false -DgroupId=org.apache.seatunnel -Dclassifier=optional -DartifactId=seatunnel-hadoop3-3.1.4-uber -Dversion=2.3.3 -Ddest=./
Put the commonly used connectors-v2 jars into the connectors/seatunnel directory:
- connector-clickhouse-2.3.0.jar
- connector-file-hadoop-2.3.0.jar
- connector-file-sftp-2.3.0.jar
- connector-file-ftp-2.3.0.jar
- connector-file-local-2.3.0.jar
- connector-jdbc-2.3.0.jar
mvn dependency:get -Dtransitive=false -DgroupId=org.apache.seatunnel -DartifactId=connector-jdbc -Dversion=2.3.3 -Ddest=./
Driver dependency jars, e.g. mysql-connector-java-xxx.jar (MySQL) or ojdbc6-xxx.jar (Oracle), go into plugins/jdbc/lib/.
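A sketch of fetching a driver the same way as the commands above (the group/artifact are the driver's real Maven coordinates; the version is just an example):

# fetch the MySQL JDBC driver, then drop it into plugins/jdbc/lib/
mvn dependency:get -Dtransitive=false -DgroupId=mysql -DartifactId=mysql-connector-java -Dversion=8.0.28 -Ddest=./
mkdir -p plugins/jdbc/lib
cp mysql-connector-java-8.0.28.jar plugins/jdbc/lib/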
group = org.apache.seatunnel
artifact = seatunnel-connector-*
version = 2.3.3*
Configure and run
Config file
HOCON format
env {
}
source {
  // multiple sources are supported
  FakeSource {
  }
}
transform {
}
sink {
  Console {}
}
With multiple sources and multiple sinks, use the result_table_name and source_table_name options in each step to wire inputs to outputs, as in the sketch below.
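A minimal sketch of that wiring (table names are illustrative):

source {
  FakeSource { result_table_name = "t1" }
  FakeSource { result_table_name = "t2" }
}
transform {
  Sql {
    source_table_name = "t1"
    result_table_name = "t1_out"
    query = "select * from t1"
  }
}
sink {
  Console { source_table_name = "t1_out" }   // consumes the transformed t1
  Console { source_table_name = "t2" }       // consumes t2 directly
}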
SQL-config
CREATE TABLE test1 WITH (
  'connector'='FakeSource',
  'schema' = '{
    fields {
      id = "int",
      name = "string",
      c_time = "timestamp"
    }
  }',
  'rows' = '[
    { fields = [21, "Eric", null], kind = INSERT },
    { fields = [22, "Andy", null], kind = INSERT }
  ]',
  'type'='source'
);

CREATE TABLE test09 AS SELECT id, name, CAST(null AS TIMESTAMP) AS c_time FROM test1;

CREATE TABLE test11 WITH (
  'connector'='jdbc',
  'url' = 'jdbc:mysql://mysql-e2e:3306/seatunnel',
  'driver' = 'com.mysql.cj.jdbc.Driver',
  'user' = 'root',
  'password' = 'Abc!@#135_seatunnel',
  'generate_sink_sql' = 'true',
  'database' = 'seatunnel',
  'table' = 't_user',
  'type'='sink'
);

INSERT INTO test11 SELECT * FROM test09;
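Presumably a SQL config runs through the same launcher as a HOCON config; the file should end with .sql (the file name here is illustrative):

./bin/seatunnel.sh -m local --config config/sample.sql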
Configure Spark environment variables
config/seatunnel-env.sh
SPARK_HOME=/data/soft/seatunnel/spark
Create a job file
Spark properties that can be set in the env block: https://spark.apache.org/docs/latest/configuration.html#available-properties
env {
  job.mode = "BATCH"
  job.name = ${jobName}
  parallelism = 2
}
source {
}
transform {
}
sink {
}
./bin/seatunnel.sh -c <this_config_file> -i jobName='startjob'
Run a test job
SeaTunnel Engine, standalone
org.apache.seatunnel.core.starter.seatunnel.SeaTunnelClient
./bin/seatunnel.sh -m local --config config/v2.batch.config.template
SeaTunnel Cluster daemon startup
org.apache.seatunnel.core.starter.seatunnel.SeaTunnelServer
SeaTunnel Engine, distributed
./bin/seatunnel-cluster.sh        # start in the foreground
./bin/seatunnel-cluster.sh -d     # start as a daemon
./bin/stop-seatunnel-cluster.sh   # stop the cluster
./bin/seatunnel.sh --config <config_file>   # submit/manage jobs against the cluster
Spark engine
Entry class: org.apache.seatunnel.core.starter.spark.SparkStarter
# This class is ultimately turned into a spark-submit command.
# Main class: SeatunnelSpark.class; execution runs through SparkTaskExecuteCommand,
# with each stage handled by a [Source/Sink/Transform]ExecuteProcessor.
./bin/start-seatunnel-spark-3-connector-v2.sh \
  --master yarn \
  --deploy-mode client \
  --config config/v2.batch.config.template \
  -i date=20230307
Parameters to be substituted in the config must also be wrapped in double quotes, e.g. "${date}".
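For instance (a sketch; the path is illustrative), with -i date=20230307 passed at launch:

sink {
  HdfsFile {
    # HOCON string concatenation: the literal parts keep their quotes,
    # and "${date}" is substituted at submit time
    path = "/tmp/out/tp="${date}""
  }
}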
Common job templates
HDFS to local
env {
  job.name = "hdfs2local"
  job.mode = "BATCH"
  parallelism = 2
}
source {
  HdfsFile {
    fs.defaultFS = "hdfs://bicluster"
    path = "/hive/warehouse/ods.db/bmp_t_bms_service_prov/year=2023/month=04/day=01"
    file_format_type = "orc"
    read_columns = ["serviceid", "provcode"]
    hdfs_site_path = "/data/soft/hadoop/etc/hadoop/hdfs-site.xml"
    parse_partition_from_path = true
    result_table_name = "input_t"
  }
}
transform {
  Sql {
    source_table_name = "input_t"
    result_table_name = "input_t2"
    query = "select serviceid, concat(provcode, '_') as provcode, day from input_t"
  }
}
sink {
  LocalFile {
    source_table_name = "input_t2"
    path = "/data/tmp/test2"
    file_format = "csv"
    sink_columns = ["serviceid", "provcode", "day"]
    field_delimiter = ","
  }
}
Key points:
- parse_partition_from_path adds the partition fields to the rows
- the transform step reshapes the data
JDBC to HDFS
env {
  job.mode = "BATCH"
  spark.yarn.queue = spark
  spark.app.name = "SeaTunnel"
  spark.driver.cores = 1
  spark.driver.memory = 1g
  spark.executor.instances = 2
  spark.executor.cores = 5
  spark.executor.memory = 5g
}
source {
  JDBC {
    url = "jdbc:mysql://ip:3306/dolphinscheduler?useUnicode=true&characterEncoding=UTF-8"
    driver = "com.mysql.jdbc.Driver"
    user = "xxx"
    password = "xxx"
    query = "select * from t_ds_user"
  }
}
transform {
}
sink {
  Console {}
  HdfsFile {
    fs.defaultFS = "hdfs://cluster"
    hdfs_site_path = "/data/soft/hadoop/etc/hadoop/hdfs-site.xml"
    path = "/tmp/test2"
    file_format = "orc"
    file_name_expression = "${transactionId}_${now}"
    filename_time_format = "yyyyMMdd"
    is_enable_transaction = true
    sink_columns = ["xx", "xxx"]
    partition_by = ["xx"]
    partition_dir_expression = "${k0}=${v0}"
    is_partition_field_write_in_file = false
  }
}
JDBC to HDFS, multiple sources
Note: currently only the Spark and Flink engines support parameter substitution; the SeaTunnel (Zeta) engine does not.
Multiple sources can only execute serially: parallelism tasks are launched, but in practice only one task does the work.
Merging multi-source data therefore needs a transform step that unions the individual sources.
basic_sql = "select userid,TO_TIMESTAMP(TO_CHAR(EFECTIVETIME,'yyyy-MM-dd HH24:mi:ss')...from ndmc.t_ai_user "
env {
  job.mode = "BATCH"
  parallelism = 4
  spark.app.name = "user"
  spark.driver.cores = 1
  spark.driver.memory = 1g
  spark.executor.instances = 2
  spark.executor.cores = 5
  spark.executor.memory = 5g
}
source {
  JDBC {
    result_table_name = "t1"
    url = "jdbc:oracle:thin:@"
    driver = "oracle.jdbc.OracleDriver"
    user = ""
    password = ""
    query = ${basic_sql}
  }
  JDBC {
    result_table_name = "t16"
    url = "jdbc:oracle:thin:@"
    driver = "oracle.jdbc.OracleDriver"
    user = ""
    password = ""
    query = ${basic_sql}" where regexp_like(USERID,'^1[B-C]')"
  }
}
select_columns = " select userid,... from "
transform {
  sql {
    result_table_name = "merged"
    sql = ${select_columns}"t1 union all"${select_columns}"t16 "
  }
}
sink {
  HdfsFile {
    source_table_name = "merged"
    file_name_expression = "${transactionId}_${now}"
    fs.defaultFS = "hdfs://bicluster"
    hdfs_site_path = "/data/soft/hadoop/etc/hadoop/hdfs-site.xml"
    path = ${out_parent_dir}
    file_format = "orc"
    filename_time_format = "yyyyMMdd"
    is_enable_transaction = true
    batch_size = 5000000
  }
  HdfsFile {
    source_table_name = "t1"
    file_name_expression = "01_${transactionId}_${now}"
    path = "/tmp/ndmc_t_ai_user/tp="${date}""
    fs.defaultFS = "hdfs://bicluster"
    hdfs_site_path = "/data/soft/hadoop/etc/hadoop/hdfs-site.xml"
    file_format = "orc"
    filename_time_format = "yyyyMMdd"
    is_enable_transaction = true
  }
  HdfsFile {
    source_table_name = "t16"
    file_name_expression = "16_${transactionId}_${now}"
    path = "/tmp/ndmc_t_ai_user/tp="${date}""
    fs.defaultFS = "hdfs://bicluster"
    hdfs_site_path = "/data/soft/hadoop/etc/hadoop/hdfs-site.xml"
    file_format = "orc"
    filename_time_format = "yyyyMMdd"
    is_enable_transaction = true
  }
}
SFTP to Console and HDFS
env {
  execution.parallelism = 2
  job.mode = "BATCH"
  checkpoint.interval = 10000
}
source {
  SftpFile {
    host = "10.27.48.17"
    port = 22
    user = hcycp
    password = "xxx"
    path = "caiyunbi/test"
    file_format_type = csv
    schema = {
      fields {
        phone = "string"
      }
    }
    result_table_name = "test"
  }
}
sink {
  HdfsFile {
    source_table_name = "test"
    fs.defaultFS = "hdfs://bicluster"
    hdfs_site_path = "/data/soft/hadoop/etc/hadoop/hdfs-site.xml"
    path = "/tmp/SeaTunnelTest"
    file_format_type = orc
    compress_codec = snappy
  }
  Console {
    source_table_name = "test"
  }
}
SeaTunnel Engine as a long-running server
Edit bin/seatunnel-cluster.sh
Add a line: JAVA_OPTS="-Xms2G -Xmx2G"
./bin/seatunnel-cluster.sh -d
./bin/stop-seatunnel-cluster.sh
Web
Requires the SeaTunnel Engine (Zeta) server to be running first.
Source code
- org.apache.seatunnel.api.source.SeaTunnelSource
- org.apache.seatunnel.api.sink.SeaTunnelSink
- org.apache.seatunnel.api.table.type.SeaTunnelRow
- SparkTaskExecuteCommand
- org.apache.seatunnel.engine.server.rest.RestJobExecutionEnvironment#getLogicalDag