A Small Experiment with Consumer Groups and Offsets
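Objectives
- Verify that different consumer groups can each read all messages, i.e. every consumer group receives the complete message data
- Try out manual offset commits (manual message acknowledgment)
- Deepen my understanding of partition offsets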
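Approach
- Create a test topic named t-topic
- Create new consumer groups t1-group and t2-group for that topic
- Run a producer script to write a few messages to t-topic
- Check and record t-topic's CURRENT-OFFSET, LOG-END-OFFSET and LAG
  - CURRENT-OFFSET: the offset the group has committed for the partition, i.e. the next offset the group will consume
  - LOG-END-OFFSET: the partition's latest offset. It is counted the same way as CURRENT-OFFSET: the offset of the newest message plus 1. For example, if the most recently produced message has offset 3, LOG-END-OFFSET is 4; this shows up in the experiment below
  - LAG: the number of messages the group has not consumed yet, i.e. LOG-END-OFFSET minus CURRENT-OFFSET
- Create a consumer object, set group_id, disable auto-commit, and start the consumer to read the data
- Check whether the offset has changed; in theory it has not
- Commit the offsets manually, then check again; this time the offset should have changed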
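The three offset columns are related by simple arithmetic. Below is a minimal sketch of it in Python. The helper names `log_end_offset` and `lag` are hypothetical and used only for illustration. They are not part of kafka-python or the Kafka CLI.

```python
def log_end_offset(newest_offset):
    """LOG-END-OFFSET: one past the offset of the newest message in the partition."""
    return newest_offset + 1


def lag(log_end, current_offset):
    """LAG: messages the group has not consumed yet (LOG-END-OFFSET - CURRENT-OFFSET)."""
    return log_end - current_offset


# Newest message at offset 3, group still at CURRENT-OFFSET 0 (as in step 4 below).
leo = log_end_offset(3)
print(leo, lag(leo, 0))  # 4 4
# Once the group has committed offset 4 it has caught up.
print(lag(leo, 4))  # 0
```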
Running the Experiment
1. Create the test topic
# ./bin/kafka-topics.sh --create --zookeeper 192.168.1.150:12181 --replication-factor 1 --partitions 1 --topic t-topic
Created topic "t-topic".
2. Create the t1 and t2 consumer groups
# ./bin/kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:19092 --describe --group t1-group
# ./bin/kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:19092 --describe --group t2-group
# ./bin/kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:19092 --list
test-group
t2-group
t1-group
test-consumer-group
3. Produce a few messages
In [1]: import sys
In [2]: import time
In [3]: from uuid import uuid4
In [5]: from kafka import KafkaProducer
In [6]: producer = KafkaProducer(bootstrap_servers='192.168.1.150:19092')
In [7]: topic_name = 't-topic'
In [8]: msg = 'Message: ' + ''.join(str(uuid4()).split('-'))[:5]
In [9]: msg
Out[9]: 'Message: 2679c'
In [10]: res = producer.send(topic_name, msg.encode())
# send() is asynchronous and returns a future; get() blocks until the broker acknowledges the write and returns the RecordMetadata (res.value is only filled in after that)
In [11]: res.get(timeout=10)
Out[11]: RecordMetadata(topic='t-topic', partition=0, topic_partition=TopicPartition(topic='t-topic', partition=0), offset=2, timestamp=1542594659750, checksum=None, serialized_key_size=-1, serialized_value_size=14)
In [12]: print('Send Message to {} topic, Message value is {} Partition is {} offset {}'.format(topic_name, msg, res.value.partition, res.value.offset))
Send Message to t-topic topic, Message value is Message: 2679c Partition is 0 offset 2
In [13]: msg = 'Message: ' + ''.join(str(uuid4()).split('-'))[:5]
# the trailing ';' only suppresses IPython's Out[] echo of the RecordMetadata
In [14]: res = producer.send(topic_name, msg.encode()); res.get(timeout=10);
In [15]: print('Send Message to {} topic, Message value is {} Partition is {} offset {}'.format(topic_name, msg, res.value.partition, res.value.offset))
Send Message to t-topic topic, Message value is Message: 9168a Partition is 0 offset 3
...
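Offsets within a partition are handed out one after another as messages are appended. That is why the sends above come back with consecutive offsets. The toy model below uses a plain Python list to stand in for one partition. `PartitionLog` is a hypothetical class, and the model ignores retention, compaction and transactional control records. It shows four appends receiving offsets 0 to 3, which leaves a LOG-END-OFFSET of 4.

```python
class PartitionLog:
    """Toy in-memory model of one partition: an append-only list of messages."""

    def __init__(self):
        self._messages = []

    def append(self, value):
        """Store a message and return the offset it was assigned."""
        self._messages.append(value)
        return len(self._messages) - 1

    @property
    def log_end_offset(self):
        """The offset the next appended message will receive."""
        return len(self._messages)

    def read(self, offset):
        """Return the message stored at the given offset."""
        return self._messages[offset]


log = PartitionLog()
offsets = [log.append(f"Message: {i}") for i in range(4)]
print(offsets, log.log_end_offset)  # [0, 1, 2, 3] 4
```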
4. Check t-topic's offsets at this point
The current offset is 0, the latest offset is 4, and LAG is 4. Neither consumer group has consumed any messages yet.
# ./bin/kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:19092 --describe --group t1-group
Consumer group 't1-group' has no active members.
TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
t-topic 0 0 4 4 - - -
# ./bin/kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:19092 --describe --group t2-group
Consumer group 't2-group' has no active members.
TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
t-topic 0 0 4 4 - - -
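When you check several groups, it can help to read the `--describe` table programmatically. Below is a rough parser sketch. It assumes the whitespace-separated column layout shown above, which may differ between Kafka versions. `parse_describe` and `to_int` are hypothetical helpers.

```python
def to_int(field):
    """The CLI prints '-' when a value is unknown (e.g. no committed offset)."""
    return None if field == "-" else int(field)


def parse_describe(output):
    """Map (topic, partition) -> (current_offset, log_end_offset, lag)."""
    rows = {}
    header = None
    for line in output.splitlines():
        fields = line.split()
        if not fields:
            continue
        if fields[0] == "TOPIC":
            header = fields  # column names for the rows that follow
            continue
        if header is None:
            continue  # skip banner lines such as "... has no active members."
        record = dict(zip(header, fields))
        key = (record["TOPIC"], int(record["PARTITION"]))
        rows[key] = (
            to_int(record["CURRENT-OFFSET"]),
            to_int(record["LOG-END-OFFSET"]),
            to_int(record["LAG"]),
        )
    return rows


sample = """Consumer group 't1-group' has no active members.
TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
t-topic 0 0 4 4 - - -"""
print(parse_describe(sample))  # {('t-topic', 0): (0, 4, 4)}
```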
5. Initialize a consumer and consume the data
# Import the required libraries
In [1]: from kafka import KafkaConsumer
In [2]: from kafka import TopicPartition
# Create the consumer object, set the group ID, and disable auto-commit
In [3]: consumer = KafkaConsumer('t-topic', bootstrap_servers='192.168.1.150:19092', group_id='t1-group', enable_auto_commit=False, auto_offset_reset='earliest')
# Use Python to see how many partitions t-topic has
In [4]: consumer.partitions_for_topic('t-topic')
Out[4]: {0}
# Check the partition's current committed offset
In [5]: topic_partition = TopicPartition('t-topic', 0)
In [6]: consumer.committed(topic_partition)
Out[6]: 0
# Read the messages in t-topic
In [7]: for msg in consumer:
   ...:     info = 'Topic: {} Partition: {} Offset: {} Message: {}'.format(msg.topic, msg.partition, msg.offset, msg.value)
   ...:     print(info)
Topic: t-topic Partition: 0 Offset: 0 Message: b'Message: 006ba'
Topic: t-topic Partition: 0 Offset: 1 Message: b'Message: dedf7'
Topic: t-topic Partition: 0 Offset: 2 Message: b'Message: 2679c'
Topic: t-topic Partition: 0 Offset: 3 Message: b'Message: 9168a'
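# Press Ctrl+C to stop consuming
In [8]: topic_partition = TopicPartition('t-topic', 0)
# The committed offset is still unchanged, because nothing has been committed yet. In [9] only echoes the TopicPartition handle; the offset itself is confirmed on the server below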
In [9]: topic_partition
Out[9]: TopicPartition(topic='t-topic', partition=0)
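The consumer above is created with `auto_offset_reset='earliest'`. That setting only decides where to start when the group has no committed offset for the partition. It also applies when the committed offset is out of range, but this sketch ignores that case. A committed offset always takes precedence. kafka-python's own default for `auto_offset_reset` is `'latest'`. Below is a toy model of the decision. `starting_offset` is a hypothetical function, not kafka-python's internals.

```python
def starting_offset(committed, log_start, log_end, auto_offset_reset="latest"):
    """Return the first offset a group member reads from one partition (toy model)."""
    if committed is not None:
        return committed  # resume where the group left off
    if auto_offset_reset == "earliest":
        return log_start  # no commit yet: start from the oldest retained message
    if auto_offset_reset == "latest":
        return log_end  # no commit yet: only read messages produced from now on
    raise ValueError(f"no committed offset and auto_offset_reset={auto_offset_reset!r}")


print(starting_offset(None, 0, 4, "earliest"))  # 0
print(starting_offset(None, 0, 4, "latest"))    # 4
print(starting_offset(2, 0, 4, "latest"))       # 2
```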
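Checking on the server with the CLI gives the same result. CURRENT-OFFSET is still 0 and LAG is still 4, even though the consumer (now listed as an active member) has read all four messages.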
# ./bin/kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:19092 --describe --group t1-group
TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
t-topic 0 0 4 4 kafka-python-1.4.3-9d55a10d-b5eb-409d-b19b-4c845f342b3c /172.17.0.1 kafka-python-1.4.3
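Steps 5 and 6 show that a consumer tracks two numbers per partition:
- its in-memory position, which is the next offset it will fetch;
- the group's committed offset, which the CLI reports as CURRENT-OFFSET.

Reading messages moves only the position. With auto-commit disabled, the committed offset moves only on an explicit commit. Below is a toy single-partition model of that behaviour. `ToyConsumer` is a hypothetical class, not kafka-python.

```python
class ToyConsumer:
    """One group member reading a single partition, with auto-commit disabled."""

    def __init__(self, log, committed=0):
        self.log = log              # list of messages; list index == offset
        self.committed = committed  # what the group has stored (CURRENT-OFFSET)
        self.position = committed   # next offset this consumer will fetch

    def poll(self):
        """Fetch everything up to the log end; advances only the position."""
        batch = self.log[self.position:]
        self.position = len(self.log)
        return batch

    def commit(self):
        """Store the current position as the group's committed offset."""
        self.committed = self.position

    def lag(self):
        """LOG-END-OFFSET minus the committed offset."""
        return len(self.log) - self.committed


log = ["Message: 006ba", "Message: dedf7", "Message: 2679c", "Message: 9168a"]
consumer = ToyConsumer(log)
print(len(consumer.poll()), consumer.position, consumer.committed, consumer.lag())  # 4 4 0 4
consumer.commit()
print(consumer.committed, consumer.lag())  # 4 0
```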
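6. Now commit (acknowledge) the messages that were read
In [10]: consumer.commit_async()
# commit_async() returns without waiting for the broker's response; consumer.commit() is the blocking alternative if the new offset must be visible immediately
# Check t1-group's committed partition offset again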
In [11]: consumer.committed(topic_partition)
Out[11]: 4
# Check t2-group's committed partition offset
In [12]: consumer = KafkaConsumer('t-topic', bootstrap_servers='192.168.1.150:19092', group_id='t2-group', enable_auto_commit=False, auto_offset_reset='earliest')
In [13]: consumer.committed(topic_partition)
Out[13]: 0
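7. Finally, confirm how much each group has consumed
Checking the offsets with the CLI confirms that t1-group has consumed all of the messages, while t2-group has not consumed any.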
# ./bin/kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:19092 --describe --group t1-group
Consumer group 't1-group' has no active members.
TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
t-topic 0 4 4 0 - - -
# ./bin/kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:19092 --describe --group t2-group
Consumer group 't2-group' has no active members.
TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
t-topic 0 0 4 4 - - -
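t2-group is unaffected because committed offsets are stored per group, keyed by group, topic and partition. Each group therefore walks the same log independently. Below is a toy model of that table. `OffsetStore` is hypothetical. It treats a group that has never committed as being at offset 0, to mirror the output above. Kafka itself may have no committed offset at all for such a group.

```python
class OffsetStore:
    """Toy model of the committed-offset table (not Kafka's API)."""

    def __init__(self):
        self._offsets = {}  # (group, topic, partition) -> committed offset

    def commit(self, group, topic, partition, offset):
        self._offsets[(group, topic, partition)] = offset

    def committed(self, group, topic, partition, default=0):
        # Simplification: a group that never committed is reported as offset 0.
        return self._offsets.get((group, topic, partition), default)

    def lag(self, group, topic, partition, log_end_offset):
        return log_end_offset - self.committed(group, topic, partition)


store = OffsetStore()
store.commit("t1-group", "t-topic", 0, 4)  # t1-group read and committed all four messages
for group in ("t1-group", "t2-group"):
    print(group, store.committed(group, "t-topic", 0), store.lag(group, "t-topic", 0, 4))
# t1-group 4 0
# t2-group 0 4
```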