Contents
  1. install
  2. Install connectors plugin
    2.1. Script installation
    2.2. Manual installation
  3. Configuration and running
    3.1. Configuration file
      3.1.1. Hocon format
      3.1.2. SQL-config
    3.2. Configure Spark environment variables
    3.3. Create a job file
    3.4. Run a test job
    3.5. SeaTunnel Engine standalone
    3.6. SeaTunnel Cluster daemon startup
      3.6.1. SeaTunnel Engine distributed
    3.7. Spark engine
  4. Common job templates
    4.1. HDFS to local
    4.2. JDBC to HDFS
    4.3. JDBC to HDFS, multiple sources
    4.4. FTP to Console and HDFS
  5. SeaTunnel Engine resident server service
  6. Web
  7. source

install

export version="2.3.3"
wget "https://archive.apache.org/dist/seatunnel/${version}/apache-seatunnel-${version}-bin.tar.gz"
tar -xzvf "apache-seatunnel-${version}-bin.tar.gz"
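
The rest of these notes refer to ${SEATUNNEL_HOME}; a minimal sketch, assuming the extracted directory was moved or linked to /data/soft/seatunnel (that path is an assumption):

# assumption: the extracted distribution lives at /data/soft/seatunnel
export SEATUNNEL_HOME=/data/soft/seatunnel
cd "${SEATUNNEL_HOME}"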

Install connectors plugin

Script installation

Edit config/plugin_config to specify the plugins you need.
The list of available plugins is in ${SEATUNNEL_HOME}/connectors/plugin-mapping.properties.
If some plugins are not needed, edit config/plugin_config to change which plugins get installed automatically (a trimmed example follows below).
sh bin/install-plugin.sh
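
A trimmed config/plugin_config might look like the sketch below; only the connector names matter and they must match entries in connectors/plugin-mapping.properties. This particular selection is just an assumption based on the connectors used later in these notes, and any marker/comment lines the shipped file already contains should be kept.

connector-jdbc
connector-file-hadoop
connector-file-local
connector-file-ftp
connector-file-sftp
connector-clickhouse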

|
├─lib
│  └─seatunnel-hadoop3-3.1.4-uber.jar
├─connectors
│  └─(all connector jars go here)
└─plugins
   └─jdbc (connector name)
      └─lib  (driver jars, e.g. mysql-jdbc)

Manual installation

  • Search https://developer.aliyun.com/mvn/search, download the jars, and put them into the corresponding directories

    • The exact artifact names are in connectors/plugin-mapping.properties; the target directories are shown in the tree above
  • Put the Hadoop base jar into the lib directory: seatunnel-hadoop3-3.1.4-uber-2.3.3-optional.jar

    • mvn dependency:get -Dtransitive=false -DgroupId=org.apache.seatunnel -Dclassifier=optional -DartifactId=seatunnel-hadoop3-3.1.4-uber -Dversion=2.3.3 -Ddest=./
  • Put the commonly used connectors-v2 jars into the connectors/seatunnel directory

    • connector-clickhouse-2.3.0.jar
    • connector-file-hadoop-2.3.0.jar
    • connector-file-sftp-2.3.0.jar
    • connector-file-ftp-2.3.0.jar
    • connector-file-local-2.3.0.jar
    • connector-jdbc-2.3.0.jar
    • mvn dependency:get -Dtransitive=false -DgroupId=org.apache.seatunnel -DartifactId=connector-jdbc -Dversion=2.3.3 -Ddest=./
  • Dependency jars, such as the MySQL/Oracle drivers mysql-connector-java-xxx.jar and ojdbc6-xxx.jar, go into plugins/jdbc/lib/

Maven coordinates to search for:
group = org.apache.seatunnel
artifact = seatunnel-connector-*
version = 2.3.3*

Configuration and running

Configuration file

Hocon format

env {
}

source {
  // multiple sources are supported
  FakeSource {
  }
}

transform {
}

sink {
  Console {}
}

When there are multiple sources and multiple sinks, use the result_table_name and source_table_name options in each step to indicate which data flows in and out; see the sketch after this paragraph.
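
A minimal sketch of that wiring, using the FakeSource/Console plugins from above; the table names t1 and t2 are made up for illustration:

source {
  FakeSource { result_table_name = "t1" }
  FakeSource { result_table_name = "t2" }
}

sink {
  Console { source_table_name = "t1" }
  Console { source_table_name = "t2" }
}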

SQL-config

CREATE TABLE test1 WITH (
  'connector'='FakeSource',
  'schema' = '{
    fields {
      id = "int",
      name = "string",
      c_time = "timestamp"
    }
  }',
  'rows' = '[
    { fields = [21, "Eric", null], kind = INSERT },
    { fields = [22, "Andy", null], kind = INSERT }
  ]',
  'type'='source'
);

CREATE TABLE test09 AS SELECT id, name, CAST(null AS TIMESTAMP) AS c_time FROM test1;

INSERT INTO test11 SELECT * FROM test09;

CREATE TABLE test11
WITH (
  'connector'='jdbc',
  'url' = 'jdbc:mysql://mysql-e2e:3306/seatunnel',
  'driver' = 'com.mysql.cj.jdbc.Driver',
  'user' = 'root',
  'password' = 'Abc!@#135_seatunnel',
  'generate_sink_sql' = 'true',
  'database' = 'seatunnel',
  'table' = 't_user',
  'type'='sink'
);

Configure Spark environment variables

Set SPARK_HOME in config/seatunnel-env.sh:
SPARK_HOME=/data/soft/seatunnel/spark

Create a job file

Spark properties that can be set in the env block are listed at https://spark.apache.org/docs/latest/configuration.html#available-properties

env {
  job.mode = "BATCH"
  job.name = ${jobName}
  parallelism = 2
}

source {
}

transform {
}

sink {
}

./bin/seatunnel.sh -c <this_config_file> -i jobName='startjob'

Run a test job

SeaTunnel Engine standalone

org.apache.seatunnel.core.starter.seatunnel.SeaTunnelClient

./bin/seatunnel.sh -m local --config config/v2.batch.config.template

SeaTunnel Cluster daemon startup

org.apache.seatunnel.core.starter.seatunnel.SeaTunnelServer

SeaTunnel Engine distributed

./bin/seatunnel-cluster.sh          # start in the foreground
./bin/seatunnel-cluster.sh -d       # start as a daemon
./bin/stop-seatunnel-cluster.sh     # stop the cluster

Jobs are then submitted and managed with ./bin/seatunnel.sh --config ...

Spark engine

Entry class: org.apache.seatunnel.core.starter.spark.SparkStarter

# This class is ultimately turned into a spark-submit command. The main class is SeatunnelSpark.class and the execution class is SparkTaskExecuteCommand; each stage is handled by the corresponding [Source/Sink/Transform]ExecuteProcessor.

./bin/start-seatunnel-spark-3-connector-v2.sh \
--master yarn \
--deploy-mode client \
--config config/v2.batch.config.template \
-i date=20230307

Parameters to be substituted in the config must also be wrapped in double quotes, e.g. "${date}"; see the sketch below.
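
A minimal sketch of how the substituted variable is referenced inside the config, following the same concatenation pattern used in the templates later in these notes; the table name t_order is made up for illustration:

source {
  JDBC {
    // ${date} comes from the -i date=20230307 option above; note the surrounding double quotes
    query = "select * from t_order where day = '"${date}"'"
  }
}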

Common job templates

HDFS to local

env {
  job.name = "hdfs2local"
  job.mode = "BATCH"
  parallelism = 2
}

source {
  // multiple sources are supported
  HdfsFile {
    fs.defaultFS = "hdfs://bicluster"
    path = "/hive/warehouse/ods.db/bmp_t_bms_service_prov/year=2023/month=04/day=01"
    file_format_type = "orc"
    read_columns = ["serviceid","provcode"]
    hdfs_site_path = "/data/soft/hadoop/etc/hadoop/hdfs-site.xml"
    parse_partition_from_path = yes
    result_table_name = "input_t"
  }
}

transform {
  Sql {
    source_table_name = "input_t"
    result_table_name = "input_t2"
    query = "select serviceid, concat(provcode, '_') as provcode, day from input_t"
  }
}

sink {
  LocalFile {
    source_table_name = "input_t2"
    path = "/data/tmp/test2"
    file_format = "csv"
    sink_columns = ["serviceid","provcode","day"]
    field_delimiter = ","
  }
}

Key points:

  • parse_partition_from_path adds the partition fields from the path (year/month/day) to the data
  • the transform stage reshapes the data
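
To run this template on the local Zeta engine, reusing the command shown earlier; the config file name is just an assumption:

./bin/seatunnel.sh -m local --config config/hdfs2local.conf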

JDBC to HDFS

env {
  job.mode = "BATCH"
  spark.yarn.queue = spark
  spark.app.name = "SeaTunnel"
  spark.driver.cores = 1
  spark.driver.memory = 1g
  spark.executor.instances = 2
  spark.executor.cores = 5
  spark.executor.memory = 5g
}

source {
  JDBC {
    url = "jdbc:mysql://ip:3306/dolphinscheduler?useUnicode=true&characterEncoding=UTF-8"
    driver = "com.mysql.jdbc.Driver"
    user = "xxx"
    password = "xxx"
    query = "select * from t_ds_user"
  }
}

transform {
}

sink {
  Console {}
  HdfsFile {
    fs.defaultFS = "hdfs://cluster"
    hdfs_site_path = "/data/soft/hadoop/etc/hadoop/hdfs-site.xml"
    path = "/tmp/test2"
    file_format = "orc"
    file_name_expression = "${transactionId}_${now}"
    filename_time_format = "yyyyMMdd"
    is_enable_transaction = true
    sink_columns = ["xx","xxx"]
    partition_by = ["xx"]
    partition_dir_expression = "${k0}=${v0}"
    is_partition_field_write_in_file = false
  }
}
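
A submit sketch for this job on Spark, using the starter script shown earlier; the config file name is an assumption:

./bin/start-seatunnel-spark-3-connector-v2.sh \
--master yarn \
--deploy-mode client \
--config config/jdbc2hdfs.batch.conf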

JDBC to HDFS, multiple sources

Note: currently only the Spark and Flink engines support parameter substitution; the SeaTunnel (Zeta) engine does not.

The sources can only run serially: parallelism tasks are started, but only one task actually does the work.

Therefore, merging data from multiple sources requires the transform module to union the different sources.


basic_sql = "select userid,TO_TIMESTAMP(TO_CHAR(EFECTIVETIME,'yyyy-MM-dd HH24:mi:ss')...from ndmc.t_ai_user "

env {
  job.mode = "BATCH"
  parallelism = 4
  spark.app.name = "user"
  spark.driver.cores = 1
  spark.driver.memory = 1g
  spark.executor.instances = 2
  spark.executor.cores = 5
  spark.executor.memory = 5g
}

source {
  JDBC {
    result_table_name = "t1"
    url = "jdbc:oracle:thin:@"
    driver = "oracle.jdbc.OracleDriver"
    user = ""
    password = ""
    query = ${basic_sql}
  }
  JDBC {
    result_table_name = "t16"
    url = "jdbc:oracle:thin:@"
    driver = "oracle.jdbc.OracleDriver"
    user = ""
    password = ""
    query = ${basic_sql}" where regexp_like(USERID,'^1[B-C]')"
  }
}

select_columns = " select userid,... from "

transform {
  sql {
    result_table_name = "merged"
    sql = ${select_columns}"t1 union all"${select_columns}"t16 "
  }
}

sink {
  HdfsFile {
    source_table_name = "merged"
    file_name_expression = "${transactionId}_${now}"
    fs.defaultFS = "hdfs://bicluster"
    hdfs_site_path = "/data/soft/hadoop/etc/hadoop/hdfs-site.xml"
    path = ${out_parent_dir}
    file_format = "orc"
    filename_time_format = "yyyyMMdd"
    is_enable_transaction = true
    batch_size = 5000000
  }
  HdfsFile {
    source_table_name = "t1"
    file_name_expression = "01_${transactionId}_${now}"
    path = "/tmp/ndmc_t_ai_user/tp="${date}""
    fs.defaultFS = "hdfs://bicluster"
    hdfs_site_path = "/data/soft/hadoop/etc/hadoop/hdfs-site.xml"
    file_format = "orc"
    filename_time_format = "yyyyMMdd"
    is_enable_transaction = true
  }
  HdfsFile {
    source_table_name = "t16"
    file_name_expression = "16_${transactionId}_${now}"
    path = "/tmp/ndmc_t_ai_user/tp="${date}""
    fs.defaultFS = "hdfs://bicluster"
    hdfs_site_path = "/data/soft/hadoop/etc/hadoop/hdfs-site.xml"
    file_format = "orc"
    filename_time_format = "yyyyMMdd"
    is_enable_transaction = true
  }
}
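
A submit sketch for this multi-source job; the config file name and the values passed with -i are assumptions. ${date} and ${out_parent_dir} are not defined inside the config, so they have to be supplied at submit time:

./bin/start-seatunnel-spark-3-connector-v2.sh \
--master yarn \
--deploy-mode client \
--config config/ndmc_t_ai_user.conf \
-i date=20230307 \
-i out_parent_dir=/tmp/ndmc_t_ai_user/full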

FTP to Console and HDFS

env {
  execution.parallelism = 2
  job.mode = "BATCH"
  checkpoint.interval = 10000
}

source {
  SftpFile {
    host = 10.27.48.17
    port = 22
    user = hcycp
    password = "xxx"
    path = "caiyunbi/test"
    file_format_type = csv
    schema = {
      fields {
        phone = "string"
      }
    }
    result_table_name = "test"
  }
}

sink {
  HdfsFile {
    source_table_name = "test"
    fs.defaultFS = "hdfs://bicluster"
    hdfs_site_path = "/data/soft/hadoop/etc/hadoop/hdfs-site.xml"
    path = "/tmp/SeaTunnelTest"
    file_format_type = orc
    compress_codec = snappy
  }
  Console {
    source_table_name = "test"
  }
}
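
To run this template on the local Zeta engine, reusing the earlier command; the config file name is an assumption:

./bin/seatunnel.sh -m local --config config/sftp2hdfs.conf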

SeaTunnel Engine resident server service

Edit bin/seatunnel-cluster.sh and add a line JAVA_OPTS="-Xms2G -Xmx2G", as in the sketch below.
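
A minimal sketch of the added line; placing it near the top of the script is an assumption:

# bin/seatunnel-cluster.sh
JAVA_OPTS="-Xms2G -Xmx2G"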

./bin/seatunnel-cluster.sh -d
./bin/stop-seatunnel-cluster.sh

Web

The SeaTunnel Engine / Zeta server must be started first.

source

  • org.apache.seatunnel.api.source.SeaTunnelSource
  • org.apache.seatunnel.api.sink.SeaTunnelSink
  • org.apache.seatunnel.api.table.type.SeaTunnelRow
  • SparkTaskExecuteCommand
  • org.apache.seatunnel.engine.server.rest.RestJobExecutionEnvironment#getLogicalDag