传输加密(双向认证)

基于单向认证的基础上,在加上验证客户端身份,这就是双向认证,在请求时客户端需要带上服务器端信任CA签发的客户端证书,以及客户端私钥才能完成SSL请求。

为client生成证书

# keytool -genkey -keystore client.keystore.jks -alias localhost -validity 3650 -keyalg RSA

导出客户端证书请求

# keytool -keystore client.keystore.jks -alias localhost -certreq -file client.csr

使用CA证书私钥签署客户端证书请求

# openssl x509 -req -CA ca-cert -CAkey ca-key -in client.csr -out client.crt -days 3650 -CAcreateserial -passin pass:ca-cert

导入CA证书中信息到client端KeyStore

# keytool -keystore client.keystore.jks -alias CARoot -import -file ca-cert

导入CA签发client证书中信息到client端KeyStore

# keytool -keystore client.keystore.jks -alias localhost -import -file client.crt

到此,证书相关初步完成了,不过如果客户端不仅仅有Java,还有Python等,而Python在使用SSL双向认证时,需要CA证书、客户端证书、客户端私钥,所以此时证书还需要在处理下,需要从client.keystore.jks导出私钥文件才能完成正常使用

转换客户端keystore JKS证书文件格式为P12

# keytool -importkeystore -srckeystore client.keystore.jks -destkeystore client.keystore.p12 -deststoretype PKCS12 -srcalias localhost -deststorepass client -destkeypass client

从客户端P12证书文件中导出私钥文件

# openssl pkcs12 -in client.keystore.p12  -nodes -nocerts -out client_key.pem

修改Kafka服务器端配置,设置ssl.client.auth=required,开启客户端认证

broker.id=0
port=19092
num.network.threads=2

num.io.threads=8
socket.send.buffer.bytes=1048576
socket.receive.buffer.bytes=1048576
socket.request.max.bytes=504857600
log.dirs=/usr/local/kafka/logs
num.partitions=2
log.retention.hours=168
log.segment.bytes=536870912
log.retention.check.interval.ms=60000
log.cleaner.enable=true
zookeeper.connect=192.168.1.150:12181
zookeeper.connection.timeout.ms=1000000
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
listeners=PLAINTEXT://172.17.0.19:19092,SSL://172.17.0.19:19093
advertised.listeners=PLAINTEXT://172.17.0.19:19092,SSL://172.17.0.19:19093
listener.security.protocol.map=PLAINTEXT:PLAINTEXT,SSL:SSL
ssl.keystore.location=/usr/local/kafka/jks/server.keystore.jks
ssl.keystore.password=server
ssl.key.password=server
ssl.truststore.location=/usr/local/kafka/jks/server.truststore.jks
ssl.truststore.password=server
ssl.protocol=TLS
ssl.client.auth=required

此时Python尝试调用Producer脚本,此时请求时没有带上客户端证书

# cat producer_ssl.py
import sys
import time
from uuid import uuid4
from kafka import KafkaProducer
producer = KafkaProducer(bootstrap_servers='172.17.0.19:19093', security_protocol='SSL', ssl_cafile='/tmp/ca-cert', ssl_check_hostname=False)
topic_name = 't-topic'
i = 1
while i < 4:
    msg = 'Message: ' + ''.join(str(uuid4()).split('-'))[:5]
    res = producer.send(topic_name, msg.encode())
    print('Send Message to {} topic, Message value is {} Result: {}'.format(topic_name, msg, res))
    i += 1
    time.sleep(3)

运行时抛出的异常信息

# python35 producer_ssl.py
Traceback (most recent call last):
  File "producer_ssl.py", line 5, in <module>
    producer = KafkaProducer(bootstrap_servers='172.17.0.19:19093', security_protocol='SSL', ssl_cafile='/tmp/ca-cert', ssl_check_hostname=False)
  File "/usr/local/python35/lib/python3.5/site-packages/kafka/producer/kafka.py", line 362, in __init__
    **self.config)
  File "/usr/local/python35/lib/python3.5/site-packages/kafka/client_async.py", line 214, in __init__
    self._bootstrap(collect_hosts(self.config['bootstrap_servers']))
  File "/usr/local/python35/lib/python3.5/site-packages/kafka/client_async.py", line 245, in _bootstrap
    if not bootstrap.connect_blocking():
  File "/usr/local/python35/lib/python3.5/site-packages/kafka/conn.py", line 301, in connect_blocking
    self.connect()
  File "/usr/local/python35/lib/python3.5/site-packages/kafka/conn.py", line 392, in connect
    if self._try_handshake():
  File "/usr/local/python35/lib/python3.5/site-packages/kafka/conn.py", line 455, in _try_handshake
    self._sock.do_handshake()
  File "/usr/local/python35/lib/python3.5/ssl.py", line 983, in do_handshake
    self._sslobj.do_handshake()
  File "/usr/local/python35/lib/python3.5/ssl.py", line 628, in do_handshake
    self._sslobj.do_handshake()
ssl.SSLError: [SSL: UNEXPECTED_MESSAGE] unexpected message (_ssl.c:645)

此时修改Producer脚本,请求时带上客户端证书,再次尝试

# kafka cat producer_ssl_client_auth.py 
import sys
import time
from uuid import uuid4
from kafka import KafkaProducer
producer = KafkaProducer(bootstrap_servers='172.17.0.19:19093', security_protocol='SSL', ssl_cafile='/tmp/ca-cert', ssl_check_hostname=False, ssl_certfile='/tmp/client.crt', ssl_keyfile='/tmp/client_key.pem')
topic_name = 't-topic'
i = 1
while i < 4:
    msg = 'Message: ' + ''.join(str(uuid4()).split('-'))[:5]
    res = producer.send(topic_name, msg.encode())
    print('Send Message to {} topic, Message value is {} Result: {}'.format(topic_name, msg, res))
    i += 1
    time.sleep(3)

Producer 运行效果

# python35 producer_ssl_client_auth.py
Send Message to t-topic topic, Message value is Message: 3c0fe Result: <kafka.producer.future.FutureRecordMetadata object at 0x7fd51e537630>
Send Message to t-topic topic, Message value is Message: 9f416 Result: <kafka.producer.future.FutureRecordMetadata object at 0x7fd51e543080>
Send Message to t-topic topic, Message value is Message: 6e48b Result: <kafka.producer.future.FutureRecordMetadata object at 0x7fd51e5431d0>

修改Consumer脚本,请求时带上客户端证书

# cat consumer_ssl_client_auth.py 
import sys
from kafka import KafkaConsumer
topic_name = 't-topic'
group_name = sys.argv[1]
consumer = KafkaConsumer(topic_name, bootstrap_servers=' 172.17.0.19:19093', group_id=group_name, security_protocol='SSL', ssl_cafile='/tmp/ca-cert', ssl_check_hostname=False, auto_offset_reset='earliest', ssl_certfile='/tmp/client.crt', ssl_keyfile='/tmp/client_key.pem')
for msg in consumer:
    info = 'Topic: {} Partition: {} Offset: {} Message: {}'.format(msg.topic, msg.partition, msg.offset, msg.value)
    print(info)

运行效果

#  python35 consumer_ssl_client_auth.py t1-topic
Topic: t-topic Partition: 0 Offset: 22 Message: b'aaa111aaa'
Topic: t-topic Partition: 0 Offset: 23 Message: b'Message: 3c0fe'
Topic: t-topic Partition: 0 Offset: 24 Message: b'Message: 9f416'
Topic: t-topic Partition: 0 Offset: 25 Message: b'Message: 6e48b'

results matching ""

    No results matching ""