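Transport Encryption (Mutual Authentication)
Mutual authentication builds on one-way authentication by also verifying the identity of the client. To complete the SSL handshake, the client must present a client certificate signed by a CA that the server trusts, and it must hold the matching client private key.
Generate a key pair and certificate for the client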
# keytool -genkey -keystore client.keystore.jks -alias localhost -validity 3650 -keyalg RSA
Export the client certificate signing request (CSR)
# keytool -keystore client.keystore.jks -alias localhost -certreq -file client.csr
Sign the client CSR with the CA certificate and private key
# openssl x509 -req -CA ca-cert -CAkey ca-key -in client.csr -out client.crt -days 3650 -CAcreateserial -passin pass:ca-cert
Import the CA certificate into the client KeyStore
# keytool -keystore client.keystore.jks -alias CARoot -import -file ca-cert
Import the CA-signed client certificate into the client KeyStore
# keytool -keystore client.keystore.jks -alias localhost -import -file client.crt
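At this point the certificates are mostly ready. However, the clients may not all be Java. A Python client, for example, needs the CA certificate, the client certificate, and the client private key for SSL mutual authentication, so one more step is required: the private key has to be exported from client.keystore.jks before it can be used.
Convert the client keystore from JKS to PKCS12 format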
# keytool -importkeystore -srckeystore client.keystore.jks -destkeystore client.keystore.p12 -deststoretype PKCS12 -srcalias localhost -deststorepass client -destkeypass client
Export the private key from the client PKCS12 file
# openssl pkcs12 -in client.keystore.p12 -nodes -nocerts -out client_key.pem
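Before handing the exported files to a Python client, it can help to confirm that client_key.pem really belongs to client.crt. The sketch below is one possible check using Python's standard ssl module, relying on its documented behavior that load_cert_chain raises ssl.SSLError when the private key does not match the certificate. The function name key_matches_cert and the /tmp paths in the usage comment are illustrative assumptions, not part of the original procedure.

```python
import ssl


def key_matches_cert(certfile, keyfile):
    """Return True if keyfile holds the private key for the certificate in certfile.

    load_cert_chain raises ssl.SSLError on a mismatch and OSError
    (for example FileNotFoundError) when a file cannot be read.
    ssl.SSLError is a subclass of OSError, so one except clause covers both.
    """
    ctx = ssl.create_default_context()
    try:
        ctx.load_cert_chain(certfile=certfile, keyfile=keyfile)
    except OSError as exc:
        print('cannot use {} with {}: {!r}'.format(keyfile, certfile, exc))
        return False
    return True


# Usage, assuming the files were copied to /tmp as in the scripts later on:
#     key_matches_cert('/tmp/client.crt', '/tmp/client_key.pem')
```

The check only covers the key and certificate pairing; it does not verify that the certificate was signed by the CA the broker trusts.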
Update the Kafka broker configuration: set ssl.client.auth=required to enable client authentication
broker.id=0
port=19092
num.network.threads=2
num.io.threads=8
socket.send.buffer.bytes=1048576
socket.receive.buffer.bytes=1048576
socket.request.max.bytes=504857600
log.dirs=/usr/local/kafka/logs
num.partitions=2
log.retention.hours=168
log.segment.bytes=536870912
log.retention.check.interval.ms=60000
log.cleaner.enable=true
zookeeper.connect=192.168.1.150:12181
zookeeper.connection.timeout.ms=1000000
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
listeners=PLAINTEXT://172.17.0.19:19092,SSL://172.17.0.19:19093
advertised.listeners=PLAINTEXT://172.17.0.19:19092,SSL://172.17.0.19:19093
listener.security.protocol.map=PLAINTEXT:PLAINTEXT,SSL:SSL
ssl.keystore.location=/usr/local/kafka/jks/server.keystore.jks
ssl.keystore.password=server
ssl.key.password=server
ssl.truststore.location=/usr/local/kafka/jks/server.truststore.jks
ssl.truststore.password=server
ssl.protocol=TLS
ssl.client.auth=required
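For readers more familiar with Python than with broker settings, ssl.client.auth has a close analogue in Python's ssl module: a server-side SSLContext whose verify_mode is CERT_REQUIRED rejects clients that do not present a trusted certificate. The sketch below maps the three values Kafka accepts (required, requested, none) to the corresponding Python constants. It is only an analogy to illustrate the semantics; the helpers verify_mode_for and make_server_context are hypothetical and Kafka itself does not run this code.

```python
import ssl

# Kafka ssl.client.auth value -> closest verify mode in Python's ssl module.
CLIENT_AUTH_TO_VERIFY_MODE = {
    'required': ssl.CERT_REQUIRED,   # handshake fails without a trusted client certificate
    'requested': ssl.CERT_OPTIONAL,  # a certificate is validated only if the client sends one
    'none': ssl.CERT_NONE,           # client certificates are not requested
}


def verify_mode_for(client_auth):
    """Translate a Kafka ssl.client.auth value into an ssl verify mode."""
    try:
        return CLIENT_AUTH_TO_VERIFY_MODE[client_auth]
    except KeyError:
        raise ValueError('unknown ssl.client.auth value: {!r}'.format(client_auth))


def make_server_context(client_auth):
    """Build a server-side context that treats client certificates as requested."""
    ctx = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
    ctx.verify_mode = verify_mode_for(client_auth)
    return ctx
```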
Now run the Python Producer script. At this stage the request does not include a client certificate.
# cat producer_ssl.py
import sys
import time
from uuid import uuid4
from kafka import KafkaProducer
producer = KafkaProducer(bootstrap_servers='172.17.0.19:19093', security_protocol='SSL', ssl_cafile='/tmp/ca-cert', ssl_check_hostname=False)
topic_name = 't-topic'
i = 1
while i < 4:
    msg = 'Message: ' + ''.join(str(uuid4()).split('-'))[:5]
    res = producer.send(topic_name, msg.encode())
    print('Send Message to {} topic, Message value is {} Result: {}'.format(topic_name, msg, res))
    i += 1
    time.sleep(3)
Exception raised at runtime
# python35 producer_ssl.py
Traceback (most recent call last):
  File "producer_ssl.py", line 5, in <module>
    producer = KafkaProducer(bootstrap_servers='172.17.0.19:19093', security_protocol='SSL', ssl_cafile='/tmp/ca-cert', ssl_check_hostname=False)
  File "/usr/local/python35/lib/python3.5/site-packages/kafka/producer/kafka.py", line 362, in __init__
    **self.config)
  File "/usr/local/python35/lib/python3.5/site-packages/kafka/client_async.py", line 214, in __init__
    self._bootstrap(collect_hosts(self.config['bootstrap_servers']))
  File "/usr/local/python35/lib/python3.5/site-packages/kafka/client_async.py", line 245, in _bootstrap
    if not bootstrap.connect_blocking():
  File "/usr/local/python35/lib/python3.5/site-packages/kafka/conn.py", line 301, in connect_blocking
    self.connect()
  File "/usr/local/python35/lib/python3.5/site-packages/kafka/conn.py", line 392, in connect
    if self._try_handshake():
  File "/usr/local/python35/lib/python3.5/site-packages/kafka/conn.py", line 455, in _try_handshake
    self._sock.do_handshake()
  File "/usr/local/python35/lib/python3.5/ssl.py", line 983, in do_handshake
    self._sslobj.do_handshake()
  File "/usr/local/python35/lib/python3.5/ssl.py", line 628, in do_handshake
    self._sslobj.do_handshake()
ssl.SSLError: [SSL: UNEXPECTED_MESSAGE] unexpected message (_ssl.c:645)
Now modify the Producer script so that the request includes the client certificate, and try again
# cat producer_ssl_client_auth.py
import sys
import time
from uuid import uuid4
from kafka import KafkaProducer
producer = KafkaProducer(bootstrap_servers='172.17.0.19:19093', security_protocol='SSL', ssl_cafile='/tmp/ca-cert', ssl_check_hostname=False, ssl_certfile='/tmp/client.crt', ssl_keyfile='/tmp/client_key.pem')
topic_name = 't-topic'
i = 1
while i < 4:
    msg = 'Message: ' + ''.join(str(uuid4()).split('-'))[:5]
    res = producer.send(topic_name, msg.encode())
    print('Send Message to {} topic, Message value is {} Result: {}'.format(topic_name, msg, res))
    i += 1
    time.sleep(3)
Producer output
# python35 producer_ssl_client_auth.py
Send Message to t-topic topic, Message value is Message: 3c0fe Result: <kafka.producer.future.FutureRecordMetadata object at 0x7fd51e537630>
Send Message to t-topic topic, Message value is Message: 9f416 Result: <kafka.producer.future.FutureRecordMetadata object at 0x7fd51e543080>
Send Message to t-topic topic, Message value is Message: 6e48b Result: <kafka.producer.future.FutureRecordMetadata object at 0x7fd51e5431d0>
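The Result column above shows FutureRecordMetadata objects because send() is asynchronous and returns a future. To see where each message actually landed, the future can be resolved with get(timeout=...), which in kafka-python blocks until the broker acknowledges the record and returns metadata carrying topic, partition, and offset. The helper name describe_send and the 10-second timeout below are arbitrary choices for illustration.

```python
def describe_send(future, timeout=10):
    """Block until the send completes and return a one-line summary.

    `future` is expected to be the object returned by KafkaProducer.send().
    get() raises an exception if the send failed or the timeout expired,
    so delivery failures surface here instead of passing unnoticed.
    """
    metadata = future.get(timeout=timeout)
    return 'topic={} partition={} offset={}'.format(
        metadata.topic, metadata.partition, metadata.offset)


# Usage inside the loop of producer_ssl_client_auth.py (not run here):
#     res = producer.send(topic_name, msg.encode())
#     print(describe_send(res))
```

Waiting on every future makes each send synchronous, which is fine for a three-message test script but would reduce throughput in a real producer.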
Modify the Consumer script so that the request includes the client certificate as well
# cat consumer_ssl_client_auth.py
import sys
from kafka import KafkaConsumer
topic_name = 't-topic'
group_name = sys.argv[1]
consumer = KafkaConsumer(topic_name, bootstrap_servers='172.17.0.19:19093', group_id=group_name, security_protocol='SSL', ssl_cafile='/tmp/ca-cert', ssl_check_hostname=False, auto_offset_reset='earliest', ssl_certfile='/tmp/client.crt', ssl_keyfile='/tmp/client_key.pem')
for msg in consumer:
    info = 'Topic: {} Partition: {} Offset: {} Message: {}'.format(msg.topic, msg.partition, msg.offset, msg.value)
    print(info)
Consumer output
# python35 consumer_ssl_client_auth.py t1-topic
Topic: t-topic Partition: 0 Offset: 22 Message: b'aaa111aaa'
Topic: t-topic Partition: 0 Offset: 23 Message: b'Message: 3c0fe'
Topic: t-topic Partition: 0 Offset: 24 Message: b'Message: 9f416'
Topic: t-topic Partition: 0 Offset: 25 Message: b'Message: 6e48b'
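The producer and consumer scripts pass the same five SSL arguments. One way to keep them in sync is to build the keyword arguments once and unpack them into either constructor. The sketch below assumes the file locations used in this article; ssl_client_kwargs is a hypothetical helper name, while the dictionary keys are the kafka-python options already used in the scripts.

```python
def ssl_client_kwargs(cafile='/tmp/ca-cert',
                      certfile='/tmp/client.crt',
                      keyfile='/tmp/client_key.pem',
                      check_hostname=False):
    """Return the SSL keyword arguments shared by KafkaProducer and KafkaConsumer."""
    return {
        'security_protocol': 'SSL',
        'ssl_cafile': cafile,        # CA certificate used to verify the broker
        'ssl_certfile': certfile,    # client certificate signed by that CA
        'ssl_keyfile': keyfile,      # private key exported from the PKCS12 file
        'ssl_check_hostname': check_hostname,
    }


# Usage (requires kafka-python and a reachable broker, so not run here):
#     from kafka import KafkaConsumer, KafkaProducer
#     producer = KafkaProducer(bootstrap_servers='172.17.0.19:19093',
#                              **ssl_client_kwargs())
#     consumer = KafkaConsumer('t-topic', bootstrap_servers='172.17.0.19:19093',
#                              group_id='t1-topic', auto_offset_reset='earliest',
#                              **ssl_client_kwargs())
```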