Article January 23, 2024

SeaTunnel


install

export version="2.3.3"
wget "https://archive.apache.org/dist/incubator/seatunnel/${version}/apache-seatunnel-incubating-${version}-bin.tar.gz"
tar -xzvf "apache-seatunnel-incubating-${version}-bin.tar.gz"
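
The rest of these notes refer to the extracted directory as ${SEATUNNEL_HOME}; a small sketch (the extracted directory name is assumed to match the tarball name):

export SEATUNNEL_HOME="$(pwd)/apache-seatunnel-incubating-${version}-bin"   # assumed directory name
cd "${SEATUNNEL_HOME}"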

Install connector plugins

Script installation

Edit config/plugin_config to specify the plugins you need.
The full plugin list is in ${SEATUNNEL_HOME}/connectors/plugin-mapping.properties.
If there are plugins you do not need, edit config/plugin_config to change which plugins are installed automatically, then run:
sh bin/install_plugin.sh
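
A hedged sketch of what config/plugin_config might list (the connector names below are examples; take the exact names from connectors/plugin-mapping.properties and keep any header/footer markers already present in the shipped file):

connector-jdbc
connector-clickhouse
connector-file-hadoop
connector-file-local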

|
├─lib
│  └─seatunnel-hadoop3-3.1.4-uber.jar
├─connectors
│  └─(connector jars go here)
└─plugins
   └─jdbc (connector name)
      └─lib   ← mysql-jdbc driver jar

Manual installation

  • Search https://developer.aliyun.com/mvn/search, download the jars, and place them in the corresponding directories

    • The exact artifact names are in connectors/plugin-mapping.properties; the target directories are shown in the layout at the top
  • Put the Hadoop base jar into the lib directory: seatunnel-hadoop3-3.1.4-uber-2.3.3-optional.jar

    • mvn dependency:get -Dtransitive=false -DgroupId=org.apache.seatunnel -Dclassifier=optional -DartifactId=seatunnel-hadoop3-3.1.4-uber -Dversion=2.3.3 -Ddest=./
  • Put the commonly used connectors-v2 jars into the connectors/seatunnel directory

    • connector-clickhouse-2.3.0.jar
    • connector-file-hadoop-2.3.0.jar
    • connector-file-sftp-2.3.0.jar
    • connector-file-ftp-2.3.0.jar
    • connector-file-local-2.3.0.jar
    • connector-jdbc-2.3.0.jar
    • mvn dependency:get -Dtransitive=false -DgroupId=org.apache.seatunnel -DartifactId=connector-jdbc -Dversion=2.3.3 -Ddest=./
  • Put dependency jars such as the MySQL/Oracle drivers (mysql-connector-java-xxx.jar, ojdbc6-xxx.jar) into plugins/jdbc/lib/

group = org.apache.seatunnel
artifact = seatunnel-connector-*
version = 2.3.3*
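
Putting the manual steps above together, a shell sketch run from ${SEATUNNEL_HOME} (the MySQL driver path and version are placeholders):

export version="2.3.3"
# Hadoop base jar -> lib/
mvn dependency:get -Dtransitive=false -DgroupId=org.apache.seatunnel \
  -Dclassifier=optional -DartifactId=seatunnel-hadoop3-3.1.4-uber \
  -Dversion=${version} -Ddest=./lib/
# connectors-v2 jars -> connectors/seatunnel/
mvn dependency:get -Dtransitive=false -DgroupId=org.apache.seatunnel \
  -DartifactId=connector-jdbc -Dversion=${version} -Ddest=./connectors/seatunnel/
# driver dependencies (e.g. the MySQL JDBC driver) -> plugins/jdbc/lib/
mkdir -p plugins/jdbc/lib
cp /path/to/mysql-connector-java-x.y.z.jar plugins/jdbc/lib/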

Configuration and running

Configuration file

HOCON format

env {
}

source {
// multiple sources are supported
FakeSource {
}
}

transform {
}

sink {
Console {}
}

When there are multiple sources and multiple sinks, use the result_table_name and source_table_name options in each step to indicate which data each step reads and writes.
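
A minimal sketch of this wiring (connector and option names as used elsewhere in this article; the table names and output path are made up):

source {
FakeSource {
result_table_name = "fake_a"
}
FakeSource {
result_table_name = "fake_b"
}
}

sink {
Console {
source_table_name = "fake_a"
}
LocalFile {
source_table_name = "fake_b"
// hypothetical output path
path = "/tmp/fake_b_out"
file_format = "csv"
field_delimiter = ","
}
}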

SQL-config

CREATE TABLE test1 WITH (
'connector'='FakeSource',
'schema' = '{
fields {
id = "int",
name = "string",
c_time = "timestamp"
}
}',
'rows' = '[
{ fields = [21, "Eric", null], kind = INSERT },
{ fields = [22, "Andy", null], kind = INSERT }
]',
'type'='source'
);

CREATE TABLE test09 AS SELECT id, name, CAST(null AS TIMESTAMP) AS c_time FROM test1;

CREATE TABLE test11
WITH (
'connector'='jdbc',
'url' = 'jdbc:mysql://mysql-e2e:3306/seatunnel',
'driver' = 'com.mysql.cj.jdbc.Driver',
'user' = 'root',
'password' = 'Abc!@#135_seatunnel',
'generate_sink_sql' = 'true',
'database' = 'seatunnel',
'table' = 't_user',
'type'='sink'
);

INSERT INTO test11 SELECT * FROM test09;

Configure the Spark environment variable

config/seatunnel-env.sh
SPARK_HOME=/data/soft/seatunnel/spark

Create a job file

https://spark.apache.org/docs/latest/configuration.html#available-properties

env {  
job.mode = "BATCH"
job.name = ${jobName}
parallelism = 2
}

source {
}

transform {
}

sink {
}

./bin/seatunnel.sh -c <this_config_file> -i jobName='startjob'

Run a test job

SeaTunnel Engine (local mode)

org.apache.seatunnel.core.starter.seatunnel.SeaTunnelClient

./bin/seatunnel.sh -m local --config config/v2.batch.config.template

SeaTunnel cluster daemon startup

org.apache.seatunnel.core.starter.seatunnel.SeaTunnelServer

SeaTunnel Engine (distributed)

./bin/seatunnel-cluster.sh
./bin/seatunnel-cluster.sh -d
./bin/stop-seatunnel-cluster.sh

./bin/seatunnel.sh --config <config_file>  # job management commands

Spark engine

Launcher class: org.apache.seatunnel.core.starter.spark.SparkStarter

# This class is ultimately turned into a spark-submit command. The main class is SeatunnelSpark.class and the execute class is SparkTaskExecuteCommand; each stage is handled by the corresponding [Source/Sink/Transform]ExecuteProcessor.

./bin/start-seatunnel-spark-3-connector-v2.sh \
--master yarn \
--deploy-mode client \
--config config/v2.batch.config.template \
-i date=20230307

Parameters to be substituted in the config must also be wrapped in double quotes, e.g. "${date}"
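
For example, a hedged snippet (the output path is hypothetical; ${date} is passed with -i date=20230307 as above):

sink {
HdfsFile {
fs.defaultFS = "hdfs://bicluster"
// note the double quotes around the substituted variable
path = "/tmp/output/dt="${date}""
file_format = "orc"
}
}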

Common job templates

HDFS to local

env {
job.name = "hdfs2local"
job.mode = "BATCH"
parallelism = 2
}

source {
// multiple sources are supported
HdfsFile {
fs.defaultFS = "hdfs://bicluster"
path = "/hive/warehouse/ods.db/bmp_t_bms_service_prov/year=2023/month=04/day=01"
file_format_type = "orc"
read_columns = ["serviceid","provcode"]
hdfs_site_path = "/data/soft/hadoop/etc/hadoop/hdfs-site.xml"
parse_partition_from_path = yes
result_table_name = "input_t"
}
}

transform {
Sql {
source_table_name = "input_t"
result_table_name = "input_t2"
query = "select serviceid, concat(provcode, '_') as provcode, day from input_t"
}
}

sink {
LocalFile {
source_table_name = "input_t2"
path = "/data/tmp/test2"
file_format = "csv"
sink_columns = ["serviceid","provcode","day"]
field_delimiter = ","
}
}

Key points:

  • parse_partition_from_path adds the partition fields parsed from the path to the data
  • the transform step reshapes the data

JDBC to HDFS

env {
job.mode = "BATCH"
spark.yarn.queue = spark
spark.app.name = "SeaTunnel"
spark.driver.cores = 1
spark.driver.memory = 1g
spark.executor.instances = 2
spark.executor.cores = 5
spark.executor.memory = 5g
}

source {
JDBC {
url = "jdbc:mysql://ip:3306/dolphinscheduler?useUnicode=true&characterEncoding=UTF-8"
driver = "com.mysql.jdbc.Driver"
user = "xxx"
password = "xxx"
query = "select * from t_ds_user"
}
}

transform {

}

sink {
Console {}

HdfsFile {
fs.defaultFS = "hdfs://cluster"
hdfs_site_path = "/data/soft/hadoop/etc/hadoop/hdfs-site.xml"
path = "/tmp/test2"
file_format = "orc"
file_name_expression = "${transactionId}_${now}"
filename_time_format = "yyyyMMdd"
is_enable_transaction = true
sink_columns = ["xx","xxx"]
partition_by = ["xx"]
partition_dir_expression = "${k0}=${v0}"
is_partition_field_write_in_file = false
}
}

JDBC to HDFS, multiple sources

Note: currently only the Spark and Flink engines support parameter substitution; the SeaTunnel (Zeta) engine does not.

The sources can only run serially: parallelism tasks are started, but only one task actually does the work.

Therefore, to merge data from multiple sources, use the transform module to union the different sources.


basic_sql = "select userid,TO_TIMESTAMP(TO_CHAR(EFECTIVETIME,'yyyy-MM-dd HH24:mi:ss')...from ndmc.t_ai_user "

env {
job.mode = "BATCH"
parallelism = 4
spark.app.name = "user"
spark.driver.cores = 1
spark.driver.memory = 1g
spark.executor.instances = 2
spark.executor.cores = 5
spark.executor.memory = 5g
}

source {
JDBC {
result_table_name = "t1"
url = "jdbc:oracle:thin:@"
driver = "oracle.jdbc.OracleDriver"
user = ""
password = ""
query = ${basic_sql}
}
JDBC {
result_table_name = "t16"
url = "jdbc:oracle:thin:@"
driver = "oracle.jdbc.OracleDriver"
user = ""
password = ""
query = ${basic_sql}" where regexp_like(USERID,'^1[B-C]')"
}
}

select_columns = " select userid,... from "

transform {
sql {
result_table_name = "merged"
sql = ${select_columns}"t1 union all"${select_columns}"t16 "
}
}


sink {
HdfsFile {
source_table_name = "merged"
file_name_expression = "${transactionId}_${now}"
fs.defaultFS = "hdfs://bicluster"
hdfs_site_path = "/data/soft/hadoop/etc/hadoop/hdfs-site.xml"
path = ${out_parent_dir}
file_format = "orc"
filename_time_format = "yyyyMMdd"
is_enable_transaction = true
batch_size = 5000000
}
HdfsFile {
source_table_name = "t1"
file_name_expression = "01_${transactionId}_${now}"
path = "/tmp/ndmc_t_ai_user/tp="${date}""
fs.defaultFS = "hdfs://bicluster"
hdfs_site_path = "/data/soft/hadoop/etc/hadoop/hdfs-site.xml"
file_format = "orc"
filename_time_format = "yyyyMMdd"
is_enable_transaction = true
}
HdfsFile {
source_table_name = "t16"
file_name_expression = "16_${transactionId}_${now}"
path = "/tmp/ndmc_t_ai_user/tp="${date}""
fs.defaultFS = "hdfs://bicluster"
hdfs_site_path = "/data/soft/hadoop/etc/hadoop/hdfs-site.xml"
file_format = "orc"
filename_time_format = "yyyyMMdd"
is_enable_transaction = true
}
}

SFTP to Console and HDFS

env {
execution.parallelism = 2
job.mode = "BATCH"
checkpoint.interval = 10000
}

source {
SftpFile {
host = 10.27.48.17
port = 22
user = hcycp
password = "xxx"
path = "caiyunbi/test"
file_format_type = csv
schema = {
fields {
phone = "string"
}
}
result_table_name = "test"
}

}

sink {
HdfsFile {
source_table_name = "test"
fs.defaultFS = "hdfs://bicluster"
hdfs_site_path = "/data/soft/hadoop/etc/hadoop/hdfs-site.xml"
path = "/tmp/SeaTunnelTest"
file_format_type = orc
compress_codec = snappy
}
Console {
source_table_name = "test"
}
}

SeaTunnel Engine as a long-running server

Edit bin/seatunnel-cluster.sh and add a line: JAVA_OPTS="-Xms2G -Xmx2G"
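
That is, near the top of bin/seatunnel-cluster.sh add (a sketch; adjust the heap size to your machine):

JAVA_OPTS="-Xms2G -Xmx2G"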

./bin/seatunnel-cluster.sh -d
./bin/stop-seatunnel-cluster.sh

Web

The SeaTunnel Engine (Zeta) server must be started first.

Source code entry points

  • org.apache.seatunnel.api.source.SeaTunnelSource
  • org.apache.seatunnel.api.sink.SeaTunnelSink
  • org.apache.seatunnel.api.table.type.SeaTunnelRow
  • SparkTaskExecuteCommand
  • org.apache.seatunnel.engine.server.rest.RestJobExecutionEnvironment#getLogicalDag