Kafka介绍安装

Kafka

分布式消息队列

  • 多副本
    • 容错:in-sync replicas
      • I S R =zoo keeper=> Leader 副本丢失,从 I S R中选择新的 Leader
      • I S R 宕机 => 从剩余的 follower 中选择替代
    • 读写分离
      • Leader 负责写操作
      • I S R 中任何一个 replica 都可以读、
  • 多分区:M P => multiple partitions
    • 低延时
    • 主题分区数量最优设计:节点数 * 物理核数
  • 零拷贝
  • 产销解耦
    • 生产者 —> Kafka <— 消费者
    • 生产者数量:分区数个生产者轮询写入,均匀分布
    • 消费者数量:分区数个消费者一对一读取,并行消费
    • 消费模式
      • 指定位置 –from-beginning
      • 分组消费 –group-id
        • 在服务端存储分组名,主题和偏移量的映射数据
      • 客户端记录消费位置
        • r e d i s
        • m y s q l

Install

single

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
##################
# zookeeper 管理 #
##################
# 配置
config/server.properties
#----------------------------------
num.partitions=1 #分区数
#超过一周或文件大小超过1个G,删除历史数据
delete.topic.enable=true #允许删除主题
log.retention.hours=168 #数据保留一周
log.segment.bytes=1073741824‬ #数据最大为4G
log.retention.check.interval.ms=300000 #多久检查一次根据规则是否删除数据

zookeeper.connect=single:2181
#----------------------------------

# 启动服务
zkServer.sh start
kafka-server-start.sh config/server.properties
nohup kafka-server-start.sh /opt/software/kafka_2.12_2.8.0/config/server.properties 1>/dev/null 2>&1 &

##############
# kraft 模式 #
##############
# 配置
config/kraft/server.properties
#----------------------------------
advertised.listeners=PLAINTEXT://single:9092
#----------------------------------

kafka-storage.sh random-uuid #=> sd1Ksjx8Sh6fkpxy0YhQQw
kafka-storage.sh format -t sd1Ksjx8Sh6fkpxy0YhQQw -c config/kraft/server.properties
#如果为集群每个节点都要执行

#启动服务
kafka-server-start.sh config/kraft/server.properties

cluster

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
# 配置
config/server.properties
#----------------------------------
broker.id=0 #集群多节点编号 0,1,2
delete.topic.enable=true #允许删除主题
num.partitions=3 #最佳配置为broker的数量
#超过一周或文件大小超过1个G,删除历史数据
log.retention.hours=168 #数据保留一周
log.segment.bytes=1073741824‬ #数据最大为4G
log.retention.check.interval.ms=300000 #多久检查一次根据规则是否删除数据
zookeeper.connect=why:2181,zzy:2181,zyh:2181 #zookeeper集群
#----------------------------------

# 分发(远程拷贝)至其他节点

# 群启zk和kafka服务

commands

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
#shell 控制台处理
#查看主题
kafka-topics.sh --list --bootstrap-server single:9092

#创建主题
kafka-topics.sh --create --topic test01 --partitions 1 --replication-factor 1 --bootstrap-server single:9092

#查看主题详情
kafka-topics.sh --describe --topic test01 --bootstrap-server single:9092

#创建控制台消费者
#默认的键值反序列化都是字符串:
# --key|value-deserializer
# org.apache.kafka.common.serialization.StringDeserializer
# --group group01
kafka-console-consumer.sh --bootstrap-server single:9092 --topic test01 --property print.key=true --from-beginning

#创建控制台生产者 vim events.log
kafka-console-producer.sh --broker-list single:9092 --topic test01 < events.log

# 删除主题和数据(不能被正在生产或消费)
kafka-topics.sh --bootstrap-server single:9092 --delete --topic test01

j a v a 处理

  • 依赖

    1
    2
    3
    4
    5
    6
    <!-- 2.8.0 -->
    <dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>${kafka.version}</version>
    </dependency>
  • 生产者配置

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
      bootstrap.servers=single01:9092
    batch.size=16384
    linger.ms=3000
    acks=1
    retries=3
    retry.backoff.ms=100
    key.serializer=org.apache.kafka.common.serialization.IntegerSerializer
    value.serializer=org.apache.kafka.common.serialization.StringSerializer
    fake.user.id=10000
    fake.cmm.id=100000
    fake.shop.id=1000
    fake.action.types=click,hover,roll
    fake.event.types=KeyWordSearch,ClassifiedSearch,AddToCart,RemoveFromCart,Collect,DirectPay,CartPay
    fake.kafka.topic=kb16_press_t1
  • 消费者配置

    1
    2
    3
    4
    5
    6
    7
    bootstrap.servers=single:9092
    key.deserializer=classOf[StringDeserializer]
    value.deserializer=classOf[StringDeserializer]
    group.id=test_01
    auto.offset.reset=latest
    enable.auto.commit=true
    auto.commit.interval.ms=200
1
2



Kafka介绍安装
https://leaf-domain.gitee.io/2024/08/01/bigdata/kafka/kafka_install/
作者
叶域
发布于
2024年8月1日
许可协议