flume

install
# Copy the Hadoop dependencies into Flume's lib directory
cp /opt/software/hadoop-3.1.3/share/hadoop/*/*.jar /opt/software/flume-1.9.0/lib
# Copy the Hive dependencies
cp /opt/software/hive-3.1.2/hcatalog/share/hcatalog/*.jar /opt/software/flume-1.9.0/lib
cp /opt/software/hive-3.1.2/lib/hive-*.jar /opt/software/flume-1.9.0/lib
cp /opt/software/hive-3.1.2/lib/antlr*.jar /opt/software/flume-1.9.0/lib

# If the agent fails with java.lang.OutOfMemoryError: GC overhead limit exceeded,
# raise the agent heap in the launcher script
vim bin/flume-ng
JAVA_OPTS="-Xmx1024m"
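A quick sanity check after copying the jars and bumping the heap (a minimal sketch, assuming the paths above):

# Print the Flume version to confirm the launcher still starts cleanly
/opt/software/flume-1.9.0/bin/flume-ng version
# Confirm the Hadoop/Hive jars actually landed in Flume's lib directory
ls /opt/software/flume-1.9.0/lib | grep -c '\.jar$'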
conf file
# hdfs sink
a1.sources = r1
a1.channels = c1
a1.sinks = s1

a1.sources.r1.type = TAILDIR
a1.sources.r1.positionFile = /root/project_ebs/act_log_extract/flume_config/position-file/transaction_pos.log
a1.sources.r1.filegroups = f1
a1.sources.r1.filegroups.f1 = /root/ebs_act_log/transaction_log/part-.*
a1.sources.r1.fileHeader = false

a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /root/project_ebs/act_log_extract/flume_config/channel-checkpoint
a1.channels.c1.dataDirs = /root/project_ebs/act_log_extract/flume_config/channel-data

a1.sinks.s1.type = hdfs
a1.sinks.s1.hdfs.fileType = DataStream
a1.sinks.s1.hdfs.writeFormat = Text
a1.sinks.s1.hdfs.path = hdfs://single:9000/external_ebs/transaction
a1.sinks.s1.hdfs.filePrefix = event-
a1.sinks.s1.hdfs.fileSuffix = .json
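# Roll a new file every 180 s or at 128 MB; rollCount = 0 disables count-based rolling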
a1.sinks.s1.hdfs.rollInterval = 180
a1.sinks.s1.hdfs.rollSize = 134217728
a1.sinks.s1.hdfs.rollCount = 0

a1.sinks.s1.channel = c1
a1.sources.r1.channels = c1


# hive sink
a1.sources = r1
a1.channels = c1
a1.sinks = s1

a1.sources.r1.type = TAILDIR
a1.sources.r1.positionFile = /root/project_events/tail_dir_log/tail_pos.log
a1.sources.r1.filegroups = f1
a1.sources.r1.filegroups.f1 = /root/project_events/store/.*.csv
a1.sources.r1.fileHeader = true
a1.sources.r1.headers.f1.headerKey1 = store

a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /root/project_events/checkpoint
a1.channels.c1.dataDirs = /root/project_events/file_channel_data

a1.sinks.s1.type = hive
a1.sinks.s1.hive.metastore = thrift://master01:9083
a1.sinks.s1.hive.database = ods_eb
a1.sinks.s1.hive.table = ods_store_review
#a1.sinks.s1.hive.partition = 2018
a1.sinks.s1.useLocalTimeStamp = false
a1.sinks.s1.round = true
a1.sinks.s1.roundValue = 10
a1.sinks.s1.roundUnit = minute
a1.sinks.s1.serializer = DELIMITED
a1.sinks.s1.serializer.delimiter = ","
a1.sinks.s1.serializer.fieldnames = transaction_id,store_id,review_score

a1.sinks.s1.channel = c1
a1.sources.r1.channels = c1
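
Once the agent is running (launch commands in the next section), the HDFS sink output can be spot-checked against the path configured above (a minimal sketch):

# List the files the HDFS sink has rolled so far
hdfs dfs -ls hdfs://single:9000/external_ebs/transaction
# Peek at a few events; file names follow the event-*.json prefix/suffix configured above
hdfs dfs -cat hdfs://single:9000/external_ebs/transaction/event-*.json | head -n 5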
flume-ng
flume-ng agent --name a1 --conf /opt/software/apache-flume-1.9.0-bin/conf/ --conf-file flume_conf/flume_tail_file_hive_tran.conf -Dflume.root.logger=INFO,console

flume-ng agent -n a1 -c /opt/software/apache-flume-1.9.0-bin/conf/ -f flume_file_hbase.conf -Dflume.root.logger=INFO,console

# Transaction activity log collection
/opt/software/flume-1.9.0/bin/flume-ng agent \
--name a1 \
--conf /opt/software/flume-1.9.0/conf \
--conf-file /root/project_ebs/act_log_extract/flume_config/conf-file/transaction.cnf \
-Dflume.root.logger=INFO,console

/opt/software/flume-1.9.0/bin/flume-ng agent \
--name a1 \
--conf /opt/software/flume-1.9.0/conf \
--conf-file /root/project_ebs/act_log_extract/flume_config/conf-file/review.cnf \
-Dflume.root.logger=INFO,console

/opt/software/flume-1.9.0/bin/flume-ng agent \
--name a1 \
--conf /opt/software/flume-1.9.0/conf \
--conf-file /root/project_ebs/act_log_extract/flume_config/conf-file/transaction_hive.cnf \
-Dflume.root.logger=INFO,console
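
For long-running collection the same command can be backgrounded so the agent survives the terminal session (a sketch; the log file location is arbitrary, and the console logger option is dropped):

nohup /opt/software/flume-1.9.0/bin/flume-ng agent \
--name a1 \
--conf /opt/software/flume-1.9.0/conf \
--conf-file /root/project_ebs/act_log_extract/flume_config/conf-file/transaction.cnf \
> /root/project_ebs/act_log_extract/flume_agent.log 2>&1 &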
[hive sink exception]
<!-- Append to hive-site.xml: enable transactions so the Hive sink can stream writes -->
<property>
  <name>hive.txn.manager</name>
  <value>org.apache.hadoop.hive.ql.lockmgr.DbTxnManager</value>
</property>
<property>
  <name>hive.compactor.initiator.on</name>
  <value>true</value>
</property>
<property>
  <name>hive.compactor.worker.threads</name>
  <value>1</value>
</property>
<property>
  <name>hive.support.concurrency</name>
  <value>true</value>
</property>
<property>
  <name>hive.enforce.bucketing</name>
  <value>true</value>
</property>
<property>
  <name>hive.in.test</name>
  <value>false</value>
</property>
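
The Hive sink writes through the Hive streaming API, so the target table must be transactional: stored as ORC, bucketed, and created with transactional=true. A minimal DDL sketch for the ods_eb.ods_store_review table referenced above (column types and the bucket column/count are assumptions):

hive -e "
CREATE DATABASE IF NOT EXISTS ods_eb;
CREATE TABLE IF NOT EXISTS ods_eb.ods_store_review (
  transaction_id STRING,   -- column names follow the sink's serializer.fieldnames
  store_id       STRING,
  review_score   INT
)
CLUSTERED BY (store_id) INTO 4 BUCKETS  -- bucket column and count are assumptions
STORED AS ORC
TBLPROPERTIES ('transactional' = 'true');
"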
hive sink template
#### tail_file_hive_store_review
a1.sources = r1
a1.channels = c1
a1.sinks = s1

a1.sources.r1.type = TAILDIR
a1.sources.r1.positionFile = /root/project_events/tail_dir_log/tail_pos.log
a1.sources.r1.filegroups = f1
a1.sources.r1.filegroups.f1 = /root/project_events/store/.*.csv
a1.sources.r1.fileHeader = true
a1.sources.r1.headers.f1.headerKey1 = store

a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /root/project_events/checkpoint
a1.channels.c1.dataDirs = /root/project_events/file_channel_data
a1.channels.c1.transactionCapacity = 10000

a1.sinks.s1.type = hive
a1.sinks.s1.hive.metastore = thrift://192.168.83.130:9083
a1.sinks.s1.hive.database = ods_eb
a1.sinks.s1.hive.table = ods_store_review
a1.sinks.s1.useLocalTimeStamp = false
a1.sinks.s1.batchSize = 10000
a1.sinks.s1.round = true
a1.sinks.s1.roundValue = 10
a1.sinks.s1.roundUnit = minute
a1.sinks.s1.serializer = DELIMITED
a1.sinks.s1.serializer.delimiter = ","
a1.sinks.s1.serializer.fieldnames = transaction_id,store_id,review_score

a1.sinks.s1.channel = c1
a1.sources.r1.channels = c1

#### tail_file_hive_par_transaction
a1.sources = r1
a1.channels = c1
a1.sinks = s1

a1.sources.r1.type = TAILDIR
a1.sources.r1.positionFile = /root/project_events/tail_dir_log/tail_pos.log
a1.sources.r1.filegroups = f1
a1.sources.r1.filegroups.f1 = /root/project_events/transaction/.*.csv
a1.sources.r1.fileHeader = true
a1.sources.r1.headers.f1.headerKey1 = tran

a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /root/project_events/checkpoint
a1.channels.c1.dataDirs = /root/project_events/file_channel_data
a1.channels.c1.transactionCapacity = 10000

a1.sinks.s1.type = hive
a1.sinks.s1.hive.metastore = thrift://192.168.83.130:9083
a1.sinks.s1.hive.database = ods_eb
a1.sinks.s1.hive.table = ods_transaction
a1.sinks.s1.hive.partition = 2018
a1.sinks.s1.useLocalTimeStamp = false
a1.sinks.s1.batchSize = 10000
a1.sinks.s1.round = true
a1.sinks.s1.roundValue = 10
a1.sinks.s1.roundUnit = minute
a1.sinks.s1.serializer = DELIMITED
a1.sinks.s1.serializer.delimiter = "^A"
a1.sinks.s1.serializer.fieldnames = line

a1.sinks.s1.channel = c1
a1.sources.r1.channels = c1
hbase sink template
a1.sources = r1
a1.channels = c1
a1.sinks = k1

a1.sources.r1.type = TAILDIR
a1.sources.r1.positionFile = /root/project_events/tail_dir_log/tail_pos.log
a1.sources.r1.filegroups = f1
a1.sources.r1.filegroups.f1 = /root/flume_file_hbase/student_info.txt
a1.sources.r1.fileHeader = true

a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /root/project_events/checkpoint
a1.channels.c1.dataDirs = /root/project_events/file_channel_data

a1.sinks.k1.type = hbase2
a1.sinks.k1.table = schooldb:student
a1.sinks.k1.columnFamily = base
a1.sinks.k1.serializer.regex = (.*?),(.*?),(.*?),(.*?),(.*?),(.*?)
a1.sinks.k1.serializer = org.apache.flume.sink.hbase2.RegexHBase2EventSerializer
a1.sinks.k1.serializer.colNames = ROW_KEY,stu_name,stu_pid,stu_gender,add_province,add_city
a1.sinks.k1.serializer.rowKeyIndex = 0
a1.sinks.k1.batchSize = 100

a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
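
The hbase2 sink does not create the target table, so the namespace, table, and column family referenced above need to exist first (a minimal sketch):

hbase shell <<'EOF'
create_namespace 'schooldb'
create 'schooldb:student', 'base'
EOF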
kafka source hive sink
| # | Parameter | Description |
| --- | --- | --- |
| 1 | type | org.apache.flume.source.kafka.KafkaSource (required) |
| 2 | kafka.bootstrap.servers | Kafka broker address list (required) |
| 3 | kafka.topics | Comma-separated list of topics to consume (required) |
| 4 | kafka.consumer.group.id | Consumer group ID; consumers in the same group share partitions. Default: empty string |
| 5 | kafka.consumer.auto.offset.reset | Where to start reading when no offset exists; default latest. Options: earliest, latest, none (none throws an exception if no offset is found) |
| 6 | batchSize | Max events written to the channel per batch, default 100. Too large a value causes memory build-up and degrades performance |
| 7 | batchDurationMillis | Interval for checking for new messages, default 1000 ms. Polling too often adds network and CPU load and hurts performance |
| 8 | kafka.consumer.auto.commit.enable | Whether to auto-commit offsets, default true. If false, offsets are committed manually via the Channel Processor |
| 9 | kafka.consumer.max.poll.records | Max records read from Kafka in one poll, default 500 |
| 10 | kafka.key.deserializer | Default org.apache.kafka.common.serialization.StringDeserializer |
| 11 | kafka.value.deserializer | Default org.apache.kafka.common.serialization.ByteArrayDeserializer |
| 12 | parseAsFlumeEvent | Whether to parse the message body as a wrapped Flume event object, default false |
| 13 | selector.type | Channel selector type; default replicating copies each event to all connected channels, multiplexing routes each event to a single channel chosen via the interceptor chain |
| 14 | selector.optional | When selector.type is multiplexing, whether missing channels are tolerated; default false |
| 15 | maxConcurrentPartitions | Maximum number of partitions read concurrently, default 1. Higher values raise throughput but cannot exceed the actual partition count |
| 16 | pollTimeout | Poll timeout when reading from Kafka, default 5000 ms (5 s) |
| 17 | consumer.timeout.ms | How long the client waits for a broker response, default 120000 ms (2 min) |
| 18 | kafka.topic.whitelist | Whitelist of topics that should be consumed |
| 19 | kafka.topic.blacklist | Blacklist of topics that should not be consumed |
| 20 | topicHeader | Adds the message's topic to the Flume event headers |
| 21 | keyHeader | Adds the message key to the Flume event headers |
# Kafka interview question:
# Within a single consumer group, what happens when there are more consumers than partitions?

a1.sources = r1
a1.channels = c1
a1.sinks = s1

#### source (TAILDIR here; a Kafka source sketch follows this block)
a1.sources.r1.type = TAILDIR
a1.sources.r1.positionFile = /root/project_events/tail_dir_log/tail_pos.log
a1.sources.r1.filegroups = f1
a1.sources.r1.filegroups.f1 = /root/project_events/store/.*.csv
a1.sources.r1.fileHeader = true
a1.sources.r1.headers.f1.headerKey1 = store

#### file channel
a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /root/project_events/checkpoint
a1.channels.c1.dataDirs = /root/project_events/file_channel_data
a1.channels.c1.transactionCapacity = 10000

#### hive sink partitioned
a1.sinks.s1.type = hive
a1.sinks.s1.hive.metastore = thrift://192.168.83.130:9083
a1.sinks.s1.hive.database = ods_eb
a1.sinks.s1.hive.table = ods_transaction
a1.sinks.s1.hive.partition = 2018
a1.sinks.s1.useLocalTimeStamp = false
a1.sinks.s1.batchSize = 10000
a1.sinks.s1.round = true
a1.sinks.s1.roundValue = 10
a1.sinks.s1.roundUnit = minute
a1.sinks.s1.serializer = DELIMITED
a1.sinks.s1.serializer.delimiter = "^A"
a1.sinks.s1.serializer.fieldnames = line

a1.sinks.s1.channel = c1
a1.sources.r1.channels = c1
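
The source in the block above is still TAILDIR; swapping in a Kafka source only changes the r1 lines. A sketch using the parameters from the table above (broker address, topic, and group id are placeholders):

a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
# Placeholder broker list and topic; substitute the real cluster values
a1.sources.r1.kafka.bootstrap.servers = master01:9092
a1.sources.r1.kafka.topics = store_review
a1.sources.r1.kafka.consumer.group.id = flume_store_review
a1.sources.r1.kafka.consumer.auto.offset.reset = latest
a1.sources.r1.batchSize = 100
a1.sources.r1.batchDurationMillis = 1000
a1.sources.r1.channels = c1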
