Quick Start

https://kafka.apache.org/quickstart

Kafka's release naming deserves a note. The release used when these notes were written is kafka_2.11-2.0.1.tgz, which breaks down as follows:

  • 2.11: the Scala version the release was built with
  • 2.0.1: the Kafka version itself
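
The naming scheme above can be checked mechanically. The following is a minimal sketch, not part of the original notes; the helper name parse_kafka_archive and the regular expression are illustrative assumptions that only cover archive names of the form shown above.

```python
import re

# Assumed pattern for release archives: kafka_<scala version>-<kafka version>.tgz
ARCHIVE_RE = re.compile(r"^kafka_(?P<scala>\d+\.\d+)-(?P<kafka>\d+(?:\.\d+)+)\.tgz$")


def parse_kafka_archive(filename):
    """Return (scala_version, kafka_version) parsed from a release archive name."""
    match = ARCHIVE_RE.match(filename)
    if match is None:
        raise ValueError("not a Kafka release archive name: {!r}".format(filename))
    return match.group("scala"), match.group("kafka")


# Split the archive name from the notes into its two version parts.
scala_version, kafka_version = parse_kafka_archive("kafka_2.11-2.0.1.tgz")
print("Scala:", scala_version, "Kafka:", kafka_version)
```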

Prerequisites

  • JDK
  • ZooKeeper (must be running before the broker is started)

Installing and Starting Kafka

# wget https://archive.apache.org/dist/kafka/2.1.0/kafka_2.11-2.1.0.tgz
# tar -xzf kafka_2.11-2.1.0.tgz
# cd kafka_2.11-2.1.0
# ./bin/kafka-server-start.sh config/server.properties

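Basic Kafka Configuration

These are only the most basic settings. For the full list of broker configuration parameters, see https://kafka.apache.org/documentation/#brokerconfigs

  • broker.id: unique ID that identifies the broker node
  • port: port the Kafka broker listens on (deprecated in favor of listeners)
  • advertised.host.name: host name registered in ZooKeeper for clients to use (deprecated in favor of advertised.listeners)
  • advertised.port: port registered in ZooKeeper for clients to use (deprecated in favor of advertised.listeners)
  • log.dirs: directories where Kafka stores its log (message) data
  • num.partitions: default number of partitions per topic
  • log.retention.hours: how long log data is retained, in hours
  • zookeeper.connect: ZooKeeper addresses, comma-separated
  • zookeeper.connection.timeout.ms: ZooKeeper connection timeout, in milliseconds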
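
To see how these settings look in config/server.properties, here is a small sketch that renders a dictionary as key=value lines. The helper render_properties and every value in broker_settings are illustrative assumptions, not the author's configuration; adjust them for your environment.

```python
def render_properties(settings):
    """Render a dict of broker settings as server.properties text, one key=value per line."""
    lines = ["{}={}".format(key, value) for key, value in settings.items()]
    return "\n".join(lines) + "\n"


# Example values only (assumed for illustration), for a single local broker.
broker_settings = {
    "broker.id": 0,
    "log.dirs": "/tmp/kafka-logs",
    "num.partitions": 1,
    "log.retention.hours": 168,
    "zookeeper.connect": "localhost:2181",
    "zookeeper.connection.timeout.ms": 6000,
}

# Print the rendered file content; redirect it to a file to use it as a broker config.
print(render_properties(broker_settings), end="")
```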

Basic Usage

Create a topic

# ./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

List the topics that have been created

# ./bin/kafka-topics.sh --list --zookeeper localhost:2181
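
When topics are created from a script rather than by hand, the two commands above can be wrapped as follows. This is a sketch under assumptions: the function names are hypothetical, the flags are exactly those used in the commands above, and the script is assumed to run from the Kafka install directory.

```python
import subprocess

KAFKA_TOPICS = "./bin/kafka-topics.sh"  # path relative to the Kafka install directory


def create_topic_args(topic, zookeeper="localhost:2181", partitions=1, replication_factor=1):
    """Build the argument list for the topic creation command."""
    return [
        KAFKA_TOPICS, "--create",
        "--zookeeper", zookeeper,
        "--replication-factor", str(replication_factor),
        "--partitions", str(partitions),
        "--topic", topic,
    ]


def list_topics_args(zookeeper="localhost:2181"):
    """Build the argument list for the topic listing command."""
    return [KAFKA_TOPICS, "--list", "--zookeeper", zookeeper]


def run(args):
    """Run a command and return its standard output; raises if the command fails."""
    result = subprocess.run(args, check=True, stdout=subprocess.PIPE, universal_newlines=True)
    return result.stdout
```

With a broker and ZooKeeper running, run(create_topic_args("test")) creates the topic and run(list_topics_args()).split() returns the topic names as a list.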

Producing Data

Simulate a producer with Python

➜  kafka git:(master) ✗ cat /prodata/scripts/kafka/producer_1.py 
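import time
from uuid import uuid4

from kafka import KafkaProducer

# Broker address; replace with your own host:port.
producer = KafkaProducer(bootstrap_servers='192.168.1.36:19001')
topic_name = 't-topic'

# Send three messages, one every three seconds.
for _ in range(3):
    # Payload: "Message: " plus the first five hex characters of a random UUID.
    msg = 'Message: ' + uuid4().hex[:5]
    # send() is asynchronous; res is a future, not the delivery result itself.
    res = producer.send(topic_name, msg.encode())
    print('Send Message to {} topic, Message value is {} Result: {}'.format(topic_name, msg, res))
    time.sleep(3)

# Make sure any buffered messages are delivered before the script exits.
producer.flush()
producer.close()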
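
The producer script prints the object returned by send(), which in kafka-python is a future rather than the delivery result. The sketch below shows one way to wait for the broker's acknowledgement instead. The helper name send_and_wait and the 10-second default timeout are assumptions; the producer argument is expected to be a KafkaProducer like the one created in the script.

```python
def send_and_wait(producer, topic, text, timeout=10):
    """Send one message and block until the broker acknowledges it.

    Returns (topic, partition, offset) of the stored record.
    """
    # send() only queues the message and returns a future.
    future = producer.send(topic, text.encode("utf-8"))
    # get() blocks until the acknowledgement arrives; it raises on failure or timeout.
    metadata = future.get(timeout=timeout)
    return metadata.topic, metadata.partition, metadata.offset
```

For example, send_and_wait(producer, 't-topic', 'hello') returns the partition and offset the message was written to, which is handy for confirming that the producer can actually reach the broker.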

Consuming Data

➜  kafka git:(master) ✗ cat /prodata/scripts/kafka/consumer_1.py 
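import sys

from kafka import KafkaConsumer

# Topic to read from; it must be the topic the producer writes to.
topic_name = 'da'
# Consumer group name, passed as the first command-line argument.
group_name = sys.argv[1]
# Broker address; replace with your own host:port.
consumer = KafkaConsumer(topic_name, bootstrap_servers='192.168.1.150:19092', group_id=group_name)
# Iterating over the consumer blocks and yields records as they arrive.
for msg in consumer:
    # msg.value holds the raw message bytes.
    info = 'Topic: {} Partition: {} Offset: {} Message: {}'.format(msg.topic, msg.partition, msg.offset, msg.value)
    print(info)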
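
The consumer loop above runs until it is interrupted and prints message values as raw bytes. For a quick check it can be handy to read a bounded number of records and decode them. This is a minimal sketch: the helper names describe and take are hypothetical, and the values are assumed to be UTF-8 text as produced by the producer script.

```python
from itertools import islice


def describe(record):
    """Format one consumed record, decoding its value from UTF-8 bytes."""
    value = record.value.decode("utf-8", errors="replace")
    return "Topic: {} Partition: {} Offset: {} Message: {}".format(
        record.topic, record.partition, record.offset, value)


def take(consumer, limit):
    """Read at most `limit` records from an iterable consumer and describe each one."""
    return [describe(record) for record in islice(consumer, limit)]
```

Calling take(consumer, 3) with the KafkaConsumer from the script returns three formatted lines. It still blocks while waiting if fewer than three messages are available.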
