Contents
  1. Install
  2. Install connector plugins
    2.1. Script install
    2.2. Manual install
  3. Configure and run
    3.1. Configure Spark environment variables
    3.2. Create a job file
    3.3. Run a test job
  4. Common job templates
    4.1. HDFS to local
    4.2. JDBC to HDFS (multiple sources)
    4.3. FTP to Console and HDFS
  5. SeaTunnel Engine long-running server
  6. Web

Install

export version="2.3.3"
wget "https://archive.apache.org/dist/incubator/seatunnel/${version}/apache-seatunnel-incubating-${version}-bin.tar.gz"
tar -xzvf "apache-seatunnel-incubating-${version}-bin.tar.gz"

Install connector plugins

Script install

Configure plugin_config to specify the plugins you need (a sample is sketched after the install command below).
The full plugin list is in ${SEATUNNEL_HOME}/connectors/plugin-mapping.properties.
If some plugins are not needed, edit config/plugin_config to change which plugins get installed automatically.
sh bin/install_plugin.sh
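
A sketch of config/plugin_config trimmed down to the connectors used later in this post, one connector name per line (keep any delimiter lines that the shipped file already contains; the exact format is defined by the file in your distribution):

connector-jdbc
connector-clickhouse
connector-file-hadoop
connector-file-sftp
connector-file-ftp
connector-file-local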

|
├─lib
│  └─seatunnel-hadoop3-3.1.4-uber-2.3.3-optional.jar
├─connectors
│  ├─seatunnel          √ all connectors-v2 jars go here
└─plugins
   └─jdbc
      └─lib             √ JDBC drivers, e.g. mysql-jdbc

Manual install

  • Search for and download the jar packages at https://developer.aliyun.com/mvn/search, then place them into the corresponding directories

    • The exact artifact names are listed in connectors/plugin-mapping.properties; the target directories are shown in the layout above
  • Put the Hadoop base jar into the lib directory: seatunnel-hadoop3-3.1.4-uber-2.3.3-optional.jar

    • mvn dependency:get -DgroupId=org.apache.seatunnel -Dclassifier=optional -DartifactId=seatunnel-hadoop3-3.1.4-uber -Dversion=2.3.3 -Ddest=./
  • Put the commonly used connectors-v2 jars into the connectors/seatunnel directory

    • connector-clickhouse-2.3.0.jar
    • connector-file-hadoop-2.3.0.jar
    • connector-file-sftp-2.3.0.jar
    • connector-file-ftp-2.3.0.jar
    • connector-file-local-2.3.0.jar
    • connector-jdbc-2.3.0.jar
    • mvn dependency:get -DgroupId=org.apache.seatunnel -DartifactId=connector-jdbc -Dversion=2.3.3 -Ddest=./
  • Dependency jars, e.g. the MySQL/Oracle drivers mysql-connector-java-xxx.jar and ojdbc6-xxx.jar, go into plugins/jdbc/lib/ (a consolidated download sketch follows below)

group = org.apache.seatunnel
artifact = seatunnel-connector-*
version = 2.3.3*
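
A consolidated sketch of the manual install, assuming SEATUNNEL_HOME points at the unpacked distribution (the install path and the MySQL driver version are illustrative):

export SEATUNNEL_HOME=/data/soft/seatunnel   # assumption: adjust to your install path
export version="2.3.3"

# Hadoop base jar -> lib/
mvn dependency:get -DgroupId=org.apache.seatunnel -Dclassifier=optional \
  -DartifactId=seatunnel-hadoop3-3.1.4-uber -Dversion=${version} -Ddest=${SEATUNNEL_HOME}/lib/

# connectors-v2 jar -> connectors/seatunnel/
mvn dependency:get -DgroupId=org.apache.seatunnel -DartifactId=connector-jdbc \
  -Dversion=${version} -Ddest=${SEATUNNEL_HOME}/connectors/seatunnel/

# JDBC driver jar -> plugins/jdbc/lib/ (illustrative driver version)
mvn dependency:get -DgroupId=mysql -DartifactId=mysql-connector-java \
  -Dversion=8.0.28 -Ddest=${SEATUNNEL_HOME}/plugins/jdbc/lib/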

Configure and run

Config file structure

env {
}

source {
  // multiple sources are supported
  FakeSource {
  }
}

transform {
}

sink {
  Console {}
}

When there are multiple sources and multiple sinks, use the result_table_name and source_table_name options
in each step to indicate which data that step reads and writes (see the sketch below).
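
A minimal wiring sketch (FakeSource, Console, and the table names here are illustrative only):

source {
  FakeSource {
    result_table_name = "fake1"
  }
  FakeSource {
    result_table_name = "fake2"
  }
}

transform {
  sql {
    source_table_name = "fake1"
    result_table_name = "fake1_filtered"
    sql = "select * from fake1"
  }
}

sink {
  Console { source_table_name = "fake1_filtered" }
  Console { source_table_name = "fake2" }
}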

Configure Spark environment variables

config/seatunnel-env.sh
SPARK_HOME=/data/soft/seatunnel/spark

Create a job file

Spark properties that can be set in the env block: https://spark.apache.org/docs/latest/configuration.html#available-properties

env {
  job.mode = "BATCH"
  spark.yarn.queue = spark
  spark.app.name = "SeaTunnel"
  spark.driver.cores = 1
  spark.driver.memory = 1g
  spark.executor.instances = 2
  spark.executor.cores = 5
  spark.executor.memory = 5g
}

source {
  JDBC {
    url = "jdbc:mysql://ip:3306/dolphinscheduler?useUnicode=true&characterEncoding=UTF-8"
    driver = "com.mysql.jdbc.Driver"
    user = "xxx"
    password = "xxx"
    query = "select * from t_ds_user"
  }
}

transform {
}

sink {
  Console {}
}

# Alternative sink example; place it inside the sink block to write to HDFS instead of the console:
HdfsFile {
  fs.defaultFS = "hdfs://cluster"
  hdfs_site_path = "/data/soft/hadoop/etc/hadoop/hdfs-site.xml"
  path = "/tmp/test2"
  file_format = "orc"
  file_name_expression = "${transactionId}_${now}"
  filename_time_format = "yyyyMMdd"
  is_enable_transaction = true
  sink_columns = ["xx","xxx"]
  partition_by = ["xx"]
  partition_dir_expression = "${k0}=${v0}"
  is_partition_field_write_in_file = false
}

Run a test job

# SeaTunnel Engine, single node: org.apache.seatunnel.core.starter.seatunnel.SeaTunnelClient
./bin/seatunnel.sh -e local --config config/v2.batch.config.template

# SeaTunnel Engine, distributed: org.apache.seatunnel.core.starter.seatunnel.SeaTunnelServer
./bin/seatunnel-cluster.sh
# start as a daemon
./bin/seatunnel-cluster.sh -d
$SEATUNNEL_HOME/bin/seatunnel.sh --config $SEATUNNEL_HOME/config/v2.batch.config.template

# Spark engine starter class: org.apache.seatunnel.core.starter.spark.SparkStarter
# it is ultimately turned into a spark-submit command; main class SeatunnelSpark.class, execute command class SparkApiTaskExecuteCommand
./bin/start-seatunnel-spark-3-connector-v2.sh \
  --master yarn \
  --deploy-mode client \
  --config config/v2.batch.config.template \
  -i date=20230307

Parameters to be substituted in the config must also be wrapped in double quotes, e.g. "${date}".
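
For example (a sketch; config/job.conf and the output path are illustrative, and the path concatenation mirrors the pattern used in the template in section 4.2):

# in the job config, the quoted placeholder:
path = "/tmp/output/tp="${date}""

# on the command line, supply the value with -i:
./bin/start-seatunnel-spark-3-connector-v2.sh --master yarn --deploy-mode client \
  --config config/job.conf -i date=20230307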

Common job templates

HDFS to local

./bin/seatunnel.sh --config config/test.conf -e local

env {
  job.name = "hdfs2local"
  job.mode = "BATCH"
  parallelism = 5
}

source {
  // multiple sources are supported
  HdfsFile {
    fs.defaultFS = "hdfs://10.3.4.191:9000"
    path = "/hive/warehouse/dw.db/burpoint_content_info/year=2023/month=02/day=01"
    type = "orc"
  }
}

transform {
}

sink {
  LocalFile {
    path = "/data/tmp/test2"
    file_format = "orc"
    sink_columns = ["page_id","content_id","phone","tack_time"]
  }
}

JDBC to HDFS (multiple sources)

Note: currently only the Spark and Flink engines support parameter substitution; the SeaTunnel (Zeta) engine does not.

Sources can only run serially: parallelism tasks are launched, but only one task actually does the work.

Therefore, merging data from multiple sources requires the transform module to union the different sources (see the template below).
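
A possible launch command for the template below (a sketch; config/jdbc2hdfs_multi.conf is an assumed file name, and date / out_parent_dir correspond to the unresolved placeholders in the template):

./bin/start-seatunnel-spark-3-connector-v2.sh \
  --master yarn \
  --deploy-mode client \
  --config config/jdbc2hdfs_multi.conf \
  -i date=20230307 \
  -i out_parent_dir=/tmp/ndmc_t_ai_user/full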


basic_sql = "select userid,TO_TIMESTAMP(TO_CHAR(EFECTIVETIME,'yyyy-MM-dd HH24:mi:ss')...from ndmc.t_ai_user "

env {
  job.mode = "BATCH"
  parallelism = 4
  spark.app.name = "user"
  spark.driver.cores = 1
  spark.driver.memory = 1g
  spark.executor.instances = 2
  spark.executor.cores = 5
  spark.executor.memory = 5g
}

source {
  JDBC {
    result_table_name = "t1"
    url = "jdbc:oracle:thin:@"
    driver = "oracle.jdbc.OracleDriver"
    user = ""
    password = ""
    query = ${basic_sql}
  }
  JDBC {
    result_table_name = "t16"
    url = "jdbc:oracle:thin:@"
    driver = "oracle.jdbc.OracleDriver"
    user = ""
    password = ""
    query = ${basic_sql}" where regexp_like(USERID,'^1[B-C]')"
  }
}

select_columns = " select userid,... from "

transform {
  sql {
    result_table_name = "merged"
    sql = ${select_columns}"t1 union all"${select_columns}"t16 "
  }
}

sink {
  HdfsFile {
    source_table_name = "merged"
    file_name_expression = "${transactionId}_${now}"
    fs.defaultFS = "hdfs://bicluster"
    hdfs_site_path = "/data/soft/hadoop/etc/hadoop/hdfs-site.xml"
    path = ${out_parent_dir}
    file_format = "orc"
    filename_time_format = "yyyyMMdd"
    is_enable_transaction = true
    batch_size = 5000000
  }
  HdfsFile {
    source_table_name = "t1"
    file_name_expression = "01_${transactionId}_${now}"
    path = "/tmp/ndmc_t_ai_user/tp="${date}""
    fs.defaultFS = "hdfs://bicluster"
    hdfs_site_path = "/data/soft/hadoop/etc/hadoop/hdfs-site.xml"
    file_format = "orc"
    filename_time_format = "yyyyMMdd"
    is_enable_transaction = true
  }
  HdfsFile {
    source_table_name = "t16"
    file_name_expression = "16_${transactionId}_${now}"
    path = "/tmp/ndmc_t_ai_user/tp="${date}""
    fs.defaultFS = "hdfs://bicluster"
    hdfs_site_path = "/data/soft/hadoop/etc/hadoop/hdfs-site.xml"
    file_format = "orc"
    filename_time_format = "yyyyMMdd"
    is_enable_transaction = true
  }
}

FTP to Console and HDFS

env {
  execution.parallelism = 2
  job.mode = "BATCH"
  checkpoint.interval = 10000
}

source {
  SftpFile {
    host = 10.27.48.17
    port = 22
    user = hcycp
    password = "xxx"
    path = "caiyunbi/test"
    file_format_type = csv
    schema = {
      fields {
        phone = "string"
      }
    }
    result_table_name = "test"
  }
}

sink {
  HdfsFile {
    source_table_name = "test"
    fs.defaultFS = "hdfs://bicluster"
    hdfs_site_path = /data/soft/hadoop/etc/hadoop/hdfs-site.xml
    path = /tmp/SeaTunnelTest
    file_format_type = orc
    compress_codec = snappy
  }
  Console {
    source_table_name = "test"
  }
}

SeaTunnel Engine long-running server

Edit bin/seatunnel-cluster.sh and add the line JAVA_OPTS="-Xms2G -Xmx2G" (sketched below).

./bin/seatunnel-cluster.sh -d
./bin/stop-seatunnel-cluster.sh
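
A sketch of the added line (its exact position inside the script is an assumption; it just needs to be set before the JVM is launched):

# bin/seatunnel-cluster.sh
JAVA_OPTS="-Xms2G -Xmx2G"    # fixed heap size for the long-running server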

Web

The SeaTunnel Engine (Zeta) server must be started first.