使用SSL加密传输(单向认证)
创建证书存放目录
# cd /usr/local/kafka && mkdir jks
创建服务私钥及证书
# keytool -keystore server.keystore.jks -alias localhost -validity 3650 -genkey -keyalg RSA
创建CA私钥、证书
# openssl req -new -x509 -keyout ca-key -out ca-cert -days 3650
导入CA证书到客户端、服务器端TrustStore信任密钥库,TrustStore仅仅用来包含客户端信任的证书
# keytool -keystore client.truststore.jks -alias CARoot -import -file ca-cert
# keytool -keystore server.truststore.jks -alias CARoot -import -file ca-cert
根据server的jks私钥证书文件生成证书签署请求
# keytool -keystore server.keystore.jks -alias localhost -certreq -file server.csr
使用ca私钥证书签发证书
# openssl x509 -req -CA ca-cert -CAkey ca-key -in server.csr -out server.crt -days 3650 -CAcreateserial -passin pass:ca-cert
下面是导入CA证书到server端KeyStore,之后导入CA签发的server证书时,keytool才能据此建立完整的证书链
# keytool -keystore server.keystore.jks -alias CARoot -import -file ca-cert
导入ca签发后的server证书到server端KeyStore,与上面导入TrustStore不同,这里会导入更多的信息,包括公钥、数字证书、数字签名
# keytool -keystore server.keystore.jks -alias CARoot -import -file server.crt
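说明:上一条命令使用了已被CA证书占用的别名CARoot,keytool会提示别名已存在而不会导入,属于多余步骤;真正生效的是下面这条命令——以私钥所在的别名localhost导入CA签发的server.crt,用它替换原先的自签名证书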
# keytool -keystore server.keystore.jks -alias localhost -import -file server.crt
配置文件 server.properties
# egrep -v "#|^$" config/server.properties
broker.id=0
port=19092
num.network.threads=2
num.io.threads=8
socket.send.buffer.bytes=1048576
socket.receive.buffer.bytes=1048576
socket.request.max.bytes=504857600
log.dirs=/usr/local/kafka/logs
num.partitions=2
log.retention.hours=168
log.segment.bytes=536870912
log.retention.check.interval.ms=60000
log.cleaner.enable=true
zookeeper.connect=192.168.1.150:12181
zookeeper.connection.timeout.ms=1000000
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
listeners=PLAINTEXT://172.17.0.19:19092,SSL://172.17.0.19:19093
advertised.listeners=PLAINTEXT://172.17.0.19:19092,SSL://172.17.0.19:19093
listener.security.protocol.map=PLAINTEXT:PLAINTEXT,SSL:SSL
ssl.keystore.location=/usr/local/kafka/jks/server.keystore.jks
ssl.keystore.password=server
ssl.key.password=server
ssl.truststore.location=/usr/local/kafka/jks/server.truststore.jks
ssl.truststore.password=server
ssl.protocol=TLS
重启服务后检查ZK上注册的元数据是否正确
# ./bin/zkCli.sh
Connecting to localhost:2181
...
[zk: localhost:2181(CONNECTED) 4] ls /brokers/ids
[0]
[zk: localhost:2181(CONNECTED) 11] get /brokers/ids/0
{"listener_security_protocol_map":{"PLAINTEXT":"PLAINTEXT","SSL":"SSL"},"endpoints":["PLAINTEXT://172.17.0.19:19092","SSL://172.17.0.19:19093"],"jmx_port":-1,"host":"172.17.0.19","timestamp":"1542619599648","port":19092,"version":4}
cZxid = 0x1c2
ctime = Mon Nov 19 17:26:39 CST 2018
mZxid = 0x1c2
mtime = Mon Nov 19 17:26:39 CST 2018
pZxid = 0x1c2
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x167166808960015
dataLength = 232
numChildren = 0
Python通过SSL端口写入数据
# cat producer_ssl.py
import sys
import time
from uuid import uuid4
from kafka import KafkaProducer
# Connect to the broker's SSL listener. Only the CA certificate is supplied
# (no client certificate/key), so the client verifies the broker but not the
# other way round. Hostname verification is disabled via ssl_check_hostname.
producer = KafkaProducer(
    bootstrap_servers='172.17.0.19:19093',
    security_protocol='SSL',
    ssl_cafile='/tmp/ca-cert',
    ssl_check_hostname=False,
)
topic_name = 't-topic'

try:
    # Send three short test messages, pausing 3 seconds after each one.
    for _ in range(3):
        # Payload suffix: the first 5 hex characters of a random UUID.
        msg = 'Message: ' + uuid4().hex[:5]
        # send() is asynchronous; `res` is a future, not the delivery result.
        res = producer.send(topic_name, msg.encode())
        print('Send Message to {} topic, Message value is {} Result: {}'.format(topic_name, msg, res))
        time.sleep(3)
finally:
    # Make sure buffered records are delivered before the script exits,
    # then release the producer's network resources.
    producer.flush()
    producer.close()
Python通过SSL读取数据
# cat consumer_ssl.py
import sys
from kafka import KafkaConsumer
topic_name = 't-topic'

# The consumer group id is taken from the first command-line argument;
# exit with a usage hint instead of a bare IndexError when it is missing.
if len(sys.argv) < 2:
    sys.exit('Usage: python consumer_ssl.py <group_id>')
group_name = sys.argv[1]

# Connect to the broker's SSL listener. Only the CA certificate is supplied
# (no client certificate/key) and hostname verification is disabled.
# auto_offset_reset='earliest': per kafka-python, start from the oldest
# available message when the group has no valid committed offset.
consumer = KafkaConsumer(
    topic_name,
    bootstrap_servers='172.17.0.19:19093',
    group_id=group_name,
    security_protocol='SSL',
    ssl_cafile='/tmp/ca-cert',
    ssl_check_hostname=False,
    auto_offset_reset='earliest',
)

# Iterating the consumer yields records as they arrive; print each record's
# topic, partition, offset and raw value.
for msg in consumer:
    info = 'Topic: {} Partition: {} Offset: {} Message: {}'.format(msg.topic, msg.partition, msg.offset, msg.value)
    print(info)