install
1 2 3
|
export version="2.3.3"
wget "https://archive.apache.org/dist/incubator/seatunnel/${version}/apache-seatunnel-incubating-${version}-bin.tar.gz"
tar -xzvf "apache-seatunnel-incubating-${version}-bin.tar.gz"
|
Install connectors plugin
脚本安装
配置plugin_config指定所需插件
插件列表见${SEATUNNEL_HOME}/connectors/plugin-mapping.properties
如有不需要的插件,可到config/plugin_config
修改自动安装的插件
sh bin/install_plugin.sh
1 2 3 4 5 6 7 8
|
├─lib
│   └─seatunnel-hadoop3-3.1.4-uber-2.3.3-optional.jar
├─connectors
│   └─seatunnel        √ v2都放这里
└─plugins
    └─jdbc
        └─lib          √ mysql-jdbc
|
手动安装
到 https://developer.aliyun.com/mvn/search 搜索下载jar包放入对应目录
- 准确的artifact名称见
connectors/plugin-mapping.properties
,对应目录见开头
hadoop基础包放到lib目录: seatunnel-hadoop3-3.1.4-uber-2.3.3-optional.jar
mvn dependency:get -DgroupId=org.apache.seatunnel -Dclassifier=optional -DartifactId=seatunnel-hadoop3-3.1.4-uber -Dversion=2.3.3 -Ddest=./
常用的connectors-v2 放到connectors/seatunnel
目录
- connector-clickhouse-2.3.3.jar
- connector-file-hadoop-2.3.3.jar
- connector-file-sftp-2.3.3.jar
- connector-file-ftp-2.3.3.jar
- connector-file-local-2.3.3.jar
- connector-jdbc-2.3.3.jar
mvn dependency:get -DgroupId=org.apache.seatunnel -DartifactId=connector-jdbc -Dversion=2.3.3 -Ddest=./
依赖jar,如mysql/oracle的jar包 `mysql-connector-java-xxx.jar`、`ojdbc6-xxx.jar`
放到plugins/jdbc/lib/
1 2 3
| group = org.apache.seatunnel artifact = seatunnel-connector-* version = 2.3.3
|
配置运行
配置文件结构
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| env { }
source { // 支持多个源 FakeSource { } }
transform { }
sink { Console {} }
|
When there are multiple sources and multiple sinks, use the result_table_name
and source_table_name
configurations
in each step to indicate the flow of input/output data between steps.
配置spark环境变量
config/seatunnel-env.sh
SPARK_HOME=/data/soft/seatunnel/spark
创建任务文件
https://spark.apache.org/docs/latest/configuration.html#available-properties
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43
| env { job.mode = "BATCH" spark.yarn.queue = spark spark.app.name = "SeaTunnel" spark.driver.cores = 1 spark.driver.memory = 1g spark.executor.instances = 2 spark.executor.cores = 5 spark.executor.memory = 5g }
source { JDBC { url = "jdbc:mysql://ip:3306/dolphinscheduler?useUnicode=true&characterEncoding=UTF-8" driver = "com.mysql.jdbc.Driver" user = "xxx" password = "xxx" query = "select * from t_ds_user" } }
transform {
}
sink { Console {} }
HdfsFile { fs.defaultFS = "hdfs://cluster" hdfs_site_path = "/data/soft/hadoop/etc/hadoop/hdfs-site.xml" path = "/tmp/test2" file_format = "orc" file_name_expression = "${transactionId}_${now}" filename_time_format = "yyyyMMdd" is_enable_transaction = true sink_columns = ["xx","xxx"] partition_by = ["xx"] partition_dir_expression = "${k0}=${v0}" is_partition_field_write_in_file = false }
|
执行测试任务
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
|
# SeaTunnel Engine单机 org.apache.seatunnel.core.starter.seatunnel.SeaTunnelClient
./bin/seatunnel.sh -e local --config config/v2.batch.config.template

# SeaTunnel Engine分布式 org.apache.seatunnel.core.starter.seatunnel.SeaTunnelServer
./bin/seatunnel-cluster.sh
# daemon启动
./bin/seatunnel-cluster.sh -d
$SEATUNNEL_HOME/bin/seatunnel.sh --config $SEATUNNEL_HOME/config/v2.batch.config.template

# spark引擎启动类 org.apache.seatunnel.core.starter.spark.SparkStarter
# 此类最后转化为spark-submit命令 主类SeatunnelSpark.class 执行类SparkApiTaskExecuteCommand
./bin/start-seatunnel-spark-3-connector-v2.sh \
  --master yarn \
  --deploy-mode client \
  --config config/v2.batch.config.template \
  -i date=20230307
|
配置中待替换的参数还必须要有英文双引号,如 "${date}"
常用任务模板
hdfs到本地
./bin/seatunnel.sh --config config/test.conf -e local
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25
| env { job.name = "hdfs2local" job.mode = "BATCH" parallelism = 5 }
source {
HdfsFile { fs.defaultFS = "hdfs://10.3.4.191:9000" path = "/hive/warehouse/dw.db/burpoint_content_info/year=2023/month=02/day=01" type = "orc" } }
transform { }
sink { LocalFile { path = "/data/tmp/test2" file_format = "orc" sink_columns = ["page_id","content_id","phone","tack_time"] } }
|
jdbc到hdfs 多源
注意 目前仅spark&flink支持参数替换,seatunnel引擎不支持
只能串行执行,会启动parallelism个task,但实际工作是1个task
因此 多源数据合并需要使用transform模块,将不同源进行union
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76
| basic_sql = "select userid,TO_TIMESTAMP(TO_CHAR(EFECTIVETIME,'yyyy-MM-dd HH24:mi:ss')...from ndmc.t_ai_user "
env { job.mode = "BATCH" parallelism = 4 spark.app.name = "user" spark.driver.cores = 1 spark.driver.memory = 1g spark.executor.instances = 2 spark.executor.cores = 5 spark.executor.memory = 5g }
source { JDBC { result_table_name = "t1" url = "jdbc:oracle:thin:@" driver = "oracle.jdbc.OracleDriver" user = "" password = "" query = ${basic_sql} } JDBC { result_table_name = "t16" url = "jdbc:oracle:thin:@" driver = "oracle.jdbc.OracleDriver" user = "" password = "" query = ${basic_sql}" where regexp_like(USERID,'^1[B-C]')" } }
select_columns = " select userid,... from "
transform { sql { result_table_name = "merged" sql = ${select_columns}"t1 union all"${select_columns}"t16 " } }
sink { HdfsFile { source_table_name = "merged" file_name_expression = "${transactionId}_${now}" fs.defaultFS = "hdfs://bicluster" hdfs_site_path = "/data/soft/hadoop/etc/hadoop/hdfs-site.xml" path = ${out_parent_dir} file_format = "orc" filename_time_format = "yyyyMMdd" is_enable_transaction = true batch_size = 5000000 } HdfsFile { source_table_name = "t1" file_name_expression = "01_${transactionId}_${now}" path = "/tmp/ndmc_t_ai_user/tp="${date}"" fs.defaultFS = "hdfs://bicluster" hdfs_site_path = "/data/soft/hadoop/etc/hadoop/hdfs-site.xml" file_format = "orc" filename_time_format = "yyyyMMdd" is_enable_transaction = true } HdfsFile { source_table_name = "t16" file_name_expression = "16_${transactionId}_${now}" path = "/tmp/ndmc_t_ai_user/tp="${date}"" fs.defaultFS = "hdfs://bicluster" hdfs_site_path = "/data/soft/hadoop/etc/hadoop/hdfs-site.xml" file_format = "orc" filename_time_format = "yyyyMMdd" is_enable_transaction = true } }
|
ftp到console和hdfs
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38
| env { execution.parallelism = 2 job.mode = "BATCH" checkpoint.interval = 10000 }
source { SftpFile { host = 10.27.48.17 port = 22 user = hcycp password = "xxx" path = "caiyunbi/test" file_format_type = csv schema = { fields { phone = "string" } } result_table_name = "test" } }
sink { HdfsFile { source_table_name = "test" fs.defaultFS = "hdfs://bicluster" hdfs_site_path = /data/soft/hadoop/etc/hadoop/hdfs-site.xml path = /tmp/SeaTunnelTest file_format_type = orc compress_codec = snappy } Console { source_table_name = "test" } }
|
SeaTunnel Engine 常驻server服务
修改bin/seatunnel-cluster.sh
增加一行 JAVA_OPTS="-Xms2G -Xmx2G"
./bin/seatunnel-cluster.sh -d
./bin/stop-seatunnel-cluster.sh
Web
需要先启动 SeaTunnel Engine/Zeta Server