分布式消息队列Kafka
简介: Kafka 是一个分布式消息队列系统,用于处理实时数据流。消息按照主题(Topic)进行分类存储,发送消息的实体称为 Producer,接收消息的实体称为 Consumer。Kafka 集群由多个 Kafka 实例(Server)组成,每个实例称为 Broker。
主要用途:广泛应用于构建实时数据管道和流应用程序,适用于需要高吞吐量和低延迟的数据处理场景
依赖:Kafka 集群和消费者依赖 Zookeeper 集群来保存元数据信息(如偏移量、Broker 列表等),以确保系统的可用性和一致性。Zookeeper 是 Kafka 的协调服务,用于管理和监控 Kafka 的状态。
一、Kafka 数据交互与 JMS 模型的借鉴
1.1 数据交互
- 线程间交互:通过共享堆内存实现。
- 进程间交互:通过 Socket 实现。
使用消息中间件的原因:缓冲与解耦
- 资源管理:在生产速度远大于消费速度时,使用消息中间件可以防止内存和磁盘资源耗尽。
- 降低负担:消息中间件可以帮助生产者将数据分类并分发给不同的消费者,从而减少生产者的处理负担。
- 系统解耦:通过消息中间件,生产者与消费者无需直接连接,降低系统耦合度,提高系统的可用性和可扩展性。
1.2 Kafka 与 JMS 的借鉴与区别
- JMS (Java Message Service) 模型:
- P2P(点对点)模型:一条消息只会被消费一次,具有反馈机制。
- PS(发布/订阅)模型:消息可以被多个消费者订阅和消费。
- Kafka 的借鉴与区别:
- 借鉴但不完全遵循 JMS:虽然 Kafka 参考了 JMS 的模型,但它与传统的消息中间件(如 RabbitMQ、ActiveMQ、RocketMQ)有所不同。
- 术语对比
- 消息(Message) 在 Kafka 中称为 记录(Record)。
- JMS Provider 在 Kafka 中称为 Kafka Broker。
- 消费顺序的索引 在 Kafka 中称为 偏移量(Offset)。
二、Kafka 特性
2.1 核心特性
- 高吞吐量:Kafka 能够处理大量数据流,特别适用于实时数据管道和流应用程序,能够在大规模数据传输中保持高性能。
- 可扩展性:Kafka 可以通过增加 Broker 和分区的方式水平扩展,支持数据处理能力的动态增长。
- 容错性:Kafka 采用多副本机制和分布式架构,确保数据在副本之间的同步,从而提供高可用性和容错能力。
- 持久性和可复制性:Kafka 支持将消息持久化存储在磁盘上,并在多个副本间进行复制,以保证数据的安全性和一致性。
2.2 多副本机制
- 容错性(In-Sync Replicas, ISR)
- Kafka 通过多副本机制确保数据的高可用性。ISR 是一组与 Leader 副本保持同步的副本,当 Leader 副本丢失时,会从 ISR 中选出一个新的 Leader。
- 如果 ISR 中的副本宕机,Kafka 将从剩余的副本中选择替代者,以保持系统的可用性和数据的完整性。
- 读写分离
- 在 Kafka 中,Leader 副本负责写操作,ISR 中的任何副本都可以处理读操作。这种设计能够提高系统的读写性能,避免单点性能瓶颈。
2.3 多分区机制
- 分区(Partitions)
- Kafka 的主题(Topic)被分成多个分区,每个分区可以分布在不同的节点上。通过这种方式,Kafka 实现了数据的并行处理能力,提高了系统的吞吐量。
- 分区数量的设计通常考虑节点数与物理核数,以保证在实现高性能的同时最大化资源的利用率。
- 低延迟
- Kafka 的多分区机制有助于实现低延迟的数据处理。由于数据可以在多个分区上并行处理,Kafka 能够在处理大规模数据流时保持较低的延迟。
2.4 零拷贝技术
- Kafka 采用零拷贝(Zero-Copy)技术,通过直接将数据从磁盘传输到网络,减少了 CPU 的负载,显著提高了数据传输的效率。这种技术使得 Kafka 能够在高吞吐量的场景中保持低资源消耗。
2.5 产销解耦
- 生产者和消费者的解耦:
- Kafka 实现了生产者与消费者的解耦。生产者将数据写入 Kafka,而消费者从 Kafka 中读取数据,这种设计使得生产者和消费者可以独立扩展。
- 分区机制使得多个生产者可以通过轮询的方式将数据均匀地写入到不同的分区,从而实现负载均衡。同样,多个消费者也可以并行地从不同的分区读取数据,提高了数据处理的并发性。
- 消费模式:
- 指定位置消费:消费者可以从指定的位置开始消费数据,例如使用
--from-beginning
标志从头开始消费。
- 分组消费(Group Consumption):消费者可以通过指定
--group-id
参数加入消费组,Kafka 在服务端会存储分组名、主题和偏移量的映射关系,从而实现多消费者的协调消费。
- 客户端记录消费位置:消费者还可以自行记录消费位置,如使用 Redis 或 MySQL 来记录偏移量,方便在消费者重启或故障恢复时继续从上次消费的地方开始。
三、Kafka 场景应用
Kafka 广泛应用于各种需要处理大规模实时数据流的场景
3.1 日志收集与聚合
- 应用日志收集
- Kafka 常用于集中式日志管理,将来自不同应用的日志收集到统一的 Kafka 主题中,然后通过消费者将这些日志写入到持久化存储(如 HDFS、Elasticsearch)进行分析和检索。
- 这种方法可以实时监控应用程序的运行状况,快速定位问题,并支持日志数据的长期保存和历史回溯。
3.2 实时数据流处理
- 实时数据分析
- Kafka 作为数据流的中转站,将来自各种数据源的实时数据传递给流处理框架(如 Apache Storm、Apache Flink、Apache Spark Streaming)进行实时分析。
- 这种场景下,Kafka 可以处理点击流、用户行为日志、传感器数据等,并将分析结果实时反馈到业务系统中。
3.3 数据管道与ETL
- 数据管道
- Kafka 常用于构建跨系统的数据管道,将数据从一个系统可靠地传输到另一个系统。Kafka 能够确保数据传输过程中的高吞吐量和低延迟,同时支持大规模的分布式数据处理。
- 在典型的 ETL(Extract, Transform, Load)场景中,Kafka 被用作数据的传输通道,支持数据的实时采集、转换和加载。
3.4 消息队列
- 事件驱动架构
- Kafka 作为消息队列用于事件驱动架构中,将系统中的事件消息传递给多个独立的服务进行异步处理。
- 通过 Kafka,开发者可以实现应用程序内各模块之间的松耦合,使得应用程序更加灵活和可扩展。
3.5 用户行为跟踪
- 点击流分析
- 在电商网站或社交媒体平台,Kafka 可以用于捕获和处理用户的点击流数据。这些数据可以被用于实时个性化推荐、广告投放优化、用户行为分析等场景。
- Kafka 的高吞吐量特性使其非常适合处理大量的用户交互数据,支持实时响应和数据分析。
3.6 监控与报警
- 系统监控
- Kafka 可以用于系统监控数据的收集和处理。监控系统可以通过 Kafka 将各种指标数据(如 CPU 使用率、内存使用率、网络流量等)发送到集中式监控平台,如 Prometheus 或 Grafana 进行可视化和告警。
- 这种方式能够实时监控系统的健康状况,并及时处理异常情况。
3.7 物联网(IoT)
- 传感器数据收集与处理
- 在物联网场景中,Kafka 可以用于收集和处理来自各类传感器的大量实时数据。Kafka 能够高效地将这些数据传递给数据处理和分析系统,以便实时监控和自动化决策。
- 通过 Kafka,物联网系统可以在不同的地点之间高效、可靠地传输数据,并确保数据的一致性和完整性。
四、Kafka的安装
4.1 下载 Kafka
官方下载地址:Kafka 下载
1 2 3 4 5 6 7 8
| wget https://downloads.apache.org/kafka/3.8.0/kafka_2.12-3.8.0.tgz
tar -xzf kafka_2.12-3.8.0.tgz
mv kafka_2.12-3.8.0 kafka
|
4.2 配置 Kafka
在解压后的 Kafka 目录中,有几个重要的配置文件:
- server.properties:Kafka 服务器的主要配置文件。
- zookeeper.properties:Zookeeper 的配置文件。
4.2.1 配置 Zookeeper
Zookeeper 是 Kafka 的分布式协调服务,Kafka 在启动前需要先启动 Zookeeper。
4.2.2 配置 Kafka 服务器
编辑 server.properties
文件来配置 Kafka 服务器。
常用配置项:
1 2 3 4 5
| broker.id=0 log.dirs=/tmp/kafka-logs zookeeper.connect=localhost:2181
|
4.3 启动、停止 Zookeeper 和 Kafka
Kafka 依赖 Zookeeper,因此需要先启动 Zookeeper。
1 2 3 4 5 6
| bin/zookeeper-server-start.sh config/zookeeper.properties
bin/kafka-server-start.sh config/server.properties nohup kafka-server-start.sh /opt/software/kafka_2.12-3.3.1/config/server.properties >/dev/null 2>&1 &
|
1 2 3 4 5
| bin/kafka-server-stop.sh
bin/zookeeper-server-stop.sh
|
五、Kafka命令行操作
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
|
kafka-topics.sh --bootstrap-server localhost:9092 --create --topic test
kafka-topics.sh --bootstrap-server localhost:9092 --list
kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic test
kafka-topics.sh --bootstrap-server localhost:9092 --topic test --alter --partitions 2
kafka-topics.sh --bootstrap-server localhost:9092 --topic test --delete
|
1 2 3 4 5 6 7 8
| kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test
kafka-console-producer.sh --bootstrap-server localhost:9092 --topic test
kafka-console-producer.sh --broker-list localhost:9092 --topic test < xxx.text
|
六、Kafka Scala API
6.1 依赖
1 2 3 4 5 6 7 8 9 10 11 12
| <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>3.6.1</version> </dependency>
<dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-streams</artifactId> <version>3.6.1</version> </dependency>
|
6.2 AdminAPI
Admin API 用于管理 Kafka 集群、主题、分区等资源。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
| package kafkaAPI
import java.util.{Collections, Properties} import org.apache.kafka.clients.admin.{AdminClient, AdminClientConfig, NewTopic}
object AdminAPI { def main(args: Array[String]): Unit = { val props = new Properties() props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "master01:9092")
val adminClient = AdminClient.create(props)
val newTopic = new NewTopic("my-new-topic", 1, 1.toShort) adminClient.createTopics(Collections.singletonList(newTopic)).all().get()
adminClient.close() } }
|
6.3 Producer API
生产者 API 用于将记录发布到一个或多个 Kafka 主题中。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29
| package kafkaAPI
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord} import org.apache.kafka.common.serialization.StringSerializer
import java.util.Properties
object ProducerAPI { def main(args: Array[String]): Unit = { val props = new Properties() props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "master01:9092") props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer].getName) props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer].getName)
val producer = new KafkaProducer[String, String](props)
val record = new ProducerRecord[String, String]("my-topic", "key2", "value2") producer.send(record)
producer.close() } }
|
6.4 ConsumerAPI
消费者 API 用于从 Kafka 主题中读取记录,支持自动和手动提交偏移量。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35
| package kafkaAPI
import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer} import org.apache.kafka.common.serialization.StringDeserializer import java.time.Duration import java.util.{Collections, Properties}
object ConsumerAPI { def main(args: Array[String]): Unit = { val props = new Properties() props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "master02:9092") props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group") props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, classOf[StringDeserializer].getName) props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, classOf[StringDeserializer].getName)
val consumer = new KafkaConsumer[String, String](props) consumer.subscribe(Collections.singletonList("my-topic"))
while (true) { val records = consumer.poll(Duration.ofMillis(100)) records.forEach(record => { println(s"offset = ${record.offset()}, key = ${record.key()}, value = ${record.value()}") }) } } }
|
6.5 Streams API
Streams API 用于构建具有状态和无状态的流处理应用程序。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35
| package kafkaAPI
import java.util.Properties import org.apache.kafka.streams.{KafkaStreams, StreamsBuilder, StreamsConfig} import org.apache.kafka.streams.kstream.KStream import org.apache.kafka.common.serialization.Serdes
object StreamsAPI { def main(args: Array[String]): Unit = { val props = new Properties() props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-stream") props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "master01:9092") props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass.getName) props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass.getName)
val builder = new StreamsBuilder() val stream: KStream[String, String] = builder.stream("input-topic")
val processedStream: KStream[String, String] = stream.mapValues(value => value + "-out")
processedStream.to("output-topic")
val streams = new KafkaStreams(builder.build(), props) streams.start() } }
|
七、Kafka与大数据的集成
Kafka 能够在大数据生态系统中充当数据流的核心传输和处理管道。
Kafka 可以与多种大数据技术集成,以实现强大的数据流处理能力。
与 Hadoop 集成,通过 Kafka Connect 将数据导入 HDFS 进行批量处理;
与 Spark 集成,使用 Spark Streaming 进行实时数据处理;
与 Flink 集成,提供低延迟流处理和复杂事件处理;
与 Elasticsearch 集成,实现实时数据索引和搜索;
与 MongoDB 集成,进行实时数据存储和查询;
以及与 Redis 集成,用于实时缓存和快速数据访问。
7.1 Kafka flume
Flume 可以将数据从各种来源传输到 Kafka 集群中,然后 Kafka 再将这些数据传输到其他系统进行处理和存储
flume采集数据到Kafka的配置
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
| a1.sources = r1 a1.channels = c1
a1.sources.r1.type = TAILDIR a1.sources.r1.filegroups = f1
a1.sources.r1.filegroups.f1 = /opt/module/data/test.log a1.sources.r1.positionFile = /opt/software/flume-1.9.0/taildir_position.json
a1.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel a1.channels.c1.kafka.bootstrap.servers = single01:9092 a1.channels.c1.kafka.topic = test a1.channels.c1.parseAsFlumeEvent = false
a1.sources.r1.channels = c1
|
执行flume操作采集数据到Kafka
1 2 3 4
| cd /opt/module/flume
bin/flume-ng agent -n a1 -c conf/ -f job/file_to_kafka.conf
|
bin/flume-ng agent
启动 Flume 代理。
-n a1
指定代理名称为 a1
。
-c conf/
指定配置目录为 conf/
。
-f job/file_to_kafka.conf
指定具体的 Flume 配置文件。
7.2 Kafka Spark Streaming
使用 Spark Streaming 进行实时数据处理
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
| <spark.scala.version>2.12</spark.scala.version>
<dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-core_${spark.scala.version}</artifactId> <version>${spark.version}</version> </dependency>
<dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-streaming_${spark.scala.version}</artifactId> <version>${spark.version}</version> </dependency>
<dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-streaming-kafka-0-10_${spark.scala.version}</artifactId> <version>${spark.kafka.version}</version> </dependency>
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48
| import org.apache.kafka.clients.consumer.ConsumerConfig import org.apache.spark.SparkConf import org.apache.spark.sql.SparkSession import org.apache.spark.streaming.{Seconds, StreamingContext} import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies} import org.apache.kafka.common.serialization.StringDeserializer import shaded.parquet.org.codehaus.jackson.map.deser.std.StdDeserializer.IntegerDeserializer
object KafkaSparkStream {
def main(args: Array[String]): Unit = { val conf = new SparkConf() .setAppName("spark-streaming-kafka-01") .setMaster("local[*]")
val spark: SparkSession = SparkSession .builder() .config(conf) .getOrCreate()
import spark.implicits._ import org.apache.spark.sql.functions._
val ssc = new StreamingContext(spark.sparkContext, Seconds(5)) val topic = "test"
val kafkaParams = Map( (ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "hostname:9092"), (ConsumerConfig.GROUP_ID_CONFIG, "group01"), (ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, classOf[StringDeserializer].getName), (ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, classOf[StringDeserializer].getName), )
KafkaUtils.createDirectStream[Int, String]( ssc, LocationStrategies.PreferConsistent, ConsumerStrategies.Subscribe[Int, String](Array(topic), kafkaParams) ) .foreachRDD( ) ssc.start() ssc.awaitTermination()
} }
|
7.3 Kafka Flink
Flink是分布式计算引擎,是一款非常强大的实时分布式计算框架,可以将Kafka作为数据源进行处理。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
| <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-java</artifactId> <version>1.17.0</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-streaming-java</artifactId> <version>1.17.0</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-clients</artifactId> <version>1.17.0</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-kafka</artifactId> <version>1.17.0</version> </dependency>
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33
| import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer import org.apache.flink.streaming.api.datastream.DataStream import org.apache.kafka.clients.consumer.ConsumerConfig import org.apache.kafka.common.serialization.StringDeserializer import org.apache.flink.api.common.serialization.SimpleStringSchema
import java.util.Properties
object KafkaFlink { def main(args: Array[String]): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment
val properties = new Properties() properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092") properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "test") properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, classOf[StringDeserializer].getName) properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, classOf[StringDeserializer].getName)
val consumer = new FlinkKafkaConsumer[String]("my-topic", new SimpleStringSchema(), properties) val stream: DataStream[String] = env.addSource(consumer)
stream.print()
env.execute("Flink Kafka Example") } }
|