A small experiment with consumer groups and offsets

Goals

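  • Verify that different consumer groups can each receive all of the messages, i.e. every consumer group gets the complete message data
  • Try out manual message acknowledgment (manual offset commits)
  • Get a better understanding of partition offsets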

Approach

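  1. Create a test topic named t-topic
  2. Create two new consumer groups for the topic: t1-group and t2-group
  3. Run a producer script to write a few messages to t-topic
  4. Check and note the CURRENT-OFFSET, LOG-END-OFFSET and LAG of t-topic
    • CURRENT-OFFSET: the offset the group has committed for the partition, i.e. the offset of the next message the group will read
    • LOG-END-OFFSET: the partition's latest offset. It follows the same counting rule as CURRENT-OFFSET, i.e. the newest message's offset + 1: if the most recently produced message has offset 3, the latest offset is 4. This shows up in the experiment below
    • LAG: how far the group is behind, i.e. the number of messages it has not yet consumed: LAG = LOG-END-OFFSET - CURRENT-OFFSET
  5. Create a consumer object with group_id set and auto-commit disabled, then start it to consume the data
  6. Check whether the offsets change; in theory they should not
  7. Commit manually, then check the offsets again; this time they should change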
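The offset arithmetic from step 4 can be written out as two tiny functions. This is only an illustration of the counting rules described above, not part of Kafka or kafka-python; the function names are made up for this sketch.

```python
def log_end_offset(newest_offset):
    """LOG-END-OFFSET: the offset the next produced message will receive,
    i.e. the newest message's offset plus one."""
    return newest_offset + 1


def lag(log_end, current_offset):
    """LAG: number of messages the group has not yet committed past."""
    return log_end - current_offset


# Four messages with offsets 0..3, as in this experiment.
leo = log_end_offset(3)
print(leo)            # 4
print(lag(leo, 0))    # 4  (nothing committed yet)
print(lag(leo, 4))    # 0  (everything committed)
```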

Running the experiment

1. Create the test topic

# ./bin/kafka-topics.sh --create --zookeeper 192.168.1.150:12181 --replication-factor 1 --partitions 1 --topic t-topic
Created topic "t-topic".

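2. Create the t1 and t2 consumer groups

Kafka has no explicit "create group" command: a group comes into existence once a consumer with that group.id joins or commits offsets. The --list output below shows that t1-group and t2-group both exist.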

# ./bin/kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:19092 --describe --group t1-group
# ./bin/kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:19092 --describe --group t2-group
# ./bin/kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:19092 --list
test-group
t2-group
t1-group
test-consumer-group

3. Produce a few messages

In [1]: import sys

In [2]: import time

In [3]: from uuid import uuid4

In [4]: from uuid import uuid4

In [5]: from kafka import KafkaProducer

In [6]: producer = KafkaProducer(bootstrap_servers='192.168.1.150:19092')

In [7]: topic_name = 't-topic'

In [8]: msg = 'Message: ' + ''.join(str(uuid4()).split('-'))[:5]

In [9]: msg
Out[9]: 'Message: 2679c'

In [10]: res = producer.send(topic_name, msg.encode())

In [11]: res.value
Out[11]: RecordMetadata(topic='t-topic', partition=0, topic_partition=TopicPartition(topic='t-topic', partition=0), offset=2, timestamp=1542594659750, checksum=None, serialized_key_size=-1, serialized_value_size=14)

In [12]: print('Send Message to {} topic, Message value is {} Partition is {} offset {}'.format(topic_name, msg, res.value.partition, res.value.offset))
Send Message to t-topic topic, Message value is Message: 2679c Partition is 0 offset 2

In [13]: msg = 'Message: ' + ''.join(str(uuid4()).split('-'))[:5]

In [14]: res = producer.send(topic_name, msg.encode())

In [15]: print('Send Message to {} topic, Message value is {} Partition is {} offset {}'.format(topic_name, msg, res.value.partition, res.value.offset))
Send Message to t-topic topic, Message value is Message: 9168a Partition is 0 offset 3

...
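The message body built in the session above can be wrapped in a small helper. This is a sketch; `make_message` and `PREFIX` are hypothetical names. It relies on the fact that `uuid4().hex` is the UUID string with the dashes removed, so it yields the same characters as `''.join(str(uuid4()).split('-'))`. A related caveat for scripts: `producer.send()` is asynchronous and returns a future whose `value` attribute is only filled in once the broker has acknowledged the record, so outside an interactive session `res.get(timeout=10)` is the safer way to obtain the RecordMetadata.

```python
from uuid import uuid4

PREFIX = 'Message: '


def make_message(n_chars=5):
    """Build a test payload such as 'Message: 2679c'.

    uuid4().hex is the UUID without dashes, i.e. the same string as
    ''.join(str(uuid4()).split('-')) in the session above.
    """
    return PREFIX + uuid4().hex[:n_chars]


msg = make_message()
payload = msg.encode()      # bytes, as passed to producer.send()
# 9 prefix characters + 5 hex characters = 14 bytes, matching
# serialized_value_size=14 in the RecordMetadata above.
print(msg, len(payload))
```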

4. Check the offsets recorded for t-topic

Both groups show CURRENT-OFFSET 0, LOG-END-OFFSET 4 and LAG 4: neither group has consumed any of the messages yet.

# ./bin/kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:19092 --describe --group t1-group
Consumer group 't1-group' has no active members.

TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID     HOST            CLIENT-ID
t-topic         0          0               4               4               -               -               -


# ./bin/kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:19092 --describe --group t2-group
Consumer group 't2-group' has no active members.

TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID     HOST            CLIENT-ID
t-topic         0          0               4               4               -               -               -
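To check the LAG relation mechanically, the table printed by `kafka-consumer-groups.sh --describe` can be parsed with a few lines of Python. This is a sketch that assumes whitespace-separated columns exactly as in the output above (a header row starting with TOPIC, and `-` for empty fields); `parse_describe` is a hypothetical helper, not part of Kafka.

```python
def parse_describe(text):
    """Parse the table printed by kafka-consumer-groups.sh --describe.

    Returns one dict per partition row, keyed by the header names.
    Assumes whitespace-separated columns, as in the output above.
    """
    lines = [ln for ln in text.splitlines() if ln.strip()]
    # The header row starts with TOPIC; skip status lines printed before it.
    start = next(i for i, ln in enumerate(lines) if ln.split()[0] == 'TOPIC')
    header = lines[start].split()
    rows = []
    for ln in lines[start + 1:]:
        row = dict(zip(header, ln.split()))
        # Convert numeric columns; '-' (no value) is left as-is.
        for key in ('PARTITION', 'CURRENT-OFFSET', 'LOG-END-OFFSET', 'LAG'):
            if row.get(key, '-') != '-':
                row[key] = int(row[key])
        rows.append(row)
    return rows


sample = """\
Consumer group 't1-group' has no active members.

TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID     HOST            CLIENT-ID
t-topic         0          0               4               4               -               -               -
"""

for row in parse_describe(sample):
    # LAG should equal LOG-END-OFFSET - CURRENT-OFFSET
    assert row['LAG'] == row['LOG-END-OFFSET'] - row['CURRENT-OFFSET']
    print(row['TOPIC'], row['PARTITION'], row['LAG'])  # t-topic 0 4
```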

5. Initialize the consumer and consume the data

# Import the required classes
In [1]: from kafka import KafkaConsumer
In [2]: from kafka import TopicPartition

# Create the consumer: set the group ID and disable auto-commit
In [3]: consumer = KafkaConsumer('t-topic', bootstrap_servers='192.168.1.150:19092', group_id='t1-group', enable_auto_commit=False, auto_offset_reset='earliest')

# Use Python to check how many partitions t-topic has
In [4]: consumer.partitions_for_topic('t-topic')
Out[4]: {0}

# Check the partition's committed offset
In [5]: topic_partition = TopicPartition('t-topic', 0)

In [6]: consumer.committed(topic_partition)
Out[6]: 0

# Read the messages in t-topic
In [7]: for msg in consumer:
   ...:     info = 'Topic: {} Partition: {} Offset: {} Message: {}'.format(msg.topic, msg.partition, msg.offset, msg.value)
   ...:     print(info)
Topic: t-topic Partition: 0 Offset: 0 Message: b'Message: 006ba'
Topic: t-topic Partition: 0 Offset: 1 Message: b'Message: dedf7'
Topic: t-topic Partition: 0 Offset: 2 Message: b'Message: 2679c'
Topic: t-topic Partition: 0 Offset: 3 Message: b'Message: 9168a'

# Press CTRL+C to stop consuming

In [8]: topic_partition = TopicPartition('t-topic', 0)

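# Check the partition offset again. Note that In [9] only echoes the TopicPartition object; the committed offset itself is returned by consumer.committed(topic_partition). It is still 0 because nothing has been committed yet, as the server-side check below confirms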
In [9]: topic_partition
Out[9]: TopicPartition(topic='t-topic', partition=0)

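Checking on the server with the CLI gives the same result: the group now has an active member (the kafka-python consumer), but CURRENT-OFFSET is still 0 and LAG is still 4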

# ./bin/kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:19092 --describe --group t1-group

TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID                                             HOST            CLIENT-ID
t-topic         0          0               4               4               kafka-python-1.4.3-9d55a10d-b5eb-409d-b19b-4c845f342b3c /172.17.0.1     kafka-python-1.4.3
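Why the offset did not move: with auto-commit disabled, iterating over the consumer only advances its in-memory position (in kafka-python, `consumer.position(topic_partition)` returns it), while CURRENT-OFFSET reflects the committed offset that `consumer.committed(topic_partition)` reads. The toy model below is an illustration of that split, not the kafka-python API; `log`, `consume`, `position` and `committed` are hypothetical names.

```python
# Toy model (not kafka-python): one partition, one group, auto-commit off.
log = [b'Message: 006ba', b'Message: dedf7', b'Message: 2679c', b'Message: 9168a']

committed = 0          # the group's offset on the broker: CURRENT-OFFSET in the CLI
position = committed   # a new consumer starts from the committed offset


def consume(start):
    """Return every record from `start` on, plus the new in-memory position."""
    records = log[start:]
    return records, start + len(records)


records, position = consume(position)
# Iterating advanced only the consumer's position; nothing was committed.
print(len(records), position, committed)   # 4 4 0

# If this consumer exits now and a new one joins the same group, it resumes
# from the committed offset and reads the same four messages again.
replayed, _ = consume(committed)
print(len(replayed))   # 4
```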

6. Now commit (acknowledge) the messages that were read

In [10]: consumer.commit_async()


# Check t1-group's committed partition offset again
In [11]: consumer.committed(topic_partition)
Out[11]: 4

# Check the partition offset of the t2-group consumer group
In [12]: consumer = KafkaConsumer('t-topic', bootstrap_servers='192.168.1.150:19092', group_id='t2-group', enable_auto_commit=False, auto_offset_reset='earliest')

In [13]: consumer.committed(topic_partition)
Out[13]: 0
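In kafka-python, `commit_async()` called without arguments commits the consumer's current positions for its assigned partitions, and only for that consumer's group. Because it returns before the broker acknowledges the commit, a script that calls `committed()` immediately afterwards may be safer with the blocking `consumer.commit()`. The per-group bookkeeping can be sketched as a plain dictionary; this is a toy model with hypothetical names (`committed`, `commit`, `lag`), not how the broker stores offsets.

```python
# Toy model (not kafka-python): committed offsets are stored per group,
# so a commit by one group never moves another group's offset.
LOG_END_OFFSET = 4                          # offsets 0..3 have been written
committed = {'t1-group': 0, 't2-group': 0}  # group -> CURRENT-OFFSET


def commit(group, position):
    """Manual commit: record `position` (next offset to read) for one group."""
    committed[group] = position


def lag(group):
    return LOG_END_OFFSET - committed[group]


# t1-group has read offsets 0..3, so its position is 4.
commit('t1-group', 4)
for group in committed:
    print(group, committed[group], lag(group))
# t1-group 4 0
# t2-group 0 4
```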

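7. Finally, confirm how much each group has consumed

Checking the offsets with the CLI confirms it: t1-group has consumed all of the messages (CURRENT-OFFSET 4, LAG 0), while t2-group has not consumed a single one (CURRENT-OFFSET 0, LAG 4). Because each group keeps its own committed offset, t1-group's commit does not affect t2-group, whose consumers can still read all four messages.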

# ./bin/kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:19092 --describe --group t1-group
Consumer group 't1-group' has no active members.

TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID     HOST            CLIENT-ID
t-topic         0          4               4               0               -               -               -

# ./bin/kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:19092 --describe --group t2-group
Consumer group 't2-group' has no active members.

TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID     HOST            CLIENT-ID
t-topic         0          0               4               4               -               -               -
