Quick Start
https://kafka.apache.org/quickstart
One thing worth pointing out is the Kafka version string. For example, the release used when these notes were written is kafka_2.11-2.0.1.tgz:
- 2.11: the Scala version the release was built with
- 2.0.1: the Kafka version itself
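The naming scheme can be checked mechanically. The following is a minimal sketch, assuming the archive name always follows the pattern kafka_<scala>-<kafka>.tgz; the helper name `parse_kafka_archive` is hypothetical and not part of any Kafka tooling.

```python
import re

# Assumed pattern: kafka_<scala version>-<kafka version>.tgz
ARCHIVE_RE = re.compile(
    r"^kafka_(?P<scala>\d+\.\d+)-(?P<kafka>\d+(?:\.\d+)+)\.tgz$"
)


def parse_kafka_archive(filename):
    """Return (scala_version, kafka_version) parsed from an archive name."""
    match = ARCHIVE_RE.match(filename)
    if match is None:
        raise ValueError("unexpected archive name: {}".format(filename))
    return match.group("scala"), match.group("kafka")


scala_version, kafka_version = parse_kafka_archive("kafka_2.11-2.0.1.tgz")
print("Scala:", scala_version, "Kafka:", kafka_version)
```

Names that do not match the assumed pattern raise a ValueError instead of being guessed at.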
Dependencies
- JDK
- ZooKeeper
Installing and Starting Kafka
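# wget https://archive.apache.org/dist/kafka/2.1.0/kafka_2.11-2.1.0.tgz
# tar -xzf kafka_2.11-2.1.0.tgz
# cd kafka_2.11-2.1.0
# ./bin/kafka-server-start.sh config/server.properties
ZooKeeper must already be running at the address set in zookeeper.connect, otherwise the broker fails to start. The download uses the Apache archive URL because the closer.cgi link returns a mirror-selection page rather than the tarball.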
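Basic Kafka Configuration
These are only the most basic parameters. The full list of broker settings is at https://kafka.apache.org/documentation/#brokerconfigs
- broker.id: unique ID of this broker within the cluster
- port: port the Kafka broker listens on (deprecated in favor of listeners)
- advertised.host.name: hostname registered in ZooKeeper (deprecated in favor of advertised.listeners)
- advertised.port: port registered in ZooKeeper (deprecated in favor of advertised.listeners)
- log.dirs: directories where Kafka stores its log data, that is, the messages themselves
- num.partitions: default number of partitions per topic
- log.retention.hours: how long log data is retained, in hours
- zookeeper.connect: ZooKeeper addresses, comma-separated
- zookeeper.connection.timeout.ms: ZooKeeper connection timeout, in milliseconds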
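To make the key=value layout of these settings concrete, here is a minimal sketch that parses a small server.properties-style fragment. The values in `SAMPLE_PROPERTIES` are illustrative assumptions, not the author's actual configuration, and `parse_properties` is a hypothetical helper that handles only plain key=value lines and # comments.

```python
# Illustrative fragment in the style of config/server.properties.
# All values here are assumptions chosen for the example.
SAMPLE_PROPERTIES = """
# Unique ID of this broker within the cluster
broker.id=0
log.dirs=/tmp/kafka-logs
num.partitions=1
log.retention.hours=168
zookeeper.connect=localhost:2181
zookeeper.connection.timeout.ms=6000
"""


def parse_properties(text):
    """Parse simple key=value lines, skipping blank lines and # comments."""
    settings = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#"):
            continue
        key, _, value = line.partition("=")
        settings[key.strip()] = value.strip()
    return settings


settings = parse_properties(SAMPLE_PROPERTIES)
# zookeeper.connect is a comma-separated list of host:port pairs.
zk_hosts = settings["zookeeper.connect"].split(",")
print(settings["broker.id"], zk_hosts)
```

All parsed values are strings, so numeric settings such as log.retention.hours need an explicit int() before use.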
Getting Started
Create a topic
# ./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
List the topics that have been created
# ./bin/kafka-topics.sh --list --zookeeper localhost:2181
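If these commands need to be driven from a script, one option is to build the argument lists in Python and hand them to subprocess. This is only a sketch: the function names are hypothetical, the flags are the ones shown in the commands above, and the relative path assumes the script runs from the Kafka installation directory.

```python
import subprocess

# Assumed to be run from the Kafka installation directory.
KAFKA_TOPICS = "./bin/kafka-topics.sh"


def create_topic_cmd(topic, zookeeper="localhost:2181",
                     partitions=1, replication_factor=1):
    """Build the argument list for creating a topic."""
    return [
        KAFKA_TOPICS, "--create",
        "--zookeeper", zookeeper,
        "--replication-factor", str(replication_factor),
        "--partitions", str(partitions),
        "--topic", topic,
    ]


def list_topics_cmd(zookeeper="localhost:2181"):
    """Build the argument list for listing topics."""
    return [KAFKA_TOPICS, "--list", "--zookeeper", zookeeper]


def run(cmd):
    """Run a command and return its stdout; raises CalledProcessError on failure."""
    completed = subprocess.run(
        cmd, check=True, stdout=subprocess.PIPE, universal_newlines=True
    )
    return completed.stdout


# Only print the command here; nothing is executed.
print(" ".join(create_topic_cmd("test")))
```

Calling run(create_topic_cmd("test")) followed by run(list_topics_cmd()) would execute the two commands against a running ZooKeeper.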
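Producing Data
Simulate producing data with Python
# cat /prodata/scripts/kafka/producer_1.py
import time
from uuid import uuid4

from kafka import KafkaProducer

# Broker address of the environment these notes were written in.
producer = KafkaProducer(bootstrap_servers='192.168.1.36:19001')
topic_name = 't-topic'

# Send three messages, one every three seconds.
for _ in range(3):
    msg = 'Message: ' + uuid4().hex[:5]
    # send() is asynchronous and returns a future, not the delivery result.
    res = producer.send(topic_name, msg.encode())
    print('Send Message to {} topic, Message value is {} Result: {}'.format(topic_name, msg, res))
    time.sleep(3)

# Block until all buffered messages have been delivered before exiting.
producer.flush()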
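Because send() is asynchronous, the value printed as Result above is a future rather than a delivery report. The sketch below shows one way to wait for each acknowledgement via the future's get() method in kafka-python. The helper names `make_message`, `send_and_confirm` and `main` are hypothetical, and the producer is passed in as a parameter so the logic can be tried without a broker.

```python
from uuid import uuid4


def make_message():
    """Build a short test message with a random 5-character suffix."""
    return "Message: " + uuid4().hex[:5]


def send_and_confirm(producer, topic, count=3, timeout=10):
    """Send `count` messages and wait for each one to be acknowledged."""
    results = []
    for _ in range(count):
        msg = make_message()
        future = producer.send(topic, msg.encode())
        # get() blocks until the broker acknowledges the record
        # and raises if delivery fails or the timeout expires.
        metadata = future.get(timeout=timeout)
        results.append((msg, metadata.partition, metadata.offset))
    return results


def main():
    # Imported here so the helpers above work without kafka-python installed.
    from kafka import KafkaProducer

    producer = KafkaProducer(bootstrap_servers="192.168.1.36:19001")
    for msg, partition, offset in send_and_confirm(producer, "t-topic"):
        print("{} -> partition {} offset {}".format(msg, partition, offset))
    producer.close()
```

Calling main() requires a reachable broker at the address used in the script above. Waiting on every future trades throughput for certainty, which is reasonable for a three-message test.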
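Consuming Data
# cat /prodata/scripts/kafka/consumer_1.py
import sys

from kafka import KafkaConsumer

# Topic to consume; it must match the topic the producer writes to.
topic_name = 'da'
# The consumer group name is passed as the first command-line argument.
group_name = sys.argv[1]
consumer = KafkaConsumer(topic_name, bootstrap_servers='192.168.1.150:19092', group_id=group_name)

# Iterating over the consumer blocks and yields records as they arrive.
for msg in consumer:
    info = 'Topic: {} Partition: {} Offset: {} Message: {}'.format(msg.topic, msg.partition, msg.offset, msg.value)
    print(info)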
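Unless a value_deserializer is configured, kafka-python delivers msg.value as bytes, so the script above prints the payload in the form b'...'. The sketch below decodes the value before formatting. `format_record` is a hypothetical helper, and `FakeRecord` is a stand-in for the consumer's record object so the example runs without a broker.

```python
from collections import namedtuple


def format_record(record):
    """Format a consumed record, decoding the bytes payload as UTF-8."""
    value = record.value.decode("utf-8", errors="replace")
    return "Topic: {} Partition: {} Offset: {} Message: {}".format(
        record.topic, record.partition, record.offset, value
    )


# Stand-in with the same attribute names the consumer loop uses.
FakeRecord = namedtuple("FakeRecord", ["topic", "partition", "offset", "value"])

print(format_record(FakeRecord("da", 0, 42, b"Message: 1a2b3")))
```

In the real loop this would be print(format_record(msg)). An alternative is to pass value_deserializer=lambda v: v.decode("utf-8") when constructing KafkaConsumer, so that msg.value is already a string.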