flink
Flink application code -> JobGraph -> JobManager -> TaskManagers
Environment
- ExecutionEnvironment
- StreamExecutionEnvironment
- TableEnvironment
Parallelism
env.setParallelism(3)
Data sources
- env.fromSequence(from, to)
- env.fromElements(elements: _*) // add the given elements as an input source
- env.fromCollection(Seq/Iterator) // add a collection (Seq/Iterator) as an input source
- env.fromParallelCollection(SplittableIterator) // add a SplittableIterator as a parallel input source
- env.socketTextStream(hostname, port, delimiter, maxRetry) // add a socket as an input source (host, port, line delimiter, max retries)
- env.readTextFile("path") // add a text file as an input source
- env.readFile(fileInputFormat, path)
- env.readFile(fileInputFormat, path, watchType, interval, pathFilter)
- env.addSource(...) // custom source
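The last entry, env.addSource, takes a user-defined SourceFunction. A minimal sketch (the class name and emission logic are illustrative, not from the original notes):

```scala
import org.apache.flink.streaming.api.functions.source.SourceFunction
import org.apache.flink.streaming.api.scala._

// Illustrative custom source: emits an incrementing counter until cancelled.
class CounterSource extends SourceFunction[Long] {
  @volatile private var running = true

  override def run(ctx: SourceFunction.SourceContext[Long]): Unit = {
    var n = 0L
    while (running) {
      // Emit under the checkpoint lock, as the SourceFunction contract requires.
      ctx.getCheckpointLock.synchronized {
        ctx.collect(n)
        n += 1
      }
      Thread.sleep(100)
    }
  }

  override def cancel(): Unit = running = false
}

// Usage:
// val env = StreamExecutionEnvironment.getExecutionEnvironment
// val stream: DataStream[Long] = env.addSource(new CounterSource)
```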
Convert to DataSet/DataStream
Processing
process
keyBy
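keyBy partitions the stream by a key; process then runs a KeyedProcessFunction once per element, with keyed state available. A sketch that counts events per key (class and state names are illustrative):

```scala
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector

// Illustrative: per-key running count backed by keyed state.
class CountPerKey extends KeyedProcessFunction[String, (String, Int), (String, Long)] {
  private var count: ValueState[Long] = _

  override def open(parameters: Configuration): Unit =
    count = getRuntimeContext.getState(
      new ValueStateDescriptor[Long]("count", classOf[Long]))

  override def processElement(
      value: (String, Int),
      ctx: KeyedProcessFunction[String, (String, Int), (String, Long)]#Context,
      out: Collector[(String, Long)]): Unit = {
    val next = count.value() + 1
    count.update(next)
    out.collect((value._1, next))
  }
}

// Usage:
// stream.keyBy(_._1).process(new CountPerKey)
```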
Sinks
Only streams with sinks added will be executed once the StreamExecutionEnvironment.execute(...) method is called.
- print() / printToErr() [actually addSink(PrintSinkFunction)] prints the toString() value of each element to stdout/stderr. An optional prefix(msg) can be added before the output to distinguish different print calls. With parallelism > 1, the output is also prefixed with the identifier of the task that produced it.
- writeAsText() / TextOutputFormat writes elements line by line as strings, obtained by calling each element's toString() method.
- writeAsCsv() / CsvOutputFormat writes tuples as comma-separated value files. Row and field delimiters are configurable. Each field value comes from the object's toString() method.
- writeUsingOutputFormat() / FileOutputFormat: method and base class for custom file output; supports custom object-to-bytes conversion.
- writeToSocket writes elements to a socket according to a SerializationSchema.
- addSink custom sink function, e.g. Kafka
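addSink takes a user-defined SinkFunction (for Kafka specifically, the bundled Flink Kafka connector sinks are normally used instead). A minimal illustrative sketch:

```scala
import org.apache.flink.streaming.api.functions.sink.SinkFunction

// Illustrative custom sink: writes each element to stdout with a tag.
class TaggedStdoutSink(tag: String) extends SinkFunction[String] {
  override def invoke(value: String, context: SinkFunction.Context): Unit =
    println(s"[$tag] $value")
}

// Usage:
// stream.addSink(new TaggedStdoutSink("demo"))
```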
Execution
env.execute("name")
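Putting the pieces together: a minimal word-count job over the socket source (feed it with `nc -lk 7777`, as in the demo section below). A sketch assuming a local setup; object and job names are illustrative:

```scala
import org.apache.flink.streaming.api.scala._

object SocketWordCount {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    env.socketTextStream("localhost", 7777)   // source
      .flatMap(_.toLowerCase.split("\\s+"))   // processing
      .filter(_.nonEmpty)
      .map((_, 1))
      .keyBy(_._1)
      .sum(1)
      .print()                                // sink

    // Nothing runs until execute() is called.
    env.execute("socket-word-count")
  }
}
```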
Installation check cases
Application Mode
./bin/flink run-application -t yarn-application ./examples/streaming/TopSpeedWindowing.jar
yarn application -kill <ApplicationId>
Session Mode
./bin/yarn-session.sh
./bin/flink run -t yarn-session \
Port protection
Restrict the port range Flink binds to, then apply firewall rules.
rest.bind-port=4060-4069
Firewall rule
DROP tcp -- 0.0.0.0/0 0.0.0.0/0 tcp dpts:4040:4069
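The DROP line above is iptables list output; a hedged sketch of rules that would produce it, with an allowance for a trusted admin host (the source address is illustrative):

```shell
# Allow a trusted admin host to reach the Flink REST/UI port range (illustrative address).
iptables -A INPUT -p tcp -s 192.0.2.10 --dport 4040:4069 -j ACCEPT
# Drop everyone else.
iptables -A INPUT -p tcp --dport 4040:4069 -j DROP
```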
demo
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
maven
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-scala -->
nc -lk 7777
read
https://www.jianshu.com/p/0d4cb05d6971
txt
val env = ExecutionEnvironment.getExecutionEnvironment
When a file changes, re-read the whole file/directory
val env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI()
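Continuous re-reading uses readFile with FileProcessingMode.PROCESS_CONTINUOUSLY; modified files are re-read in full on each scan. A sketch (the path and scan interval are illustrative):

```scala
import org.apache.flink.api.java.io.TextInputFormat
import org.apache.flink.core.fs.Path
import org.apache.flink.streaming.api.functions.source.FileProcessingMode
import org.apache.flink.streaming.api.scala._

val env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI()
val path = "/tmp/watched-dir" // illustrative path

// Re-scan the path every 5 seconds; changed files are re-ingested in full.
val lines: DataStream[String] = env.readFile(
  new TextInputFormat(new Path(path)),
  path,
  FileProcessingMode.PROCESS_CONTINUOUSLY,
  5000L)
```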
csv
val env = ExecutionEnvironment.getExecutionEnvironment
kafka
val ks = KafkaSource.builder[String]()
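A fuller sketch of the KafkaSource builder and wiring it into the environment; the broker address, topic, and group id are illustrative:

```scala
import org.apache.flink.api.common.eventtime.WatermarkStrategy
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.connector.kafka.source.KafkaSource
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer
import org.apache.flink.streaming.api.scala._

val env = StreamExecutionEnvironment.getExecutionEnvironment

val ks = KafkaSource.builder[String]()
  .setBootstrapServers("localhost:9092") // illustrative broker
  .setTopics("input-topic")              // illustrative topic
  .setGroupId("demo-group")              // illustrative group id
  .setStartingOffsets(OffsetsInitializer.earliest())
  .setValueOnlyDeserializer(new SimpleStringSchema())
  .build()

val stream = env.fromSource(ks, WatermarkStrategy.noWatermarks[String](), "kafka-source")
stream.print()
env.execute("kafka-demo")
```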