Flink介绍与使用
[TOC]
Apache Flink 01
Resume
Stateful Computations over Data Streams
基于数据源的【状态计算】:
通过维护和利用状态信息来处理流数据的一种机制,它允许开发者专注于业务逻辑的实现,而Flink 则负责处理状态的存储、访问、故障恢复等底层细节,从而提高了系统的可扩展性和容错性Flink的状态计算主要涉及到有状态的流处理,即根据当前输入的数据和一些已经处理过的数据共同转换输出结果的过程。这些已经处理过的数据被称为状态,由任务维护,并且可以被任务的业务逻辑访问。
Flink 提供了一套完整的状态管理机制:【状态一致性、高效存储和访问、持久化保存和故障恢复以及资源扩展时的调整】,使得开发者可以专注于业务逻辑的开发,而无需过多关注状态管理的细节。
Flink 的状态可以分为托管状态和原始状态。托管状态由 Flink 统一管理,而原始状态则需要开发者自定义,通常只在托管状态无法满足需求时使用。托管状态进一步细分为算子状态和按键状态。而按键状态则适用于按键分组的流处理任务,同一个键共享一个状态。
算子状态:Operator State:一个并行子任务处理的数据共享的一个状态
ListState:将状态表示为一组数据的列表
BroadcastState:只读状态:一个算子有多项任务,而他的每项任务状态都相同键控状态:Keyed State:仅能在【键控数据流(根据键对数据和状态进行分区)】上使用的算子状态。
Flink 的状态管理机制还包括容错性,通过自动按一定时间间隔产生快照,并在任务失败后进行恢复,确保了系统的稳定性。此外,Flink 还提供了多种状态后端,包括内存、RocksDB等,分布式文件系统状态后端。
Apache Flink is a framework and distributed processing engine for stateful computations over unbounded and bounded data streams. Flink has been designed to run in all common cluster environments, perform computations at in-memory speed and at any scale.
Apache Flink 是一个【框架和分布式处理引擎】,用于对无界和有界数据流进行有状态计算。
能够在所有常见的集群环境中运行。
能够以内存速度进行计算。
能够支持任意规模的数据处理。
Concept
Flink’s APIs
![]()
- Low-level
最低级别的抽象仅提供【有状态】且【及时的流处理】功能。
它通过处理函数(Process Function)嵌入到DataStream API中。
它允许用户自由地从一个或多个流中处理事件,并提供一致且容错的状态。
用户可以注册事件时间和处理时间回调,从而使程序能够执行复杂的计算。
- Core
实际上,许多应用程序并不需要上述的低级抽象,而是可以使用核心API(DataStream API,包括有界/无界流)进行编程。这些流畅的API为数据处理提供了常见的构建块,如用户指定的各种转换、连接、聚合、窗口、状态等。这些API中处理的数据类型在相应的编程语言中以类的形式表示。低级的处理函数(Process Function)与DataStream API集成,使得可以根据需要使用低级抽象。DataSet API在有限数据集上提供了额外的原语,如循环/迭代。
- Table
Table API是一个围绕表格的声明式领域特定语言(DSL),这些表格可能是动态变化的(当表示流时)。Table API遵循(扩展的)关系模型:表格附有模式(类似于关系数据库中的表格),并且API提供了类似的操作,如选择、投影、连接、分组、聚合等。Table API程序以声明方式定义应执行的逻辑操作,而不是指定执行该操作的代码的确切外观。尽管Table API可以通过各种用户定义函数进行扩展,但其表达能力不如核心API,但使用起来更为简洁(编写代码更少)。此外,Table API程序还会在执行前通过优化器应用优化规则。
- SQL
用户可以在表格和DataStream/DataSet之间进行无缝转换,从而允许程序将Table API与DataStream和DataSet API混合使用。Flink提供的最高级别抽象是SQL。这种抽象在语义和表达能力上与Table API相似,但它将程序表示为SQL查询表达式。SQL抽象与Table API紧密交互,SQL查询可以在Table API中定义的表格上执行。
Flink Streaming
1、DataStream
DataStream API 的名称来源于 Flink 程序中 DataStream 类。是包含可能重复的数据的不可变集合。有限或无限 API 相同。
DataStream 类似于常规的 Java 集合,但在一些关键方面存在显著差异:不可变的,一旦创建,只能通过 DataStream API 提供的操作(也称为转换)处理。
在 Flink 程序中,通过添加数据源来创建初始的 DataStream。然后使用如 map、filter 等 API 方法从该 DataStream 派生新的 DataStream,并将它们组合起来。这些操作允许你对数据流进行转换和处理,以满足你的数据处理需求。
2、DataSource
Name Version Source Sink Filesystem Bounded and Unbounded Scan, Lookup Streaming Sink, Batch Sink Elasticsearch 6.x & 7.x Not supported Streaming Sink, Batch Sink 👉 Apache Kafka 0.10+ Unbounded Scan Streaming Sink, Batch Sink JDBC Bounded Scan, Lookup Streaming Sink, Batch Sink Apache HBase 1.4.x & 2.2.x Bounded Scan, Lookup Streaming Sink, Batch Sink Apache Hive 3.1.0~3.1.2 Unbounded Scan, Bounded Scan, Lookup Streaming Sink, Batch Sink
依赖
1 |
|
创建 flink 程序的步骤
每个 Flink 程序都由以下基本部分组成:
1、Envrionment 创建
创建执行环境(Execution Environment),设置和执行 Flink 程序的入口。可以是针对批处理(Batch)或流处理(Streaming)的特定环境。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
import modules.env.Environments.EnvBuilder
import org.apache.flink.api.common.RuntimeExecutionMode
import org.apache.flink.api.common.restartstrategy.RestartStrategies
import org.apache.flink.api.common.time.Time
import org.apache.flink.configuration.Configuration
import org.apache.flink.runtime.state.hashmap.{HashMapStateBackend, HashMapStateBackendFactory}
import org.apache.flink.streaming.api.CheckpointingMode
import org.apache.flink.streaming.api.environment.{CheckpointConfig, StreamExecutionEnvironment}
import java.time.Duration
class Environments{
def build():EnvBuilder = {
new EnvBuilder {
private val see: StreamExecutionEnvironment =
StreamExecutionEnvironment.getExecutionEnvironment
/**
* 启用检查点
* @param hdfs 检查点存储路径:一般存储在 hdfs
* @param timeoutS 检查点创建超时设置,超出时间视为失败
* @param timeBetS 两次检查点创建之间的时间间隔
* @param maxCP 最大保存检查点数量
*/
override def enableCheckpoint(hdfs:String, timeoutS:Long=12,
timeBetS:Long=3, maxCP:Int=2):EnvBuilder = {
val cpConfig: CheckpointConfig = see.getCheckpointConfig
cpConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
cpConfig.setCheckpointStorage(hdfs)
cpConfig.setForceUnalignedCheckpoints(true)
cpConfig.setAlignmentTimeout(Duration.ofSeconds(timeBetS))
cpConfig.setCheckpointInterval(timeBetS*1000)
cpConfig.setCheckpointTimeout(timeoutS*1000)
cpConfig.setMaxConcurrentCheckpoints(2)
this
}
/**
* 计算阶段重启策略:固定失败率,固定延迟,不重启 ...
* @param retry 最大重试次数
* @param retryBetS 两次重试之间时间间隔(秒)
*/
override def enableRetries(retry:Int=3,retryBetS:Int=1):EnvBuilder = {
see.setRestartStrategy(
RestartStrategies
.fixedDelayRestart(retry,Time.seconds(retryBetS))
)
this
}
/**
* 启用状态后端
* @param stateBackend 状态后端存储类型(数据结构):hashmap 与 rocksdb 可选
* @param async 是否开启异步状态后端
* @param incr 是否开启增量状态后端(只有 rocksdb 支持)
*/
override def enableStateBackend(
stateBackend:String="hashmap",
async:Boolean=true,
incr:Boolean=false):EnvBuilder = {
if(!stateBackend.matches("hashmap|rocksdb")){
throw new RuntimeException("only support : hashmap or rocksdb")
}
val config = new Configuration()
config.setString("state.backend",stateBackend)
config.setBoolean("state.backend.async",async)
if(stateBackend.equals("rocksdb") && incr){
config.setBoolean("state.backend.incremental",incr)
}
val backend: HashMapStateBackend = new HashMapStateBackendFactory()
.createFromConfig(config, getClass.getClassLoader)
see.setStateBackend(backend)
this
}
/**
* 基础优化
* @param mode 执行模式:stream(推荐) | batch | automatic
* @param numParallel 默认并行度
* @param maxParallel 最大并行度
*/
override def finish(
mode:RuntimeExecutionMode=RuntimeExecutionMode.STREAMING,
numParallel:Int=3,maxParallel:Int=9):StreamExecutionEnvironment = {
see.setRuntimeMode(RuntimeExecutionMode.STREAMING)
see.setParallelism(numParallel)
see.setMaxParallelism(maxParallel)
see
}
}
}
}
object Environments {
def apply(): Environments = new Environments()
trait EnvBuilder{
def enableCheckpoint(
hdfs:String,timeoutS:Long=12,timeBetS:Long=3,maxCP:Int=2):EnvBuilder
def enableRetries(retry:Int=3,retryBetS:Int=1):EnvBuilder
def enableStateBackend(
stateBackend:String="hashmap", async:Boolean=true,
incr:Boolean=false):EnvBuilder
def finish(
mode:RuntimeExecutionMode=RuntimeExecutionMode.STREAMING,
numParallel:Int=2,maxParallel:Int=4):StreamExecutionEnvironment
}
}
StreamExecutionEnvironment see = new Environments()
.build()
.enableCheckpoint("file:///D:/fink_state_backend", 3, 1, 1)
.enableRetries(3, 1)
.enableStateBackend("hashmap", true, false)
.finish(RuntimeExecutionMode.STREAMING, 1, 3);
/**
核心代码 ...
*/
see.execute("cep_second");
2、Stream 创建
加载或创建初始的数据流(DataStream)可以通过连接到外部数据源(如文件、数据库、👉消息队列等)或使用内部数据源(如程序内定义的集合)来完成。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
/*
SourceFunction[OUT]
1、void run(SourceContext<OUT> ctx) throws Exception;
RichSourceFunction[OUT]
2、void open(Configuration parameters) throws Exception;
3、void close() throws Exception;
RichParallelSourceFunction
2、void open(Configuration parameters) throws Exception;
3、void close() throws Exception;
4、void setRuntimeContext(RuntimeContext t)
*/
addSource(SourceFunction<OUT> function) : DataStreamSource<OUT>
addSource(
SourceFunction<OUT> function,
String sourceName) : DataStreamSource<OUT>
addSource(
SourceFunction<OUT> function,
TypeInformation<OUT> typeInfo) : DataStreamSource<OUT>
👉 addSource(
final SourceFunction<OUT> function,
final String sourceName,
@Nullable final TypeInformation<OUT> typeInfo,
final Boundedness boundedness) : DataStreamSource<OUT>
👉 val kafkaSource: KafkaSource[String] = KafkaSource
.builder()
.setTopics("test02")
.setBootstrapServers("master01:9092,master02:9092,worker01:9092")
.setGroupId("flink-kafka-test02-01")
.setStartingOffsets(OffsetsInitializer.earliest())
.setValueOnlyDeserializer(new SimpleStringSchema())
.build()
// experimental 实验方法
👉 fromSource(
Source<OUT, ?, ?> source,
WatermarkStrategy<OUT> timestampsAndWatermarks,
String sourceName,
TypeInformation<OUT> typeInfo) : DataStreamSource<OUT>
fromSequence(long from, long to) : DataStreamSource<Long>
fromElements(Class<OUT> type, OUT... data) : DataStreamSource<OUT>
fromCollection(Collection<OUT> data) : DataStreamSource<OUT>
fromCollection(
Iterator<OUT> data,
Class<OUT> type) : DataStreamSource<OUT>
fromCollection(
Iterator<OUT> data,
TypeInformation<OUT> typeInfo) : DataStreamSource<OUT>
fromParallelCollection(
SplittableIterator<OUT> iterator,
Class<OUT> type) : DataStreamSource<OUT>
fromParallelCollection(
SplittableIterator<OUT> iterator,
TypeInformation<OUT> typeInfo) : DataStreamSource<OUT>
fromParallelCollection(
SplittableIterator<OUT> iterator,
TypeInformation<OUT> typeInfo,
String operatorName) : DataStreamSource<OUT>
readTextFile(String filePath) : DataStreamSource<String>
readTextFile(
String filePath,
String charsetName) : DataStreamSource<String>
socketTextStream(
String hostname,
int port,
String delimiter,
long maxRetry) : DataStreamSource<String>
/** 注册缓存文件
运行时,缓存文件可以在任何用户自定义函数中本地路径下读取,文件可以是本地文件或分布式文件系统,需要时运行过程中会将文件临时拷贝到本地缓存。
*/
registerCachedFile(String filePath, String name)
/** 注册任务侦听器
在指定的任务状态改变时,任务侦听器会被通知触发
*/
registerJobListener(JobListener jobListener)
clearJobListeners()
3、Transform 数据转换
对数据流指定转换操作。这些操作定义了如何处理数据,包括过滤、映射、聚合、窗口操作等。通过调用 DataStream API 提供的方法来定义这些转换。
通过算子运算后流的类型会发生转换
数据流:DataStream[OUT]
创建或合并(union)获得
连接流:ConnectedStreams[OUT, OUT2]
广播流:BroadcastStream[OUT]
键值流:KeyedStream[OUT, KEY]
键内联流:JoinedStreams[OUT, OUT2] :1对1
滚动窗口 join
滑动窗口 join
会话窗口 join
键全外流:CoGroupedStreams[OUT, OUT2] :多对多
3.1、DataStream 算子
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
// 若算子泛型指定 TypeInformation 必须导入包
import org.apache.flink.streaming.api.scala.createTypeInformation
def process(processFunction: ProcessFunction[OUT])
: DataStream[OUT]
def map[R: TypeInformation](fun: T => R)
: DataStream[OUT]
def flatMap[R: TypeInformation](fun: T => TraversableOnce[R])
: DataStream[OUT]
def filter(fun: T => Boolean)
: DataStream[OUT]
def broadcast(broadcastStateDescriptors: MapStateDescriptor[_, _]*)
: BroadcastStream[OUT]
/**
按键进行内连接关联
*/
def join[T2](otherStream: DataStream[T2])
: JoinedStreams[OUT, OUT2]
/**
按键进行全外关联
*/
def coGroup[T2](otherStream: DataStream[T2])
: CoGroupedStreams[OUT, OUT2]
/**
以【FIFO】的方式,【不去重】,将多个【元素类型相同】的流合并成一个数据流
*/
def union(dataStreams: DataStream[T]*)
: DataStream[OUT]
/**
与 union 类似
区别:只能是【两个流】合并,流中元素【类型可以不一致】,两个流可以【共享状态】但【差异化处理】,
常用于通过一个流控制另一个流
*/
def connect[T2](dataStream: DataStream[T2])
: ConnectedStreams[OUT, OUT2]
def keyBy[K: TypeInformation](fun: T => K)
: KeyedStream[T, K]
3.2、KeyedStream 算子
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
// 参数 String field | int poisition
// Field expressions are only supported on POJO types, tuples, and 👉 case classes
max
maxBy
min
minBy
reduce
new ReduceFunction<Tuple3>() {
@Override
public Tuple3 reduce(Tuple3 v1, Tuple3 v2) throws Exception {
Integer t1 = (Integer)v1._2();
Integer t2 = (Integer)v2._2();
return t1<=t2 ? v2 : v1;
}
}
aggregate
new AggregateFunction<Tuple3, V, V>() {
// 初始化
@Override
public V createAccumulator() {
return new V();
}
// map : 叠加
@Override
public V add(Tuple3 value, V accumulator) {
if(accumulator.getId()==0){
accumulator.setId((Integer) value._1());
}
accumulator.acc((Integer) value._2());
return accumulator;
}
// 出口
@Override
public V getResult(V accumulator) {
return accumulator;
}
// combiner | reduce : 聚合
@Override
public V merge(V a, V b) {
b.acc(a.getTemperature());
return b;
}
}
sideOutputLateData(OutputTag<T> outputTag) // 侧输出流 ???
4、Sink 创建
指定将计算结果发送到目的地。这通常是通过定义接收器(Sink)来完成的,接收器负责将结果数据写入外部系统,如文件、数据库、消息队列或标准输出等。只有添加了 sink 的流才可以被执行。
1
2
def addSink(SinkFunction<T> sinkFunction) : DataStreamSink<T>
def sinkTo(Sink<T, ?, ?, ?> sink) : DataStreamSink<T>
5、execute 触发执行
触发程序的执行。在 Flink 中,这通常是通过调用执行环境的 execute()
方法来完成的,该方法会启动 Flink 作业的执行流程,包括数据流的处理和结果的输出。
1
2
def JobExecutionResult execute(String jobName) : JobExecutionResult
def executeAsync(String jobName) : JobClient
执行模式
批处理与流处理
批处理(BATCH)执行模式仅适用于有界作业 / Flink 程序。有界性是一个数据源的特性,它告诉我们该数据源在执行前是否已知所有输入,或者是否会有新数据出现,且这种新数据的出现可能是无限的。数据无界作业无界,否则有界。
👉 流处理(STREAMING)执行模式既可用于有界作业,也可用于无界作业。
作为一般规则,当你的程序是有界的时,你应该使用批处理执行模式,因为这将更加高效。而当你的程序是无界的时,你必须使用流处理执行模式,因为只有这种模式足够通用,能够处理连续的数据流。
状态后端
👉 在流处理(STREAMING)模式下,Flink 使用 StateBackend 来控制状态的存储方式和检查点(checkpointing)的工作机制。StateBackend 定义了状态数据(如窗口、键值对状态等)在 Flink 集群中的存储和恢复方式。
在批处理(BATCH)模式下,配置的 StateBackend 会被忽略。
失败恢复
在流式执行模式下,Flink 使用检查点(checkpointing)来进行失败恢复。检查点用于失败恢复的一个特点是,Flink 在遇到失败时会从检查点重启所有正在运行的任务。在批处理执行模式下,Flink 会尝试回溯到仍然有中间结果的先前处理阶段。
处理顺序
在 Flink 中,操作符或用户定义函数(UDFs)处理记录的顺序在批处理(BATCH)和流处理(STREAMING)执行模式之间可能会有所不同。
在流处理模式下,用户定义函数不应该对输入记录的顺序做任何假设。数据一旦到达就会被处理,因此输入记录的顺序可能受到数据源、网络延迟、并行度等多种因素的影响。
在批处理执行模式下,Flink 对某些操作保证了顺序性。这种顺序性可能是特定任务调度、网络洗牌(shuffle)和状态后端(如上所述)的副作用,也可能是系统有意识的选择。然而,需要注意的是,并非所有批处理操作都保证顺序性,这取决于具体的操作类型和配置。
我们可以区分出三种一般类型的输入:
- 广播输入(broadcast input):来自广播流的输入(也参见广播状态)
- 常规输入(regular input):既不是广播也不是键控的输入
- 键控输入(keyed input):来自 KeyedStream 的输入
对于消耗多种输入类型的函数或操作符,它们将按照以下顺序处理这些输入:
- 首先处理广播输入
- 然后处理常规输入
- 最后处理键控输入