CHEATSHEET March 19, 2024

Flink CheatSheet

Flink application code -> JobGraph -> JobManager -> TaskManagers

Environment

  • ExecutionEnvironment
  • StreamExecutionEnvironment
  • TableEnvironment
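A minimal sketch of how each is obtained (standard Flink 1.17 entry points; using the Table API's streaming mode here is just one illustrative choice):

import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.{EnvironmentSettings, TableEnvironment}

val batchEnv = ExecutionEnvironment.getExecutionEnvironment // DataSet (batch) API
val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment // DataStream API
val tableEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode()) // Table/SQL API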

Parallelism
env.setParallelism(3)
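The environment-level setting is the job-wide default; individual operators can override it. A sketch, assuming a hypothetical DataStream[String] named stream:

env.setParallelism(3) // default for every operator in the job
stream.map(_.toUpperCase).setParallelism(1) // per-operator override takes precedence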

Data sources

  • env.fromSequence(from, to)
  • env.fromElements(elements: _*) // use the given elements as the input source
  • env.fromCollection(Seq/Iterator) // use a collection as the input source
  • env.fromParallelCollection(SplittableIterator) // parallel source backed by a SplittableIterator
  • env.socketTextStream(hostname, port, delimiter, maxRetry) // read text lines from a socket (hostname/IP, port, line delimiter, max retries)
  • env.readTextFile("path") // read a text file
  • env.readFile(fileInputFormat, path)
  • env.readFile(fileInputFormat, path, watchType, interval, pathFilter)
  • env.addSource // custom source (see the sketch below)
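A minimal custom-source sketch for env.addSource, using the legacy SourceFunction interface that addSource expects (CounterSource is a hypothetical example):

import org.apache.flink.streaming.api.functions.source.SourceFunction

class CounterSource extends SourceFunction[Long] {
  @volatile private var running = true
  override def run(ctx: SourceFunction.SourceContext[Long]): Unit = {
    var i = 0L
    while (running) {
      ctx.collect(i) // emit the next element
      i += 1
      Thread.sleep(100)
    }
  }
  override def cancel(): Unit = running = false
}

// env.addSource(new CounterSource).print()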

Convert to DataSet/DataStream

Processing
process
keyBy
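keyBy partitions the stream by key, and process applies a (Keyed)ProcessFunction to each partition. A minimal sketch, assuming a hypothetical DataStream[(String, Int)] named stream:

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.util.Collector

stream
  .keyBy(_._1) // partition by the first tuple field
  .process(new KeyedProcessFunction[String, (String, Int), String] {
    override def processElement(value: (String, Int),
                                ctx: KeyedProcessFunction[String, (String, Int), String]#Context,
                                out: Collector[String]): Unit =
      out.collect(s"${ctx.getCurrentKey}: ${value._2}")
  })
  .print()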

Output (sink)
Only streams with sinks added will be executed once the StreamExecutionEnvironment.execute(...) method is called.

  • print() / printToErr() [actually addSink(PrintSinkFunction)] — prints the toString() value of each element to standard out / standard error. Optionally, a prefix (msg) can be added before the output to distinguish different print calls. If the parallelism is greater than 1, the output is also prefixed with the identifier of the task that produced it.
  • writeAsText() / TextOutputFormat — writes elements line by line as strings, obtained by calling each element's toString() method.
  • writeAsCsv() / CsvOutputFormat — writes tuples as comma-separated value files. Row and field delimiters are configurable. The value of each field comes from the object's toString() method.
  • writeUsingOutputFormat() / FileOutputFormat — method and base class for custom file output; supports custom object-to-bytes conversion.
  • writeToSocket — writes elements to a socket according to a SerializationSchema.
  • addSink — custom sink function, e.g. Kafka (see the sketch below)
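For the Kafka case, newer connector versions pair KafkaSink with sinkTo rather than addSink. A minimal sketch using flink-connector-kafka (broker address and topic are placeholders):

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.connector.base.DeliveryGuarantee
import org.apache.flink.connector.kafka.sink.{KafkaRecordSerializationSchema, KafkaSink}

val sink = KafkaSink.builder[String]()
  .setBootstrapServers("localhost:9092")
  .setRecordSerializer(
    KafkaRecordSerializationSchema.builder[String]()
      .setTopic("quickstart-events")
      .setValueSerializationSchema(new SimpleStringSchema())
      .build())
  .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
  .build()

// stream.sinkTo(sink)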

Execution
env.execute("name")

Installation check cases

Application Mode

./bin/flink run-application -t yarn-application ./examples/streaming/TopSpeedWindowing.jar
yarn application -kill <ApplicationId>

Session Mode

./bin/yarn-session.sh

./bin/flink run -t yarn-session \
-Dyarn.application.id=application_XXXX_YY \
./examples/streaming/TopSpeedWindowing.jar

Port protection

Restrict the port range Flink binds to, then apply a firewall policy.

rest.bind-port=4060-4069

Firewall policy

DROP       tcp  --  0.0.0.0/0            0.0.0.0/0            tcp dpts:4040:4069

demo

import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

object S {

  def main(args: Array[String]): Unit = {
    // local environment with web UI (needs flink-runtime-web on the classpath)
    val env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI()
    env.socketTextStream("172.25.172.132", 7777).print()
    env.execute("Window Stream WordCount")
  }
}

maven

<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-scala -->
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-scala_2.12</artifactId>
  <version>1.17.0</version>
  <scope>provided</scope>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-scala -->
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-streaming-scala_2.12</artifactId>
  <version>1.17.0</version>
  <scope>provided</scope>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-clients -->
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-clients</artifactId>
  <version>1.17.0</version>
  <scope>provided</scope>
</dependency>

<!-- needed at runtime by createLocalEnvironmentWithWebUI -->
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-runtime-web</artifactId>
  <version>1.17.0</version>
  <scope>provided</scope>
</dependency>
Feed the demo's socket source with netcat:

nc -lk 7777

read

https://www.jianshu.com/p/0d4cb05d6971

txt

val env = ExecutionEnvironment.getExecutionEnvironment
env.readTextFile("C:\\Users\\Manhua\\IdeaProjects\\spark-test\\burpoint_action_mapping_android.csv")
  .map(row => row.split(",")(0).length) // length of the first field (already an Int)
  .reduce(_ + _)
  .print()

With FileProcessingMode.PROCESS_CONTINUOUSLY, the whole file/directory is re-read whenever it is modified.

val env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI()
env.readFile(new TextInputFormat(null), "./testdata",
    FileProcessingMode.PROCESS_CONTINUOUSLY, 10000) // rescan the path every 10 s
  .print()
env.execute()

csv

// Java-API ExecutionEnvironment: readCsvFile returns a CsvReader whose
// types(...) call fixes the record type (here a Tuple16 of Strings)
val env = ExecutionEnvironment.getExecutionEnvironment
val a = env.readCsvFile("C:\\Users\\Manhua\\IdeaProjects\\spark-test\\burpoint_action_mapping_android.csv")
  .types(classOf[String], classOf[String], classOf[String], classOf[String], classOf[String], classOf[String], classOf[String], classOf[String], classOf[String], classOf[String], classOf[String], classOf[String], classOf[String], classOf[String], classOf[String], classOf[String])

a.map(row => row.f2) // third column; append .length for its length
  .print()
println("x")

kafka

import org.apache.flink.api.common.eventtime.WatermarkStrategy
import org.apache.flink.connector.kafka.source.KafkaSource
import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.kafka.common.serialization.StringDeserializer

val ks = KafkaSource.builder[String]()
  .setBootstrapServers("localhost:9092")
  .setTopics("quickstart-events")
  .setGroupId("s")
  .setDeserializer(KafkaRecordDeserializationSchema.valueOnly(classOf[StringDeserializer]))
  .build()

val env = StreamExecutionEnvironment.getExecutionEnvironment
env.fromSource[String](ks, WatermarkStrategy.noWatermarks[String](), "Kafka Source").print()
env.execute()