Apache Flink
是一个用于处理数据流的开源分布式计算框架,具有以下主要特点和功能:
1. 实时流处理:
- 低延迟: Flink 能够以亚秒级的延迟处理数据流,非常适合对时间敏感的应用,如实时分析、监控和告警系统。
- 状态管理: Flink 提供了对状态的内置支持,使得开发有状态的流式处理应用变得容易,如窗口操作、复杂事件处理等。
2. 批处理和流处理的统一:
- Flink 既可以用于流处理,也可以用于批处理,允许用户在一个框架中编写应用程序,而不必在批处理和流处理之间切换。
- 事件时间处理: Flink 支持事件时间语义,可以基于数据本身的时间戳进行处理,而不是数据到达的时间,这对于处理无序数据流非常重要。
3. 高度可扩展性:
- Flink 能够在大规模分布式集群上运行,处理从几千到上百万个事件每秒的数据流。
- 弹性和容错: Flink 使用检查点和保存点机制来提供容错能力,确保在发生故障时可以从之前的状态恢复,减少数据丢失。
4. 支持多种数据源和接收器:
- Flink 能够与多种数据源和接收器集成,如 Kafka、HDFS、Cassandra、Elasticsearch 等,使其可以轻松地处理和存储来自不同系统的数据。
5. 丰富的 API 和库:
- DataStream API: 用于流处理,允许开发者定义复杂的数据流处理逻辑。
- DataSet API: 用于批处理,提供了丰富的操作符来处理静态数据集。
- Table API 和 SQL: 提供了一个更高级别的 API,允许开发者使用 SQL 查询来处理数据流和数据集。
- 机器学习和图处理库: Flink 提供了机器学习库(FlinkML)和图处理库(Gelly),适用于高级分析任务。
6. 部署灵活性:
- Flink 可以部署在多种环境中,如独立集群、YARN、Kubernetes、Mesos 以及本地环境中。
- 流批一体: Flink 支持将批处理和流处理集成到同一个应用程序中,简化了部署和管理。
7. 社区与生态系统:
- Flink 由一个活跃的开源社区维护和发展,生态系统日益壮大,支持越来越多的第三方工具和集成。
典型应用场景
- 实时数据分析: Flink 可用于处理实时事件流,提供实时分析、告警和监控。
- 复杂事件处理: Flink 能够处理和识别复杂事件模式,用于金融监控、欺诈检测等。
- 日志处理: 可以实时处理和分析来自各种系统的日志数据,提取有价值的信息。
- 机器学习: Flink 的流处理能力可以用于实时更新机器学习模型,或在流数据上直接进行预测。
Apache Flink 适用于各种需要实时和批处理的应用程序,尤其是在处理大规模数据流时表现出色。·
wordCount 1 批处理
1 2 3 4 5 6 7 8 9 10 11 12
| val env = ExecutionEnvironment.getExecutionEnvironment
env.readTextFile("data/words.txt") .flatMap(_.replaceAll("[^a-zA-Z ]", "").split("\\s+")) .map((_, 1)) .groupBy(0) .sum(1) .print()
|
文件数据(有界流)处理
1 2 3 4 5 6 7 8 9 10 11 12
| val env = StreamExecutionEnvironment.getExecutionEnvironment
env.readTextFile("data/words.txt") .flatMap(_.replaceAll("[^a-zA-Z ]", "").split("\\s+")) .map((_, 1)) .keyBy(_._1) .sum(1) .print()
env.execute("Word Count Example")
|
Socket (无界流) 处理
1 2 3 4 5 6 7 8 9 10
| val env = StreamExecutionEnvironment.getExecutionEnvironment
env.socketTextStream("master01", 9999) .flatMap(_.replaceAll("[^a-zA-Z ]", "").split("\\s+")) .map((_, 1)) .keyBy(_._1) .sum(1) .print()
env.execute("socket stream")
|
二、Flink下载,集群安装配置
官方下载地址:Downloads | Apache Flink
2.0 下载、解压、配置环境变量
1 2 3 4 5 6 7
| wget https://dlcdn.apache.org/flink/flink-1.20.0/flink-1.20.0-bin-scala_2.12.tgz
tar -zxvf flink-1.20.0 -C /opt/software
vim /etc/profile.d/myenv.sh
|
2.1 进入 Flink 配置目录:
2.2 编辑 masters
文件:
在文件中指定 JobManager 的主机名或 IP 地址。如果有多个 JobManager(用于高可用性),每个 JobManager 使用一行。
1 2 3 4 5 6 7
| <JobManager1>:<port> <JobManager2>:<port>
master01:8081 master02:8081
|
2.3 编辑 workers
文件:
在文件中列出所有 TaskManager 的主机名或 IP 地址,每个 TaskManager 使用一行
1 2 3 4 5 6 7
| <TaskManager1> <TaskManager2>
worker01 worker02
|
2.4 编辑 flink-conf.yaml
文件:
flink-conf.yaml
是 Flink 的主要配置文件,用于配置各种集群参数。
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| jobmanager.rpc.address:
jobmanager.bind-host: 0.0.0.0
taskmanager.host: master01
taskmanager.bind-host: 0.0.0.0
rest.address: master01
rest.bind-address: 0.0.0.0
|
以下看需配置
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24
| taskmanager.memory.process.size: 1024m
taskmanager.numberOfTaskSlots: 4
high-availability.type: zookeeper high-availability.zookeeper.quorum: localhost:2181 high-availability.zookeeper.path.root: /flink high-availability.cluster-id: /cluster_one high-availability.storageDir: hdfs:///flink/recovery state.checkpoints.dir: hdfs:///flink/checkpoints state.savepoints.dir: hdfs:///flink/savepoints
parallelism.default: 4
taskmanager.log.dir: /var/log/flink
fs.default-scheme: hdfs://namenode:9000
|
2.5 启动 Flink 集群
1 2
| $FLINK_HOME/bin/stop-cluster.sh $FLINK_HOME/bin/start-cluster.sh
|
三、提交Flink任务
