kafka 无法消费消息如何消费一部分消息么

:转载时请以超链接形式标明文章原始出处和作者信息及本声明
Cloudera发布了Kafka的好文,《Deploying Apache Kafka: A Practical FAQ》,参见:
是否应当为Kafka Broker使用 固态硬盘 (SSD)
实际上使用SSD盘并不能显著地改善 Kafka 的性能,主要有两个原因:
Kafka写磁盘是异步的,不是同步的。就是说,除了启动、停止之外,Kafka的任何操作都不会去等待磁盘同步(sync)完成;而磁盘同步(disk syncs)总是在后台完成的。这就是为什么Kafka消息至少复制到三个副本是至关重要的,因为一旦单个副本崩溃,这个副本就会丢失数据无法同步写到磁盘。
每一个Kafka Partition被存储为一个串行的WAL(Write Ahead Log)日志文件。因此,除了极少数的数据查询,Kafka中的磁盘读写都是串行的。现代的操作系统已经对串行读写做了大量的优化工作。
如何对Kafka Broker上持久化的数据进行加密&
目前,Kafka不提供任何机制对Broker上持久化的数据进行加密。用户可以自己对写入到Kafka的数据进行加密,即是,生产者(Producers)在写Kafka之前加密数据,消费者(Consumers)能解密收到的消息。这就要求生产者(Producers)把加密协议(protocols)和密钥(keys)分享给消费者(Consumers)。
另外一种选择,就是使用软件提供的文件系统级别的加密,例如Cloudera Navigator Encrypt。Cloudera Navigator Encrypt是Cloudera企业版(Cloudera Enterprise)的一部分,在应用程序和文件系统之间提供了一个透明的加密层。
Apache Zookeeper正成为Kafka集群的一个痛点(pain point),真的吗?
Kafka高级消费者(high-level consumer)的早期版本(0.8.1或更早)使用Zookeeper来维护读的偏移量(offsets,主要是Topic的每个Partition的读偏移量)。如果有大量生产者(consumers)同时从Kafka中读数据,对Kafka的读写负载可能就会超出它的容量,Zookeeper就变成一个瓶颈(bottleneck)。当然,这仅仅出现在一些很极端的案例中(extreme cases),即有成百上千个消费者(consumers)在使用同一个Zookeeper集群来管理偏移量(offset)。
不过,这个问题已经在Kafka当前的版本(0.8.2)中解决。从版本0.8.2开始,高级消费者(high-level consumer)能够使用Kafka自己来管理偏移量(offsets)。本质上讲,它使用一个单独的Kafka Topic来管理最近的读偏移量(read offsets),因此偏移量管理(offset management)不再要求Zookeeper必须存在。然后,用户将不得不面临选择是用Kafka还是Zookeeper来管理偏移量(offsets),由消费者(consumer)配置参数 offsets.storage 决定。
Cloudera强烈推荐使用Kafka来存储偏移量。当然,为了保证向后兼容性,你可以继续选择使用Zookeeper存储偏移量。(例如,你可能有一个监控平台需要从Zookeeper中读取偏移量信息。) 假如你不得不使用Zookeeper进行偏移量(offset)管理,我们推荐你为Kafka集群使用一个专用的Zookeeper集群。假如一个专用的Zookeeper集群仍然有性能瓶颈,你依然可以通过在Zookeeper节点上使用固态硬盘(SSD)来解决问题。
Kafka是否支持跨数据中心的可用性
Kafka跨数据中心可用性的推荐解决方案是使用。在你的每一个数据中心都搭建一个Kafka集群,在Kafka集群之间使用MirrorMaker来完成近实时的数据复制。
使用MirrorMaker的架构模式是为每一个"逻辑"的topic在每一个数据中心创建一个topic:例如,在逻辑上你有一个"clicks"的topic,那么你实际上有"DC1.clicks"和&DC2.clicks&两个topic(DC1和DC2指得是你的数据中心)。DC1向DC1.clicks中写数据,DC2向DC2.clicks中写数据。MirrorMaker将复制所有的DC1 topics到DC2,并且复制所有的DC2 topics到DC1。现在每个DC上的应用程序都能够访问写入到两个DC的事件。这个应用程序能够合并信息和处理相应的冲突。
另一种更复杂的模式是在每一个DC都搭建本地和聚合Kafka集群。这个模式已经被Linkedin使用,Linkedin Kafka运维团队已经在&&中有详细的描述(参见&Tiers and Aggregation&)。
Kafka支持哪些类型的数据转换(data transformation)
数据流过的Kafka的时候,Kafka并不能进行数据转换。为了处理数据转换,我们推荐如下方法:
对于简单事件处理,使用&,并且写一个简单的Apache Flume Interceptor。
对于复杂(事件)处理,使用Apache Spark Streaming从Kafka中读数据和处理数据。
在这两种情况下,被转换或者处理的数据可被写会到新的Kafka Topic中,或者直接传送到数据的最终消费者(Consumer)那里。
对于实时事件处理模式更全面的描述,看看&。
如何通过Kafka发送大消息或者超大负荷量?
Cloudera的性能测试表明Kafka达到最大吞吐量的消息大小为10K左右。更大的消息将导致吞吐量下降。然后,在一些情况下,用户需要发送比10K大的多的消息。
如果消息负荷大小是每100s处理MB级别,我们推荐探索以下选择:
如果可以使用共享存储(HDFS、S3、NAS),那么将超负载放在共享存储上,仅用Kafka发送负载数据位置的消息。
对于大消息,在写入Kafka之前将消息拆分成更小的部分,使用消息Key确保所有的拆分部分都写入到同一个partition中,以便于它们能被同一个消息着(Consumer)消费的到,在消费的时候将拆分部分重新组装成一个大消息。
在通过Kafka发送大消息时,请记住以下几点:
Kafka生产者(Producers)能够压缩消息。通过配置参数compression.codec确保压缩已经开启。有效的选项为"gzip"和"snappy"。
Broker配置
message.max.bytes (default: 1000000): Broker能够接受的最大消息。增加这个值以便于匹配你的最大消息。
log.segment.bytes (default: 1GB): Kafka数据文件的大小。确保它至少大于一条消息。默认情况下已经够用,一般最大的消息不会超过1G大小。
replica.fetch.max.bytes (default: 1MB): Broker间复制的最大的数据大小。这个值必须大于message.max.bytes,否则一个Broker接受到消息但是会复制失败,从而导致潜在的数据丢失。
Consumer配置
fetch.message.max.bytes (default: 1MB): Consumer所读消息的最大大小。这个值应该大于或者等于Broker配置的message.max.bytes的值。
其他方面的考虑:
Broker需要针对复制为每一个partition分配一个replica.fetch.max.bytes大小的缓存区。需要计算确认( partition的数量 * 最大消息的大小 )不会超过可用的内存,否则就会引发OOMs(内存溢出异常)。
Consumers有同样的问题,因子参数为 fetch.message.max.bytes :确认每一个partition的消费者针对最大的消息有足够可用的内存。
大消息可能引发更长时间的垃圾回收停顿(garbage collection pauses)(brokers需要申请更大块的内存)。注意观察GC日志和服务器日志。假如发现长时间的GC停顿导致Kafka丢失了Zookeeper session,你可能需要为zookeeper.session.timeout.ms配置更长的timeout值。
Kafka是否支持MQTT或JMS协议
目前,Kafka针对上述协议不提供直接支持。但是,用户可以自己编写Adaptors从MQTT或者JMS中读取数据,然后写入到Kafka中。
引用地址:kafka入门(二)——kafka的逻辑结构
相关推荐:这篇文章来自Spark集成Kafka(/blog/2174765),这里把它单独取出来,作为Kafka的入门吧下载Kafka http://mirror.bit.e
一,组成部分概要
&&&&&&&&&& Producer:消息生产者
&&&&&&&&& Consumer:消息消费者
&&&&&&&&& Topic:特指kafka处理的消息源的不同分类
&&&&&&&&& Partition:topic物理上的分组,一个topic可以分为多个partition,每个partition是一个有序的队列。partition中的每条消息都会被分配一个有序的id(offset)
&&&&&&&&& Message:消息,是通信的基本单位,每个 producer 可以向一个topic(主题)发布一些消息
&&&&&&&&& Broker:缓存代理,Kafa 集群中的一台或多台服务器统称为 broker
&&&&&&&& &Consumer Group: 消费者组,可以并行消费Topic中partition的消息
&&&&&&&&& Producers:消息和数据生产者,向Kafka 的一个 topic 发布消息的过程叫做producers。
&&&&&&&&& Consumers:消息和数据消费者,订阅topics 并处理其发布的消息的过程叫做 consumers。
二,kafka各个部分介绍 &&&&&&
&&& 1,producers&&&&&&&
消息和数据生产者,向 Kafka 的一个 topic 发布消息的过程叫做producers。
Producer将消息发布到指定的Topic中,同Producer也能决定将此消息归属于哪个比如基于&round-robin&方式或者通过其他的一些算法等.
批量发送可以很有效的提高发送效率。Kafka producer的异步发送模式允许进行批量发送,先将消息缓存在内存中,然后一次请求批量发送出去。 &&
&&& 2,broker
Broker:缓存代理,Kafka集群中的一台或多台服务器统称为 broker。
Message在Broker中通Log追加的方式进行持久化存储。并进行分区(patitions)
为了减少磁盘写入的次数,broker会将消息暂时buffer起来,当消息的个数(或尺寸)达到一定阀值时,再flush到磁盘,这样减少了磁盘IO调用的次数。
&&&&&&& 1,broker的无状态机制&&&&&
&&&&&&&&&&&&&& 1.& Broker没有副本机制,一旦broker宕机,该broker的消息将都不可用。
&&&&&&&&&&& 2.& Broker不保存订阅者的状态,由订阅者自己保存。相关推荐:官方文档:https://kafka.apache.org/documentation.html说下quick start遇到问题:运行kafka二进制项目不需要scala文档API栏目,每个API
&&&&&&&&&&& 3.&无状态导致消息的删除成为难题(可能删除的消息正在被订阅),kafka采用基于时间的SLA(服务水平保证),消息保存一定时间(通常为7天)后会被删除。
&&&&&&&&&&& 4.& 消息订阅者可以rewind back到任意位置重新进行消费,当订阅者故障时,可以选择最小的offset(id)进行重新读取消费消息。
&&&&& 3,message的组成&&
Message消息:是通信的基本单位,每个 producer 可以向一个topic(主题)发布一些消息。
Kafka中的Message是以topic为基本单位组织的,不同的topic之间是相互独立的。每个topic又可以分成几个不同的partition(每个topic有几个partition是在创建topic时指定的),每个partition存储一部分Message。
partition中的每条Message包含了以下三个属性:
&&&&&& offset&&&&&&&&对应类型:long
&&&&&& MessageSize&&&&&&&&对应类型:int32
&&&& & data是message的具体内容
&&&&& 4,partions组成&&&&&&
kafka基于文件存储.通过分区,可以将日志内容分散到多个server上,来避免文件尺寸达到单机磁盘的上限,每个partiton都会被当前server(kafka实例)保存;
可以将一个topic切分多任意多个partitions,来消息保存/消费的效率.
越多的partitions意味着可以容纳更多的consumer,有效提升并发消费的能力.
&&&& 5,consumers
消息和数据消费者,订阅 topics 并处理其发布的消息的过程叫做 consumers。
在 kafka中,我们 可以认为一个group是一个“订阅者”,一个Topic中的每个partions,只会被一个“订阅者”中的一个consumer消费,不过一个 consumer可以消费多个partitions中的消息(消费者数据小于Partions的数量时)
注: kafka的设计原理决定,对于一个topic,同一个group中不能有多于partitions个数的consumer同时消费,否则将意味着某些consumer将无法得到消息.
&&&&&&&&&&&&&&&&
相关推荐:一、zookeeper存储结构总图图片当我们kafka启动运行以后,就会在zookeeper上初始化kafka相关数据,主要包括六大类:consumersadminconf
一,组成部分概要?????????? Producer:消息生产者????????? Consumer:消息消费者????????? Topic:特指kafka处理的消息源的不同分类????????? Partition:topic物理上的分组,一个topic可以分...
相关阅读排行
相关内容推荐
请激活账号
为了能正常使用评论、编辑功能及以后陆续为用户提供的其他产品,请激活账号。
您的注册邮箱:
如果您没有收到激活邮件,请注意检查垃圾箱。q5725827 的BLOG
用户名:q5725827
访问量:9244
注册日期:
阅读量:5863
阅读量:12276
阅读量:396622
阅读量:1087196
51CTO推荐博文
APIProducer API此处只简介一个procedure的例子生产类是用来创建新消息的主题和可选的分区。如果使用Java你需要包括几个包和支持类:import kafka.javaapi.producer.Pimport kafka.producer.KeyedMimport kafka.producer.ProducerC&第一步首先定义producer如何找到集群,如何序列化消息和为消息选择适合的分区。下面吧这些定义在一个标准的JAVA& Properties类中Properties&props&=&new&Properties();
props.put("metadata.broker.list","broker1:9092,broker2:9092");
props.put("serializer.class","kafka.serializer.StringEncoder");
props.put("partitioner.class","example.producer.SimplePartitioner");
props.put("request.required.acks","1");
ProducerConfig&config&=&new&ProducerConfig(props);1.metadata.broker.list 定义了生产者如何找到一个或多个Broker去确定每个topic的Leader。这不需要你设置集群中全套的brokers但至少应包括两个,第一个经纪人不可用可以替换。不需要担心需要指出broker为主题的领袖(分区),生产者知道如何连接到代理请求元数据并连接到正确的broker。2.第二个属性“序列化类定义“。定义使用什么序列化程序传递消息。在我们的例子中,我们使用一个卡夫卡提供的简单的字符串编码器。请注意,encoder必须和下一步的keyedmessage使用相同的类型可以适当的定义"key.serializer.class"根据key改变序列化类。默认的是与"serializer.class"相同3.第三个属性partitioner.class 定义了决定topic中的分区发送规则。这个属性是可选的,但是对于你的特殊的分区实现是重要的。如果存在key将使用kafka默认的分组规则,如果key为null 则使用随机的分区发送策略。4.最后一个属性“request.required.acks”将设置kafka知否需要broker的回应。如果不设置可能将导致数据丢失。1.1此处可以设置为0 生产者不等待broker的回应。会有最低能的延迟和最差的保证性(在服务器失败后会导致信息丢失)1.2此处可以设置为1生产者会收到leader的回应在leader写入之后。(在当前leader服务器为复制前失败可能会导致信息丢失)1.3此处可以设置为-1生产者会收到leader的回应在全部拷贝完成之后。之后可以定义生产者Producer&String, String& producer =new Producer&String, String&(config);此处泛型的第一个type是分区的key的类型。第二个是消息的类型。与上面Properties中定义的对应。现在定义messgaeRandom&rnd&=&new&Random();
long&runtime&=&new&Date().getTime();
String&ip&=&“192.168.2.”&+rnd.nextInt(255);
String&msg&=&runtime&+&“,,”+&此处模拟一个website的访问记录。之后想broker中写入信息.KeyedMessage&String, String& data =new KeyedMessage&String, String&("page_visits",ip, msg);producer.send(data);此处的“page_visits”是要写入的Topic。此处我们将IP设置为分区的key值。注意如果你没有设置键值,即使你定义了一个分区类,kafka也将使用随机发送.Full Code:import&java.util.*;
import&kafka.javaapi.producer.P
import&kafka.producer.KeyedM
import&kafka.producer.ProducerC
public&class&TestProducer&{
&&&public&static&void&main(String[]&args)&{
&&&&&&&long&events&=&Long.parseLong(args[0]);
&&&&&&&Random&rnd&=&new&Random();
&&&&&&&Properties&props&=&new&Properties();
&&&&&&&props.put("metadata.broker.list","broker1:9092,broker2:9092&");
&&&&&&&props.put("serializer.class","kafka.serializer.StringEncoder");
&&&&&&&props.put("partitioner.class","example.producer.SimplePartitioner");
&&&&&&&props.put("request.required.acks",&"1");
&&&&&&&ProducerConfig&config&=&new&ProducerConfig(props);
&&&&&&&Producer&String,&String&&producer&=&new&Producer&String,String&(config);
&&&&&&&for&(long&nEvents&=&0;&nEvents&&&&nEvents++)&{
&&&&&&&&&&&&&&&long&runtime&=&newDate().getTime();&
&&&&&&&&&&&&&&&String&ip&=&“192.168.2.”&+rnd.nextInt(255);
&&&&&&&&&&&&&&&String&msg&=&runtime&+“,,”&+&
&&&&&&&&&&&&&&&KeyedMessage&String,String&&data&=&new&KeyedMessage&String,&String&("page_visits",ip(key),&msg);
&&&&&&&&&&&&&&&producer.send(data);
&&&&&&&producer.close();
Partitioning&Code:&(分区函数)
import&kafka.producer.P
import&kafka.utils.VerifiableP
public&class&SimplePartitioner&implementsPartitioner&String&&{
&&&public&SimplePartitioner&(VerifiableProperties&props)&{
&&&public&int&partition(String&key,&int&a_numPartitions)&{
&&&&&&&int&partition&=&0;
&&&&&&&int&offset&=&key.lastIndexOf('.');
&&&&&&&if&(offset&&&0)&{
&&&&&&&&&&partition&=&Integer.parseInt(&key.substring(offset+1))&%a_numP
&&&&&&return&
}上面分区的作用是相同的IP将发送至相同的分区。但此时你的消费者需要知道如何去处理这样的规则消息。使用前需要建立topicbin/kafka-create-topic.sh&--topicpage_visits&--replica&3&--zookeeper&localhost:2181&--partition&5可以使用下面的工具验证你发送的消息bin/kafka-console-consumer.sh&--zookeeperlocalhost:2181&--topic&page_visits&--from-beginningHigh Level Consumer API顶层接口:class&Consumer&{
&&*&&创建java的消费者与kafka的connect
&&*&&@param&config&&至少需要提供consumer的groupId和zookeeper.connect.
public&statickafka.javaapi.consumer.ConsumerConnector&createJavaConsumerConnector(config:ConsumerConfig);
ConsumerConnector:
public&interfacekafka.javaapi.consumer.ConsumerConnector&{
&&*&&为每一个主题创建一个泛型的消息流
&&*&&@param&topicCountMap&&提供topic和Stream的一一对应
&&*&&@param&decoder&解析器&
&&*&&@return&Map&&&&topic&,List&#streams&&
&&*&&&&&&&&&&&&&&&&&&&此处的KafkaStream提供对内容的Iterable读取
&public&&K,V&&Map&String,&List&KafkaStream&K,V&&&
&&&&createMessageStreams(Map&String,Integer&&topicCountMap,&Decoder&K&&keyDecoder,&Decoder&V&valueDecoder);
&&*&&同上.
&public&Map&String,&List&KafkaStream&byte[],&byte[]&&&createMessageStreams(Map&String,&Integer&&topicCountMap);
&&&*&&&&&&&&&&建一个匹配的通配符主题的消息流的List
&&*&&@param&topicFilter一个topicfilter指定Consumer订阅的话题(
&&*&&包含了一个白名单和黑名单).
&&*&&@param&numStreams&messagestreams的数量
&&*&&@param&keyDecoder&message&key解析器
&&*&&@param&valueDecoder&a&message解析器
&&*&&@return&同上
&public&&K,V&&List&KafkaStream&K,V&&
&&&createMessageStreamsByFilter(TopicFilter&topicFilter,&int&numStreams,Decoder&K&&keyDecoder,&Decoder&V&&valueDecoder);
&………………………….(其余接口类似,是上述方法的重载方法)
&&*&&提交本连接器所连接的所有分区和主题
&public&void&commitOffsets();
&&*&&停止当前Consumer
&public&void&shutdown();
}&e.g &example1. 为什使用高级消费者(High Level Consumer)&&&&&&&& 有时消费者从卡夫卡读取消息不在乎处理消息的偏移量逻辑,只是消费消息内部的信息。高级消费者提供了消费信息的方法而屏蔽了大量的底层细节。&&&&&&&& 首先要知道的是,高级消费者从zookeeper的特殊分区存储最新偏离。这个偏移当kafka启动时准备完毕。这一般是指消费者群体(Consumer group)[此处的意思,kafka中的消息是发送到Consumer group中的任一个consumer上的,kafka保存的是整体的偏移。此处不知是否理解正确请大虾指点。]&&&&&&&& 请小心,对于kafka集群消费群体的名字是全局的,任何的“老”逻辑的消费者应该被关闭,然后运行新的代码。当一个新的进程拥有相同的消费者群的名字,卡夫卡将会增加进程的线程消费topic并且引发的“重新平衡(reblannce)”。在这个重新平衡中,卡夫卡将分配现有分区到所有可用线程,可能移动一个分区到另一个进程的消费分区。如果此时同时拥有旧的的新的代码逻辑,将会有一部分逻辑进入旧得Consumer而另一部分进入新的Consumer中的情况.2. Designing a High Level Consumer了解使用高层次消费者的第一件事是,它可以(而且应该!)是一个多线程的应用。线程围绕在你的主题分区的数量,有一些非常具体的规则:1.&如果你提供比在主题分区多的线程数量,一些线程将不会看到消息2.&如果你提供的分区比你拥有的线程多,线程将从多个分区接收数据3.&如果你每个线程上有多个分区,对于你以何种顺序收到消息是没有保证的。举个栗子,你可能从分区10上获取5条消息和分区11上的6条消息,然后你可能一直从10上获取消息,即使11上也拥有数据。4.添加更多的进程/线程将使卡夫卡重新平衡,可能改变一个分区到线程的分配。这里是一个简单的消费者栗子:package&com.test.
import&kafka.consumer.ConsumerI
import&kafka.consumer.KafkaS
public&class&ConsumerTest&implements&Runnable&{
&&&&privateKafkaStream&m_
&&&&private&intm_threadN
&&&&publicConsumerTest(KafkaStream&a_stream,&int&a_threadNumber)&{
&&&&&&&m_threadNumber&=&a_threadN
&&&&&&&&m_stream&=a_
&&&&public&void&run()&{
&&&&&&&ConsumerIterator&byte[],&byte[]&&it&=&m_stream.iterator();
&&&&&&&&while(it.hasNext())
System.out.println("Thread&"&+&m_threadNumber+&":&"&+&new&String(it.next().message()));
&&&&&&&System.out.println("Shutting&down&Thread:&"&+&m_threadNumber);
}在这里有趣的是,(it.hasnext())。这个代码将从卡夫卡读取直到你停下来。3. Config不像simpleconsumer高层消费者为你很多的提供需要bookkeeping(?)和错误处理。但是你要告诉卡夫卡这些信息。下面的方法定义了创建高级消费者基础配置:private&static&ConsumerConfigcreateConsumerConfig(String&a_zookeeper,&String&a_groupId)&{
&&&&&&&&Propertiesprops&=&new&Properties();
&&&&&&&props.put("zookeeper.connect",&a_zookeeper);
&&&&&&&&props.put("group.id",&a_groupId);
&&&&&&&props.put("zookeeper.session.timeout.ms",&"400");
&&&&&&&props.put("zookeeper.sync.time.ms",&"200");
&&&&&&&props.put("mit.interval.ms",&"1000");
&&&&&&&&return&newConsumerConfig(props);
&&&&}zookeeper.connect& 指定zookeeper集群中的一个实例,kafka利用zookeeper储存topic的分区偏移值。Groupid 消费者所属的Consumer Group(消费者群体)zookeeper.session.timeout.ms&zookeeper的超时处理<mit.interval.ms&& 属性自动提交的间隔。这将替代消息被消费后提交。如果发生错误,你将从新获得未更新的消息。4.使用线程池处理消息public&void&run(int&a_numThreads)&{
&&&Map&String,&Integer&&topicCountMap&=&new&HashMap&String,Integer&();
&&&topicCountMap.put(topic,&new&Integer(a_numThreads));
&&&Map&String,&List&KafkaStream&byte[],&byte[]&&&consumerMap&=&consumer.createMessageStreams(topicCountMap);
&&&List&KafkaStream&byte[],&byte[]&&&streams&=consumerMap.get(topic);
&&&//&now&launch&all&the&threads
&&&executor&=&Executors.newFixedThreadPool(a_numThreads);
&&&//&now&create&an&object&to&consume&the&messages
&&&int&threadNumber&=&0;
&&&for&(final&KafkaStream&stream&:&streams)&{
&&&&&&&executor.submit(new&ConsumerTest(stream,&threadNumber));
&&&&&&&threadNumber++;
}首先我们创建一个map,告诉kafka提供给哪个topic多少线程。consumer.createmessagestreams是我们如何把这个信息传递给卡夫卡。返回的是一个包含kafkastream 的以topic 为键list的map结合。(注意,这里我们只向卡夫卡注册一个话题,但我们可以为map中多添加一个元素的)最后,我们创建的线程池和通过一项新的consumertest对象,每个线程运转我们的业务逻辑。5.清理和异常处理Kafka在每次处理后不会立即更新zookeeper上的偏移值,她会休息上一段时间后提交。在这段时间内,你的消费者可能已经消费了一些消息,但并没有提交到zookeeper上。这样你可能会重复消费数据。同时一些时候,broker失败从新选取leader是也可能会导致重复消费消息。为了避免这种情况应该清理完成后再关闭,而不是直接使用kill命令。e.gtry&{
&&&Thread.sleep(10000);
}&catch&(InterruptedException&ie)&{
example.shutdown();full codepackage&com.test.
import&kafka.consumer.ConsumerC
import&kafka.consumer.KafkaS
importkafka.javaapi.consumer.ConsumerC
import&java.util.HashM
import&java.util.L
import&java.util.M
import&java.util.P
importjava.util.concurrent.ExecutorS
import&java.util.concurrent.E
public&class&ConsumerGroupExample&{
&&&private&final&ConsumerConnector&
&&&private&final&String&
&&&private&&ExecutorService&
&&&public&ConsumerGroupExample(String&a_zookeeper,&String&a_groupId,&Stringa_topic)&{
&&&&&&&consumer&=&kafka.consumer.Consumer.createJavaConsumerConnector(
&&&&&&&&&&&&&&&createConsumerConfig(a_zookeeper,&a_groupId));
&&&&&&&this.topic&=&a_
&&&public&void&shutdown()&{
&&&&&&&if&(consumer&!=&null)&consumer.shutdown();
&&&&&&&if&(executor&!=&null)&executor.shutdown();
&&&public&void&run(int&a_numThreads)&{
&&&&&&&Map&String,&Integer&&topicCountMap&=&new&HashMap&String,Integer&();
&&&&&&&topicCountMap.put(topic,&new&Integer(a_numThreads));
&&&&&&&Map&String,&List&KafkaStream&byte[],&byte[]&&&consumerMap&=&consumer.createMessageStreams(topicCountMap);
&&&&&&&List&KafkaStream&byte[],&byte[]&&&streams&=consumerMap.get(topic);
&&&&&&&//&now&launch&all&the&threads
&&&&&&&executor&=&Executors.newFixedThreadPool(a_numThreads);
&&&&&&&//&now&create&an&object&to&consume&the&messages
&&&&&&&int&threadNumber&=&0;
&&&&&&&for&(final&KafkaStream&stream&:&streams)&{
&&&&&&&&&&&executor.submit(new&ConsumerTest(stream,&threadNumber));
&&&&&&&&&&&threadNumber++;
&&&private&static&ConsumerConfig&createConsumerConfig(String&a_zookeeper,String&a_groupId)&{
&&&&&&&Properties&props&=&new&Properties();
&&&&&&&props.put("zookeeper.connect",&a_zookeeper);
&&&&&&&props.put("group.id",&a_groupId);
&&&&&&&props.put("zookeeper.session.timeout.ms",&"400");
&&&&&&&props.put("zookeeper.sync.time.ms",&"200");
&&&&&&&props.put("mit.interval.ms",&"1000");
&&&&&&&return&new&ConsumerConfig(props);
&&&public&static&void&main(String[]&args)&{
&&&&&&&String&zooKeeper&=&args[0];
&&&&&&&String&groupId&=&args[1];
&&&&&&&String&topic&=&args[2];
&&&&&&&int&threads&=&Integer.parseInt(args[3]);
&&&&&&&ConsumerGroupExample&example&=&new&ConsumerGroupExample(zooKeeper,groupId,&topic);
&&&&&&&example.run(threads);
&&&&&&&try&{
&&&&&&&&&&&Thread.sleep(10000);
&&&&&&&}&catch&(InterruptedException&ie)&{
&&&&&&&example.shutdown();
}此处的启动命令需提供1:2181&group3 &&myTopic& 41.1:2181 zookeeper 的端口和地址2.group3&& Consumer Group Name3.myTopic& consumer消费消息的message4.消费topic的线程数
了这篇文章
类别:┆阅读(0)┆评论(0)

我要回帖

更多关于 kafka 消息重复消费 的文章

 

随机推荐