关闭kafka服务 对外提供服务 安全吗

Kafka vs RocketMQ——单机系统可靠性
前几期的评测中,我们对比了Kafka和RocketMQ的吞吐量和稳定性,本期我们要引入一个新的评测标准——软件可靠性。
何为“可靠性”?
先看下面这种情况:有A,B两辆越野汽车,在城市的周边地区均能很好应对泥泞的路况。当一同开去穿越西藏,A车会因为西藏本地的汽油不达标,导致油路受阻无法点火,而B车顺利完成了穿越。因此我们说,B车的可靠性比A车高。
何为“软件可靠性”?
“软件的可靠性”就是考察软件在各种异常突发的情况下的应对能力。常见的软件异常有:磁盘损坏、进程意外退出、宿主机宕机等情况。
何为“消息中间件的可靠性”?
对于消息中间件来说,“可靠性”最直接的指标就是——消息数据不丢失。此外,消息不重投、服务一主多备等特性也可以用来评估可靠性。
那么Kafka和RocketMQ(以下简称RMQ)在可靠性上孰优孰劣呢?和我们走进本期的测试比拼吧!
在消息收发的过程中,分别模拟Broker服务进程被Kill、物理机器掉电的异常场景,多次实验,查看极端情况下消息系统的可靠性。
以下场景使用多个发送端向一个Topic发送消息,发送方式为同步发送,分区数为8,只启动一个订阅者。
模拟进程退出
在消息收发过程中,利用Kill -9 命令使Broker进程终止,然后重新启动,得到可靠性数据如下:
注:以上测试场景中Kafka的异步刷盘间隔为1秒钟,同步发送需设置request.required.acks=1,否则会出现消息丢失。
在Broker进程被终止重启,Kafka和RMQ都能保证同步发送的消息不丢,因为进程退出后操作系统能确保将该进程遗留在内存的数据刷到磁盘上。实验中,Kafka出现了极少量的消息重复。再次可以确定此场景中,二者的可靠性都很高。
模拟机器掉电
在消息收发过程中,直接拔掉Broker所在的宿主机电源,然后重启宿主机和Broker应用。因受到机房断电限制,我们在本场景测试中使用的是普通PC机器。得到可靠性数据如下:
测试发现,即使在并发很低的情况下,Kafka和RMQ都无法保证掉电后不丢消息。这个时候,就需要改变刷盘策略了。我们把刷盘策略由“异步刷盘”变更为“同步刷盘”,就是说,让每一条消息都完成存储后才返回,以保证消息不丢失。注:关于两种刷盘模式的详细区别可以参照文档最下方的说明
重新执行上面的测试,得到数据如下:
首先,设置同步刷盘时,二者都没出现消息丢失的情况。限于我们使用的是普通PC机器,两者吞吐量都不高。此时Kafka的最高TPS仅有500条/秒,RMQ可以达到4000条/秒,已经是Kafka的8倍。
为什么Kafka的吞吐量如此低呢?因为Kafka本身是没有实现任何同步刷盘机制的,就是说在这种场景下测试,Kafka注定是要丢消息的。但要想做到每一条消息都在落盘后才返回,我们可以通过修改异步刷盘的频率来实现。设置参数log.flush.interval.messages=1,即每条消息都刷一次磁盘。这样的做法,Kafka也不会丢消息了,但是频繁的磁盘读写直接导致性能的下降。
另外,二者在服务恢复后,均出现了消息重复消费的情况,这说明消费位点的提交并不是同步落盘的。不过,幸好Kafka和RMQ都提供了自定义消费位点的接口,来避免大量的重复消费。
在Broker进程被Kill的场景, Kafka和RocketMQ都能在保证吞吐量的情况下,不丢消息,可靠性都比较高。
在宿主机掉电的场景,Kafka与RocketMQ均能做到不丢消息,此时Kafka的吞吐量会急剧下跌,几乎不可用。RocketMQ则仍能保持较高的吞吐量。
在单机可靠性方面,RocketMQ综合表现优于Kafka。
服务端为单机部署,机器配置如下:
应用版本:
同步刷盘和异步刷盘的区别
同步刷盘是在每条消息都确认落盘了之后才向发送者返回响应;而异步刷盘中,只要消息保存到Broker的内存就向发送者返回响应,Broker会有专门的线程对内存中的消息进行批量存储。所以异步刷盘的策略下,当机器突然掉电时,Broker内存中的消息因无法刷到磁盘导致丢失。
本期测试中,RocketMQ比Kafka具有更高的单机可靠性。对于普通业务,部署异步刷盘模式可以得到更高的性能;对于丢消息零容忍的业务,则更适用RocketMQ同步刷盘的模式,在享受高可靠性保障的同时,又能拥有较高的吞吐量。
实际上,单机可靠性只是软件可靠性测试的一个环节,Kafka和RocketMQ都提供了主备机模式,来解决服务器的单点故障。这点我们在后续会继续实验摸索,敬请期待接下来的比拼!7.4 认证和acl
kafka附带一个可插拔的Authorizer和out-of-box authorizer实现,并使用zookeeper来存储所有acl。acl的格式定义 &Principal P is [Allowed/Denied] Operation O From Host H On Resource R”,你可以在KIP-11上了解有关acl结构的更多信息。为了添加,删除或列出acl,你可以使用Kafka authorizer CLI 。默认情况下,如果资源R没有关联acl,除了超级用户,没有用户允许访问,如果你想改变这种行为,你可以在broker.properties配置:
allow.everyone.if.no.acl.found=true
你也可以在broker.properties添加超级用户,像这样(注意分隔符是分号,因为SSL的用户名是逗号)。
super.users=User:Bob;User:Alice
默认情况下,SSL的用户名称的形式是&CN=writeuser,OU=Unknown,O=Unknown,L=Unknown,ST=Unknown,C=Unknown&.可修改,在broker.properties设置自定义的PrincipalBuilder,如下。
principal.builder.class=CustomizedPrincipalBuilderClass
默认情况下,SASL用户名是Kerberos principal的主要组成部分。可修改,通过在broker.properteis中的sasl.kerberos.principal.to.local.rules来自定义规则。sasl.kerberos.principal.to.local.rules的格式是一个列表(每个工作的规则列表),相同方式如 的 auth_to_local。每个规则的开头RULE:包含一个格式声明[n:string](regexp)s/pattern/replacement/g。详细信息参见Kerberos文档。举个例子,添加规则,以正确的转换为用户,同时保持默认的规则︰
sasl.kerberos.principal.to.local.rules=RULE:[1:$1@$0](.*@)s/@.*//,DEFAULT
命令行界面
Kafka认证管理CLI(和其他的CLI脚本)可以在bin目录中找到。CLI脚本名是kafka-acls.sh。以下列出了所有脚本支持的选项:
添加一个acl
移除一个acl
--authorizer
authorizer的完全限定类名
kafka.security.auth.SimpleAclAuthorizer
Configuration
--authorizer-properties
key=val,传给authorizer进行初始化,例如:zookeeper.connect=localhost:2181
Configuration
指定集群作为资源。
--topic [topic-name]
指定topic作为资源。
--group [group-name]
指定 consumer-group 作为资源。
-allow-principal
添加到允许访问的ACL中,Principal是PrincipalType:name格式。你可以指定多个。
--deny-principal
添加到拒绝访问的ACL中,Principal是PrincipalType:name格式。你可以指定多个。
--allow-host
--allow-principal中的principal的IP地址允许访问。
如果--allow-principal指定的默认值是*,则意味着指定“所有主机”
--deny-host
允许或拒绝的操作。有效值为:读,写,创建,删除,更改,描述,ClusterAction,全部
--operation
--deny-principal中的principals的IP地址拒绝访问。
如果 --deny-principal指定的默认值是 * 则意味着指定 &所有主机&
--producer
为producer角色添加/删除acl。生成acl,允许在topic上WRITE, DESCRIBE和CREATE集群。
Convenience
--consumer
为consumer role添加/删除acl,生成acl,允许在topic上READ, DESCRIBE 和 consumer-group上READ。
Convenience
假设所有操作都是yes,规避提示
Convenience
添加acl假设你要添加一个acl “以允许198.51.100.0和198.51.100.1,Principal为User:Bob和User:Alice对主题是Test-Topic有Read和Write的执行权限” 。可通过以下命令实现:
bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:Bob --allow-principal User:Alice --allow-host 198.51.100.0 --allow-host 198.51.100.1 --operation Read --operation Write --topic Test-topic
默认情况下,所有的principal在没有一个明确的对资源操作访问的acl都是拒绝访问的。在极少的情况下,acl允许访问所有的资源,但一些principal我们可以使用 --deny-principal 和 --deny-host来拒绝访问。例如,如果我们想让所有用户读取Test-topic,只拒绝IP为198.51.100.3的User:BadBob,我们可以使用下面的命令:
bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:* --allow-host * --deny-principal User:BadBob --deny-host 198.51.100.3 --operation Read --topic Test-topic
需要注意的是--allow-host和deny-host仅支持IP地址(主机名不支持)。上面的例子中通过指定--topic [topic-name]作为资源选项添加ACL到一个topic。同样,用户通过指定--cluster和通过指定--group [group-name]消费者组添加ACL。
删除acl删除和添加是一样的,--add换成--remove选项,要删除第一个例子中添加的,可以使用下面的命令:
bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --remove --allow-principal User:Bob --allow-principal User:Alice --allow-host 198.51.100.0 --allow-host 198.51.100.1 --operation Read --operation Write --topic Test-topic
acl列表我们可以通过--list选项列出所有资源的ACL。假设要列出Test-topic,我们可以用下面的选项执行CLI所有的ACL:
bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --list --topic Test-topic
添加或删除作为生产者或消费者的principalacl管理添加/移除一个生产者或消费者principal是最常见的使用情况,所以我们增加更便利的选项处理这些情况。为主题Test-topic添加一个生产者User:Bob,我们可以执行以下命令:
bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:Bob --producer --topic Test-topic
同样,添加Alice作为主题Test-topic的消费者,用消费者组为Group-1,我们只用 --consumer 选项:
bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:Bob --consumer --topic test-topic --group Group-1
注意,消费者的选择,我们还必须指定消费者组。从生产者或消费者角色删除主体,我们只需要通过--remove选项。
我怎么指定生产者和消费者的用户呢?
是否如果要使用acl需要把SSL和SASL的流程都走一遍呢?
不用,是独立的
您好,那么我该如何指定生产者或者消费者的username呢?
你把前面2篇SSL和SASL的看完了,就知道了。
请问这里的用户指的是什么?我没有配置SSL和SASL,直接按照本页的步骤进行设置,用这条命令
/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --operation All --allow-principal User:* --allow-host 192.168.70.101&--allow-host 192.168.70.102&--add --cluster&
给集群配了all权限,但是启动集群的时候还是会发生很多权限问题,比如没有update metadata的权限。但是如果设置了allow.everyone.if.no.acl.found=true,则权限控制好像整个是失效的,任意一台没有acl权限的host上的root用户都可以对集群收发消息。
在server.properteis中增加
authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
这个为什么在kafka官网中没有提到?你是从哪里知道的这个参数。
请问,kafka acl有没有Java接口?
大致看了一下,没找到相应的API。
我也遇到了你这种问题,设置了allow.everyone.if.no.acl.found=true 后感觉权限就失效了,但是不设置这个 启动的时候又会有一堆错误,请问您后来是怎么解决的
我添加了&authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer 这个配置,为什么感觉权限控制也失效的呢,从没有配置权限的机器上也能向 集群里 写入数据
请问kafka可以按用户隔离数据吗?比如不同的用户可以建相同的topic,一个用户只能看到自己创建的topic?
大神,既然ACL与SSL是独立的,那么使用这个--allow-principal User:xxx 参数该如何配置Client用户名呢,通过&kafka_client_jaas.conf 配置吗?
给你个常用的例子吧
kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test1
kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --operation Write --operation Read --allow-principal User:* --allow-host 192.168.0.11 --add --topic test1
这样,就可以在11节点上对 topic test1拥有读与写的权限
这个不是通过IP粒度的吗,如果要通过User:Bob 这样的粒度该如何操作呢,就像上面例子中那个命令:&&bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:Bob --allow-principal User:Alice --allow-host 198.51.100.0 --allow-host 198.51.100.1 --operation Read --operation Write --topic Test-topic
KafkaClient {& org.mon.security.plain.PlainLoginModule required& username="alice"& password="alice";};
也就是必须和上一篇介绍的SASL/PLAIN 认证同时使用咯?
大神,你给的这个例子可以达到不用设置sasl就ip粒度的控制吧。我按照评论里的方法,设置了
server.properteis中的authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer,然后执行了./bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --operation All --allow-principal User:* --allow-host 172.16.64.132 --add --cluster,然后按非sasl方式连接kafka失败
群主我想请教一个问题,望您指导下感谢。
kafka可否能像hdfs一样使用proxyuser,比如kafka的superuser我指定usera,然后我通过usera &代理创建userb的 & producer,完成消息的产生,acl还是userb的?感谢!
kafka的存在了zk里面。
群主您好:那这种需求是不是需要二次开发了,比如借助redis判断权限,因为我想给kafka平台话,多应用接入,实现kerberos下的权限管理,hadoop在kerberos下是可以代理用户的,kafka我查了部分资料没发现支持(也许我查询的资料过少吧),类似于通过keytab创建代理用户执行操作,问您下这个是不是目前版本还不支持?再次感谢!
其他的项目我不太清楚,但是kerberos是依附于系统的。kafka只是实现它用于判断,理论上是可以做到的。
嗯 &感谢群主 细心解答。
楼主,你好。因为我的consumer是动态增加的,而且要以不同的用户名连接broker进行消费。目前的user是通过配置文件在kafka启动的时候配置的SASL, 用户名也是启动时指定的,如果我的kafka已经运行了,能不能不重启kafka而动态增加用户。
那有没有什么办法可以从数据库中获取用户名和密码来验证呢?
半兽人之家
您还未填写推送消息的邮箱,请填写您常用的邮箱地址!Kafka官方文档安全部分翻译(一)- 利用SSL进行加密和身份认证 | Peablog翻译说明:基于官方0.10.1版本文档翻译, 原文地址:7. 安全7.1 安全概览在0.9.0.0版本中,Kafka社区添加了一系列可以单独或组合使用的特性来增加Kafka集群的安全性。这些特性都是被考虑为测试版的,如下当前支持的安全措施:可以利用SSL或者SASL(Kerberos)协议来对客户端(生产者和消费者)或其他brokers/工具到brokers的连接进行身份认证。SASL/PLAIN也可以在0.10.0.0以后的版本中使用对brokers到ZooKeeper的连接进行身份认证利用SSL对brokers与客户端之间、brokers与brokers之间、brokers和其他工具之间的数据传输进行加密对客户端的读写进行权限控制权限控制是可插拔的并且支持与外部的授权服务进行集成值得注意的是安全是可选的,非安全的集群也是支持的,同样身份认证和非身份认证、加密和非加密客户端也是支持的。下面的教程解释了如何在客户端和brokers之间配置和利用这些安全特性。7.2 利用SSL进行加密和身份认证Apache Kafka允许客户端通过SSL进行连接,默认情况下SSl是禁用的,但是可以根据需要来开启。7.2.1 为每个Kafka broker生成SSL key和证书部署HTTPS的第一步就是为集群中的每台机器生成key和证书。你可以用Java的keytool来完成这项工作。我们首先将生成key到一个临时的密钥库中,这样我们后面就可以用CA将其导出并签名。1keytool -keystore server.keystore.jks -alias localhost -validity {validity} -genkey在上面的命令中,你需要指定两个参数:keystore: 存储证书的密钥库文件名。keystore文件包含私钥和证书,因此,它需要被安全的保存。validity: 证书的有效天数注意:默认情况下ssl.endpoint.identification.algorithm是未定义的,因此基于主机名的认证是不会有效的。要想启用基于主机名的认证认证,请设置如下的配置:1ssl.endpoint.identification.algorithm=HTTPS一旦启用,客户端就会根据下面两个字段中的其中一个来检查服务器的全限定域名(FQDN):Common Name (CN)Subject Alternative Name (SAN)这两个字段都是可用的,然后RFC-2818推荐使用SAN。SAN也更加可灵活,允许定义多个DNS报文。另一个好处就是CN可以被设置为更加有意义的值, 可添加如下参数-ext SAN=DNS:{FQDN}到keytool命令:1keytool -keystore server.keystore.jks -alias localhost -validity {validity} -genkey -ext SAN=DNS:{FQDN}可以用如下的命令来验证生成的证书中内容:1keytool -list -v -keystore server.keystore.jks7.2.2 创建你自己的CA经过上面的步骤,集群中的每台机器都有了一个公私钥对,以及一个证书来标识这个机器。但是这个证书是未签名的,也就是说攻击者可以创建这样一个证书来假装任意一台机器。因此,对集群中的每台机器都进行签名来防止证书伪造是非常重要的。证书授权机构(CA)是负责证书签名的。CA的工作就像政府管理护照一样 - 政府给每个护照盖戳让护照变得难以伪造。其他政府验证验证这个戳来确保护照是真是可信的。类似的,CA给证书签名,密码学保证了一个签名后的证书是难以通过计算来伪造的。因此,只要CA是一个真实可信的机构,客户端就有很高可信度认为他们正在连接的是信得过的机器。1openssl req -new -x509 -keyout ca-key -out ca-cert -days 365生成的CA是一个简单的公私钥对和证书,准备用来给其他证书签名。下一步是添加生成的CA到客户端受信任区(clients’ truststore),这样客户端就能信任这个CA:1keytool -keystore client.truststore.jks -alias CARoot -import -file ca-cert注意:如果你在中通过将ssl.client.auth设置为”requested” 或者 “required”来进行客户端认证,你必须也要给brokers提供一个信任库,它应该包含所有给客户端密钥签名的CA证书。1keytool -keystore server.truststore.jks -alias CARoot -import -file ca-cert与步骤1中的密钥库相比较,密钥库是存储了每台机器各自的标识,而客户端的信任库存储了所有客户端应该信任的证书。将一个证书导入到一个信任库中也意味着信任所有由这个证书签发的证书。由上面类推,信任一个政府(CA)也就意味着信任这个政府签发的所有护照(证书)。这个属性被称为信任链,在一个大Kafka集群中部署SSL时非常有用。你可以用同一个CA给集群中所有的证书签名,同时让所有机器都共享同一个信任这个CA的信任库。这样所有机器都能信任所有其他机器。7.2.3 给证书签名下一步是用第二步生成的CA证书给第一步生成的所有证书签名。首先,你需要从密钥库中导出证书:1keytool -keystore server.keystore.jks -alias localhost -certreq -file cert-file然后利用CA给导出的证书签名:1openssl x509 -req -CA ca-cert -CAkey ca-key -in cert-file -out cert-signed -days {validity} -CAcreateserial -passin pass:{ca-password}最后,你需要把CA证书和签名后的证书都导入到密钥库中:12keytool -keystore server.keystore.jks -alias CARoot -import -file ca-certkeytool -keystore server.keystore.jks -alias localhost -import -file cert-signed参数定义如下:keystore: 密钥库的位置ca-cert: CA证书ca-key: CA的私钥ca-password: CA的密码cert-file: 服务端导出的未签名的证书cert-signed: 服务端导出已签名的证书下面是一个包含上面所有步骤的bash脚本。注意其中一条命令假设密码是test1234, 所以请就使用这个密码或者在运行脚本前修改为其他密码。123456789101112#!/bin/bash#Step 1keytool -keystore server.keystore.jks -alias localhost -validity 365 -keyalg RSA -genkey#Step 2openssl req -new -x509 -keyout ca-key -out ca-cert -days 365keytool -keystore server.truststore.jks -alias CARoot -import -file ca-certkeytool -keystore client.truststore.jks -alias CARoot -import -file ca-cert#Step 3keytool -keystore server.keystore.jks -alias localhost -certreq -file cert-fileopenssl x509 -req -CA ca-cert -CAkey ca-key -in cert-file -out cert-signed -days 365 -CAcreateserial -passin pass:test1234keytool -keystore server.keystore.jks -alias CARoot -import -file ca-certkeytool -keystore server.keystore.jks -alias localhost -import -file cert-signed7.2.4 配置Kafka BrokersKafka Brokers 支持在多个端口上监听连接。我们需要在server.properties中配置如下属性,它必须有一个或者多个逗号分隔的值:1listeners如果没有为broker间通信启用SSL(如何启用可参见下面),PLAINTEXT和SSL端口都必须设置。1listeners=PLAINTEXT://host.name:port,SSL://host.name:port在broker端需要有如下的SSL配置:12345ssl.keystore.location=/var/private/ssl/kafka.server.keystore.jksssl.keystore.password=test1234ssl.key.password=test1234ssl.truststore.location=/var/private/ssl/kafka.server.truststore.jksssl.truststore.password=test1234一些可选的设置值得考虑下:ssl.client.auth=none (“required” =& 要求客户端身份认证, “requested” =& 要求进行客户端身份认证但没有证书的客户端也可以连接,不建议使用”requested”, 因为它提供的是一个无感知的安全同时未配置的客户端仍然可以成功连接)ssl.cipher.suites (可选的).密码套件是身份认证、加密、MAC、密钥交换算法那 利用TLS或者SSL网络协议(默认是空列表)ssl.enabled.protocols=TLSv1.2,TLSv1.1,TLSv1(列出你打算接受的从客户端发起的SSL协议,注意TLS取代了已经废弃的SSL,不推荐在生产环境中使用SSL)ssl.keystore.type=JKSssl.truststore.type=JKSssl.secure.random.implementation=SHA1PRNG如果你想为broker间通信启用SSL,在broker配置文件中添加如下配置:1security.inter.broker.protocol=SSL由于在不同的导入规则,Oracle的实现上默认限制了密码算法的强度。如果需要使用更高强度的算法(例如,256-bit密钥的AES), JDK/JRE 中必须包含 。更多信息参见.JRE/JDK有默认的伪随机数生成器(PRNG)用于加密操作,因此不需要使用ssl.secure.random.implementation来配置具体的实现方法。但是,某些实现方法是有性能问题的(特别是Linux系统上的默认选项:NativePRNG, 利用的是一个全局锁),为以防SSL连接性能成为问题,请考虑明确设置使用的实现方法。SHA1PRNG方式的实现是非阻塞的,在高负载的情况下也能有很好的性能表现(单broker在有副本的情况下达50MB每秒的生产消息)。一旦你启动broker, 你就能看见server.log :1 :
& (192.168.64.1,9092,), & (192.168.64.1,9093,)为了检查密钥库和信任库是否正确启动,你可以运行如下的命令:1openssl s_client -debug -connect localhost:9093 -tls1(注意:TLSv1需要包含在ssl.enabled.protocols配置的列表中)在这个命令的输出中,你可以看见服务端的证书:12345-----BEGIN CERTIFICATE-----{variable sized -----END subject=/C=US/ST=CA/L=Santa Clara/O=org/OU=org/CN=Sriharsha issuer=/C=US/ST=CA/L=Santa Clara/O=org/OU=org/CN=kafka/emailAddress=如果没有显示证书或者有其他错误信息那密钥库就没有配置正确。7.2.5 配置Kafka客户端SSL仅被新的Kafka生产者和消费者支持,旧的API是不支持的。对生产者和消费者的SSL配置是相同的。如果broker不要求客户端身份认证,那所需最小配置示例如下:123security.protocol=SSLssl.truststore.location=/var/private/ssl/kafka.client.truststore.jksssl.truststore.password=test1234如果要求客户端身份认证,那必须创建一个步骤一里面的密钥库,同时也要有如下配置:123ssl.keystore.location=/var/private/ssl/kafka.client.keystore.jksssl.keystore.password=test1234ssl.key.password=test1234根据我们的需求和broker的配置,也可能需要设置其他配置:ssl.provider (可选的). SSL连接的安全提供者的名字。默认值是默认的安全提供者JVM。ssl.cipher.suites (可选的).密码套件是身份认证、加密、MAC、密钥交换算法那 利用TLS或者SSL网络协议(默认是空列表)ssl.enabled.protocols=TLSv1.2,TLSv1.1,TLSv1 至少有一个协议是包含在broker端设置的协议列表中的ssl.keystore.type=JKSssl.truststore.type=JKS使用console-producer和console-consumer的示例:12kafka-console-producer.sh --broker-list localhost:9093 --topic test --producer.config client-ssl.propertieskafka-console-consumer.sh --bootstrap-server localhost:9093 --topic test --consumer.config client-ssl.properties

我要回帖

更多关于 关闭kafka服务 的文章

 

随机推荐