Contents
  1. flink
  2. install check case
    2.1. Application Mode
    2.2. Session Mode
  3. Port protection
  4. demo
  5. read
    5.1. txt
    5.2. csv
    5.3. kafka

flink

Flink application code -> JobGraph -> JobManager -> TaskManagers

Environment

  • ExecutionEnvironment
  • StreamExecutionEnvironment
  • TableEnvironment

Parallelism
env.setParallelism(3)

Data sources

  • env.fromSequence(from, to)
  • env.fromElements(elements: _*) // use an array of elements as the input source
  • env.fromCollection(Seq/Iterator) // use a collection (Seq/Iterator) as the input source
  • env.fromParallelCollection(SplittableIterator) // use a SplittableIterator as a parallel input source
  • env.socketTextStream(hostname: IP address, port: port, delimiter: delimiter, maxRetry: maximum number of retries) // use a socket as the input source
  • env.readTextFile("path") // use a file as the input source
  • env.readFile(fileInputFormat, path)
  • env.readFile(fileInputFormat, path, watchType, interval, pathFilter)
  • env.addSource // custom source (see the sketch after this list)
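
A minimal sketch exercising a few of these sources with the Scala DataStream API (the socket host/port below are placeholders, not from the original notes):

import org.apache.flink.streaming.api.scala._

object SourcesSketch {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Bounded sources built from in-memory data
    env.fromSequence(1, 100).print()
    env.fromElements("a", "b", "c").print()
    env.fromCollection(Seq(1, 2, 3)).print()

    // Unbounded source: one record per line received on the socket
    env.socketTextStream("localhost", 9999).print()

    env.execute("sources sketch")
  }
}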

Convert to DataSet/DataStream

Processing

  • process
  • keyBy
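
A minimal sketch of keyBy followed by process with a KeyedProcessFunction (the tagging logic is illustrative only):

import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector

object KeyByProcessSketch {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    env.fromElements("a", "b", "a", "c")
      .keyBy(word => word) // partition the stream by the word itself
      .process(new KeyedProcessFunction[String, String, String] {
        // called once per record; emits the record tagged with its key
        override def processElement(value: String,
                                    ctx: KeyedProcessFunction[String, String, String]#Context,
                                    out: Collector[String]): Unit = {
          out.collect(s"key=${ctx.getCurrentKey} value=$value")
        }
      })
      .print()

    env.execute("keyBy/process sketch")
  }
}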

Output (sink)
Only streams with sinks added will be executed once the StreamExecutionEnvironment.execute(...) method is called.

  • print() / printToErr() [actually addSink(PrintSinkFunction)] prints the toString() value of each element to standard output / standard error. Optionally, a prefix (msg) can be supplied to help distinguish different calls to print. If the parallelism is greater than 1, the output is also prefixed with the identifier of the task that produced it.
  • writeAsText() / TextOutputFormat writes elements line by line as strings. The strings are obtained by calling toString() on each element.
  • writeAsCsv() / CsvOutputFormat writes tuples to comma-separated value files. Row and field delimiters are configurable. The value of each field comes from the object's toString() method.
  • writeUsingOutputFormat() / FileOutputFormat: method and base class for custom file output; supports custom object-to-bytes conversion.
  • writeToSocket writes elements to a socket according to a SerializationSchema.
  • addSink: custom sink function, e.g. Kafka (see the sketch after this list).
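
A minimal sketch of print with a prefix and a custom sink via addSink (the SinkFunction below just writes to stdout; a real job would use a connector sink such as Kafka):

import org.apache.flink.streaming.api.functions.sink.SinkFunction
import org.apache.flink.streaming.api.scala._

object SinkSketch {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // print("demo") prefixes each output line with "demo" to tell print calls apart
    env.fromElements(1, 2, 3).print("demo")

    env.fromElements("x", "y", "z")
      .addSink(new SinkFunction[String] {
        // invoked once per record; a real sink would write to an external system
        override def invoke(value: String, context: SinkFunction.Context): Unit =
          println(s"custom sink got: $value")
      })

    // Only streams with sinks attached run when execute() is called
    env.execute("sink sketch")
  }
}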

Execution
env.execute("name")

install check case

Application Mode

./bin/flink run-application -t yarn-application ./examples/streaming/TopSpeedWindowing.jar
yarn application -kill <ApplicationId>

Session Mode

./bin/yarn-session.sh

./bin/flink run -t yarn-session \
-Dyarn.application.id=application_XXXX_YY \
./examples/streaming/TopSpeedWindowing.jar

Port protection

Restrict the range of ports that can be bound, then apply a firewall policy.

rest.bind-port=4060-4069

Firewall policy

DROP       tcp  --  0.0.0.0/0            0.0.0.0/0            tcp dpts:4040:4069

demo

import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

object S {

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI()
    env.socketTextStream("172.25.172.132", 7777).print()
    env.execute("Window Stream WordCount")
  }
}
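
The job name hints at a windowed word count; a sketch of that variant, assuming whitespace-separated text arrives on the same socket and using 5-second processing-time tumbling windows:

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time

object WindowWordCount {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI()

    env.socketTextStream("172.25.172.132", 7777)
      .flatMap(_.toLowerCase.split("\\s+"))   // split each line into words
      .filter(_.nonEmpty)
      .map(word => (word, 1))                 // one count per occurrence
      .keyBy(_._1)
      .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
      .sum(1)                                 // sum the counts within each window
      .print()

    env.execute("Window Stream WordCount")
  }
}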

maven

<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-scala -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-scala_2.12</artifactId>
    <version>1.17.0</version>
    <scope>provided</scope>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-scala -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-scala_2.12</artifactId>
    <version>1.17.0</version>
    <scope>provided</scope>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-clients -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients</artifactId>
    <version>1.17.0</version>
    <scope>provided</scope>
</dependency>

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-runtime-web</artifactId>
    <version>1.17.1</version>
    <scope>provided</scope>
</dependency>
nc -lk 7777

read

https://www.jianshu.com/p/0d4cb05d6971

txt

import org.apache.flink.api.scala._

val env = ExecutionEnvironment.getExecutionEnvironment
env.readTextFile("C:\\Users\\Manhua\\IdeaProjects\\spark-test\\burpoint_action_mapping_android.csv")
  .map(row => {
    row.split(",")(0).length
  }).reduce(_ + _)
  .print()

When a file is modified, the whole file/directory is read again.

import org.apache.flink.api.java.io.TextInputFormat
import org.apache.flink.streaming.api.functions.source.FileProcessingMode
import org.apache.flink.streaming.api.scala._

val env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI()
env.readFile(new TextInputFormat(null), "./testdata",
    FileProcessingMode.PROCESS_CONTINUOUSLY, 10000).print()
env.execute()

csv

// Uses the Java batch API: CsvReader.types(...) returns a DataSet of Java tuples
import org.apache.flink.api.java.ExecutionEnvironment

val env = ExecutionEnvironment.getExecutionEnvironment
val a = env.readCsvFile("C:\\Users\\Manhua\\IdeaProjects\\spark-test\\burpoint_action_mapping_android.csv")
  .types(classOf[String], classOf[String], classOf[String], classOf[String], classOf[String], classOf[String], classOf[String], classOf[String], classOf[String], classOf[String], classOf[String], classOf[String], classOf[String], classOf[String], classOf[String], classOf[String])

a.map(row => {
  row.f2 //.length.toInt
}).print()
println("x")

kafka

import org.apache.flink.api.common.eventtime.WatermarkStrategy
import org.apache.flink.connector.kafka.source.KafkaSource
import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema
import org.apache.flink.streaming.api.scala._
import org.apache.kafka.common.serialization.StringDeserializer

val ks = KafkaSource.builder[String]()
  .setBootstrapServers("localhost:9092")
  .setTopics("quickstart-events")
  .setGroupId("s")
  .setDeserializer(KafkaRecordDeserializationSchema.valueOnly(classOf[StringDeserializer]))
  .build()

val env = StreamExecutionEnvironment.getExecutionEnvironment
env.fromSource[String](ks, WatermarkStrategy.noWatermarks[String](), "Kafka Source").print()
env.execute()