spring cloud streamcloudstream 会出现重复消费吗,怎么解决啊

《Spring Cloud微服务实战》【摘要 书评 试读】- 京东图书
该商品已下柜,非常抱歉!
Spring Cloud微服务实战
商品介绍加载中...
扫一扫,精彩好书免费看
服务承诺:
京东平台卖家销售并发货的商品,由平台卖家提供发票和相应的售后服务。请您放心购买!
注:因厂家会在没有任何提前通知的情况下更改产品包装、产地或者一些附件,本司不能确保客户收到的货物与商城图片、产地、附件说明完全一致。只能确保为原厂正货!并且保证与当时市场上同样主流新品一致。若本商城没有及时更新,请大家谅解!
权利声明:京东上的所有商品信息、客户评价、商品咨询、网友讨论等内容,是京东重要的经营资源,未经许可,禁止非法转载使用。
注:本站商品信息均来自于合作方,其真实性、准确性和合法性由信息拥有者(合作方)负责。本站不提供任何保证,并不承担任何法律责任。
印刷版次不同,印刷时间和版次以实物为准。
价格说明:
京东价:京东价为商品的销售价,是您最终决定是否购买商品的依据。
划线价:商品展示的划横线价格为参考价,该价格可能是品牌专柜标价、商品吊牌价或由品牌供应商提供的正品零售价(如厂商指导价、建议零售价等)或该商品在京东平台上曾经展示过的销售价;由于地区、时间的差异性和市场行情波动,品牌专柜标价、商品吊牌价等可能会与您购物时展示的不一致,该价格仅供您参考。
折扣:如无特殊说明,折扣指销售商在原价、或划线价(如品牌专柜标价、商品吊牌价、厂商指导价、厂商建议零售价)等某一价格基础上计算出的优惠比例或优惠金额;如有疑问,您可在购买前联系销售商进行咨询。
异常问题:商品促销信息以商品详情页“促销”栏中的信息为准;商品的具体售价以订单结算页价格为准;如您发现活动商品售价或促销信息有异常,建议购买前先联系销售商咨询。
加载中,请稍候...
加载中,请稍候...
加载中,请稍候...
加载中,请稍候...
加载中,请稍候...
加载中,请稍候...
加载中,请稍候...
浏览了该商品的用户还浏览了
加载中,请稍候...
价 格: 到
   
iframe(src='//www.googletagmanager.com/ns.html?id=GTM-T947SH', height='0', width='0', style='display: visibility:')使用 Spring Cloud Stream 构建消息驱动微服务 - 简书
使用 Spring Cloud Stream 构建消息驱动微服务
相关源码:
微服务的目的: 松耦合
事件驱动的优势:高度解耦
Spring Cloud Stream 的几个概念
Spring Cloud Stream is a framework for building message-driven microservice applications.
官方定义 Spring Cloud Stream 是一个构建消息驱动微服务的框架。
Spring Cloud Stream Application
应用程序通过 inputs 或者 outputs 来与 Spring Cloud Stream 中binder 交互,通过我们配置来 binding ,而
Spring Cloud Stream 的 binder 负责与中间件交互。所以,我们只需要搞清楚如何与
Spring Cloud Stream 交互就可以方便使用消息驱动的方式
Binder 是 Spring Cloud Stream 的一个抽象概念,是应用与消息中间件之间的粘合剂。目前 Spring Cloud Stream 实现了 Kafka 和 Rabbit MQ 的binder。
通过 binder ,可以很方便的连接中间件,可以动态的改变消息的
destinations(对应于 Kafka 的topic,Rabbit MQ 的 exchanges),这些都可以通过外部配置项来做到。
甚至可以任意的改变中间件的类型而不需要修改一行代码。
Publish-Subscribe
消息的发布(Publish)和订阅(Subscribe)是事件驱动的经典模式。Spring Cloud Stream 的数据交互也是基于这个思想。生产者把消息通过某个 topic 广播出去(Spring Cloud Stream 中的 destinations)。其他的微服务,通过订阅特定 topic 来获取广播出来的消息来触发业务的进行。
这种模式,极大的降低了生产者与消费者之间的耦合。即使有新的应用的引入,也不需要破坏当前系统的整体结构。
Consumer Groups
“Group”,如果使用过 Kafka 的童鞋并不会陌生。Spring Cloud Stream 的这个分组概念的意思基本和 Kafka 一致。
微服务中动态的缩放同一个应用的数量以此来达到更高的处理能力是非常必须的。对于这种情况,同一个事件防止被重复消费,只要把这些应用放置于同一个 “group” 中,就能够保证消息只会被其中一个应用消费一次。
Durability
消息事件的持久化是必不可少的。Spring Cloud Stream 可以动态的选择一个消息队列是持久化,还是 present。
bindings 是我们通过配置把应用和spring cloud stream 的 binder 绑定在一起,之后我们只需要修改 binding 的配置来达到动态修改topic、exchange、type等一系列信息而不需要修改一行代码。
基于 RabbitMQ 使用
以下内容源码:
Spring Cloud Stream 基本用法,需要定义一个接口,如下是内置的一个接口。
public interface Sink {
String INPUT = "input";
@Input("input")
SubscribableChannel input();
注释__ @Input__ 对应的方法,需要返回 __ SubscribableChannel __ ,并且参入一个参数值。
这就接口声明了一个__ binding __命名为 “input” 。
其他内容通过配置指定:
destination: mqTestDefault
destination:指定了消息获取的目的地,对应于MQ就是 exchange,这里的exchange就是 mqTestDefault
@SpringBootApplication
@EnableBinding(Sink.class)
public class Application {
// 监听 binding 为 Sink.INPUT 的消息
@StreamListener(Sink.INPUT)
public void input(Message&String& message) {
System.out.println("一般监听收到:" + message.getPayload());
public static void main(String[] args) {
SpringApplication.run(Application.class);
定义一个 class (这里直接在启动类),并且添加注解@EnableBinding(Sink.class) ,其中 Sink 就是上述的接口。同时定义一个方法(此处是 input)标明注解为 __ @StreamListener(Processor.INPUT) __,方法参数为 Message 。
启动后,默认是会创建一个临时队列,临时队列绑定的exchange为 “mqTestDefault”,routing key为 “#”。
所有发送 exchange 为“mqTestDefault” 的MQ消息都会被投递到这个临时队列,并且触发上述的方法。
以上代码就完成了最基本的消费者部分。
消息的发送同消息的接受,都需要定义一个接口,不同的是接口方法的返回对象是 MessageChannel,下面是 Spring Cloud Stream 内置的接口:
public interface Source {
String OUTPUT = "output";
@Output("output")
MessageChannel output();
这就接口声明了一个 binding 命名为 “output” ,不同于上述的 “input”,这个binding 声明了一个消息输出流,也就是消息的生产者。
destination: mqTestDefault
contentType: text/plain
contentType:用于指定消息的类型。具体可以参考
destination:指定了消息发送的目的地,对应 RabbitMQ,会发送到 exchange 是 mqTestDefault 的所有消息队列中。
代码中调用:
@SpringBootApplication
@EnableBinding(Source.class)
public class Application implements CommandLineRunner {
@Autowired
@Qualifier("output")
public void run(String... strings) throws Exception {
// 字符串类型发送MQ
System.out.println("字符串信息发送");
output.send(MessageBuilder.withPayload("大家好").build());
public static void main(String[] args) {
SpringApplication.run(Application.class);
通过注入MessageChannel的方式,发送消息。
通过注入Source 接口的方式,发送消息。 具体可以查看样例
以上代码就完成了最基本的生产者部分。
自定义消息发送接收
自定义接口
Spring Cloud Stream 内置了两种接口,分别定义了 binding 为 “input” 的输入流,和 “output” 的输出流,而在我们实际使用中,往往是需要定义各种输入输出流。使用方法也很简单。
interface OrderProcessor {
String INPUT_ORDER = "inputOrder";
String OUTPUT_ORDER = "outputOrder";
@Input(INPUT_ORDER)
SubscribableChannel inputOrder();
@Output(OUTPUT_ORDER)
MessageChannel outputOrder();
一个接口中,可以定义无数个输入输出流,可以根据实际业务情况划分。上述的接口,定义了一个订单输入,和订单输出两个 binding。
使用时,需要在 @EnableBinding 注解中,添加自定义的接口。
使用 @StreamListener 做监听的时候,需要指定 OrderProcessor.INPUT_ORDER
defaultBinder: defaultRabbit
inputOrder:
destination: mqTestOrder
outputOrder:
destination: mqTestOrder
如上配置,指定了 destination 为 mqTestOrder 的输入输出流。
分组与持久化
上述自定义的接口配置中,Spring Cloud Stream 会在 RabbitMQ 中创建一个临时的队列,程序关闭,对应的连接关闭的时候,该队列也会消失。而在实际使用中,我们需要一个持久化的队列,并且指定一个分组,用于保证应用服务的缩放。
只需要在消费者端的 binding 添加配置项 spring.cloud.stream.bindings.[channelName].group = XXX 。对应的队列就是持久化,并且名称为:mqTestOrder.XXX。
rabbitMQ routing key 绑定
用惯了 rabbitMQ 的童鞋,在使用的时候,发现 Spring Cloud Stream 的消息投递,默认是根据 destination + group 进行区分,所有的消息都投递到 routing key 为 “#‘’ 的消息队列里。
如果我们需要进一步根据 routing key 来进行区分消息投递的目的地,或者消息接受,需要进一步配,Spring Cloud Stream 也提供了相关配置:
inputProductAdd:
destination: mqTestProduct
group: addProductHandler
# 拥有 group 默认会持久化队列
outputProductAdd:
destination: mqTestProduct
inputProductAdd:
bindingRoutingKey: addProduct.*
# 用来绑定消费者的 routing key
outputProductAdd:
routing-key-expression: '''addProduct.*'''
# 需要用这个来指定 RoutingKey
spring.cloud.stream.rabbit.bindings.[channelName].consumer.bindingRoutingKey
指定了生成的消息队列的routing key
spring.cloud.stream.rabbit.bindings.[channelName].producer.routing-key-expression 指定了生产者消息投递的routing key
DLX:Dead-Letter-Exchange(死信队列)。利用DLX, 当消息在一个队列中变成死信(dead message)之后,它能被重新publish到另一个Exchange,这个Exchange就是DLX。消息变成死信一向有一下几种情况:
消息被拒绝(basic.reject/ basic.nack)并且requeue=false
消息TTL过期(参考:)
队列达到最大长度
DLX也是一个正常的Exchange,和一般的Exchange没有区别,它能在任何的队列上被指定,实际上就是设置某个队列的属性,当这个队列中有死信时,RabbitMQ就会自动的将这个消息重新发布到设置的Exchange上去,进而被路由到另一个队列,可以监听这个队列中消息做相应的处理。
Spring Cloud Stream 中使用
spring.cloud.stream.rabbit.bindings.[channelName].consumer.autoBindDlq=true
spring.cloud.stream.rabbit.bindings.[channelName].consumer.republishToDlq=true
配置说明,可以参考
Spring Cloud Stream 最大的方便之处,莫过于抽象了事件驱动的一些概念,对于消息中间件的进一步封装,可以做到代码层面对中间件的无感知,甚至于动态的切换中间件,切换topic。使得微服务开发的高度解耦,服务可以关注更多自己的业务流程。
Spring Cloud为开发人员提供了快速构建分布式系统中一些常见模式的工具(例如配置管理,服务发现,断路器,智能路由,微代理,控制总线)。分布式系统的协调导致了样板模式, 使用Spring Cloud开发人员可以快速地支持实现这些模式的服务和应用程序。他们将在任何分布式...
本文参考了:http://blog.didispace.com/springcloud7/http://blog.didispace.com/springcloud8/ 问题:如何实现对配置信息的实时更新?虽然,我们已经能够通过/refresh接口和Git仓库的Web Ho...
前言 在微服务架构的系统中,我们通常会使用轻量级的消息代理来构建一个共用的消息主题让系统中所有微服务实例都能连接上来,由于该主题中产生的消息会被所有实例监听和消费,所以我们称它为消息总线。在总线上的各个实例都可以方便地广播一些需要让其他连接在该主题上的实例都知道的消息,例如...
序言 你在系统中是否写过这样的接口:客户端访问服务器,服务器进行了大量逻辑/耗时操作之后,才能将结果返回给客户端,而这时,客户端的连接或许已经因为超时而关闭了。为了能够及时的给客户端返回数据, 在项目中,将一些无需即时返回且耗时的操作提取出来,进行了异步处理,而这种异步处理...
重要说明:本方是翻译自https://docs.spring.io/spring-cloud-dataflow/docs/1.3.0.M2/reference/htmlsingle/#getting-started这个文章,感谢Sabby AnandanMarius Bog...
四月了 再也抑制不了自己对大理的情感 一跺脚请了一周的假 来了 延误下午到的昆明 吃了地道的云南米线 地道的在昆明 在昆明 在昆明 ! 逛了逛小吃街 坐了通宵火车 昆明一大理 晚上12点就熄灯 很安静 温度也适宜 火车开的很慢 很快就入睡了 一觉醒来大理到了 来了大理 自然...
乐游商旅编者按,刷卡买包烟欠3亿:20日,四川成都市民吴先生出门买个烟,却收到系统提示信用卡无法继续使用,短信显示欠款元。这可把吴先生吓坏了,被盗刷、中病毒? 银行回应:不是中了病毒 随后,吴先生连忙拨打了平安银行的24小时客服电话,“可是根本没有人...
不知道你有没有相同的感觉,每天都在感受闹钟的千呼万唤。感谢这魔音绕耳的感觉,为每天上班打卡不迟到提供坚强的后盾。现在刚好22:45分,晚睡倒计时。睡前3秒,必看闹钟是否已走上正轨,时间点定好在第二天的6:00,这是闹钟上岗,白日梦下岗的点。 和闹钟也算是相亲相爱过了大半辈子...
引言 XML是一种数据交换格式。两个人聊天,采用彼此都听得懂的语言,数据交换格式就是计算机或程序之间交流的语言。数据交换格式主要分为三种: 纯文本 XML JSON 定义 可扩展标记语言(英语:eXtensible Markup Language,简称: XML),是一种标...
《序》 理想是指路明灯。没有理想,没有坚定的方向。没有方向,没有生活
一托尔斯泰 当这本杂志送到您的手上时,离它上一次问世已经过去两年的时间了,那时她叫1城,一本名不见经传的DM单页。时逢我那时做经营,也成为了她的客户,创始人:老胡。 一个风风火火的女子,条纹控、黄...270501E_SpringCloudStream(分组与消息持久化)(000)-学习考试视频-搜狐视频
270501E_SpringCloudStream(分组与消息持久化)(000)
视频介绍:
270501E_SpringCloudStream(分组与消息持久化)(000)
推荐出品人spring(4)
springcloud
java(10)
微服务架构(6)
Spring Cloud Stream,用精简的语言概括,他本质上其实就是让开发人员使用消息中间件变得简单。
他基于Spring Integration并利用提供了自动配置,提供了极为方便的消息中间件使用体验。看到这里会有人认
为这个开源项目没有什么了不起,基于这个点的开源包有很多,甚至自己已经熟知某种中间件的编码语法何苦重复造轮子, 我就是这当中的一员。
不识庐山真面目,只缘身在此山中
随着深入了解,我发现Stream仅是Pivotal公司在大数据处理方向布局的一个子集Spring Cloud Data Flow(一款可自由组合的云原生微服务,用于收集、转化、存储和分析数据)。并没有在Netflix OSS止步不前,而是继续定义和完善Pivotal堆栈,把结构化平台的优势带到全方位开发方案当中去。
企业开发中,业务是重要的一部分,数据也同样是重要的一部分,用Netflix OSS搞定业务架构,Spring Cloud Data Flow应对数据架构,这事就变得有意思,而使用Stream可以统一业务系统和数据系统的中间件编程模型,作为技术统一规划的角度来看,让我最终决定在生产环境中去尝试Stream。
截止题主止笔,Stream已经支持Kafka/Rabbit MQ/Redis/Gemfire。
一. 同步与异步
使用消息中间件不难,如何用的恰当却是门学问。
同步与异步这个基础性的选择会不可避免的引导我们使用不同的实现。
如果使用同步通信,发起一个远程服务调用后,调用方会阻塞自己并等待整个操作的完成。如果使用异步通信,调用方不需要等待操作完成就可以返回,甚至可能不需要关心这个操作是否完成与否。两种方式都有自己适用的场景,我们不扩展讨论,这里只讨论某些相比之下更适用于事件驱动的场景
这两种不同的通信模式有着各自的协作风格,既&请求/响应&和&基于事件&。
对于前者,通常是编排风格,我们会依赖某个中心大脑来指导并驱动整个流程,缺点是中心控制点承担了太多的职责,他会成为网状结构的中心枢纽及逻辑的起点,这个方法容易导致少量的“上帝”服务,而与其打交道的服务通常会沦为“贫血”的、基于CRUD的服务。
对于后者,通常是协同风格,客户端发起的不是一个请求,而是发布一个事件,然后其他协作者接收到该事件,并知道该怎么做。我们从来不会告知任何人去做任何事,基于事件的系统天生就是异步的。整个系统都很聪明,业务逻辑并非存在某个核心大脑,而是分布在不同的协作者中。基于事件的协作方式耦合性很低,这意味着你可以在不改变客户端代码的情况下,对该事件添加新的订阅者来完成新增的功能需求。
二. Stream应用模型
Stream能自动发现并使用类路径中的binder,你也可以引入多个binders并选择使用哪一个,甚至可以在运行时根据不同的channels选择不同的binder实现。
三. 消费者分组
发布-订阅模型可以很容易地通过共享topics连接应用程序,但创建一个应用多实例的的水平扩展能力同等重要。当这样做时,应用程序的不同实例被放置在一个竞争的消费者关系中,其中只有一个实例将处理一个给定的消息,这种分组类似于Kafka consumer groups,灵感也来源于此。每个消费者通过spring.cloud.stream.bindings.&channelName&.group指定一个组名称,channelName是代码中定义好的通道名称,下文会有介绍。
消费者组订阅是持久的,如果你的应用指定了group,那即便你这个组下的所有应用实例都挂掉了,你的应用也会在重新启动后从未读取过的位置继续读取。但如果不指定groupStream将分配给一个匿名的、独立的只有一个成员的消费组,该组与所有其他组都处于一个发布-订阅关系中,还要注意的是匿名订阅不是持久的,意味着如果你的应用挂掉,那么在修复重启之前topics中错过的数据是不能被重新读取到的。所以为了水平扩展和持久订阅,建议最好指定一个消费者组。
首先,你要放空你之前kafka分区的相关知识,从零开始去领会Stream分区,以免造成理解上的困扰。
Stream提供了一个通用的抽象,用于统一方式进行分区处理,和具体使用的中间件无关,因此分区可以用于自带分区的代理(如kafka)或者不带分区的代理(如rabbiemq),这句话要反复读几遍。
Stream支持在一个应用程序的多个实例之间数据分区,N个生产者的数据会发送给M个消费者,并保证共同的特性的数据由相同的消费者实例处理,这会提升你处理能力。
Stream使用多实例进行分区数据处理是一个复杂设置,分区功能需要在生产者与消费者两端配置,SpringCloudDataFlow可以显著的简化过程,而且当你没有用SpringCloudDataFlow时,会给你的配置带来一些不便,需要你提前规划好,而不能再应用启动后动态追加。
下面是生产者有效的和典型的配置(Output Bindings)
spring.cloud.stream.bindings.&.producer.partitionKeyExpression=payload.id
spring.cloud.stream.bindings.&.producer.partitionCount=5
分区key的值是基于partitionKeyExpression计算得出的,用于每个消息被发送至对应分区的输出channel,partitionKeyExpression是spirng EL表达式用以提取分区键
下面是消费者有效的和典型的配置(Input Bindings)
spring.cloud.stream.bindings.input.consumer.partitioned=true
spring.cloud.stream.instanceIndex=3
spring.cloud.stream.instanceCount=5
instanceCount表示应用实例的总数,instanceIndex在多个实例中必须唯一,并介于0~(instanceCount-1)之间。实例的索引可以帮助每个实例确定唯一的接收数据的分区,正确的设置这两个值十分重要,用来确保所有的数据被消费,以及应用实例接收相互排斥不重复消费。
五. 编程模型
引入pom依赖
&org.springframework.cloud&
&spring-cloud-starter-stream-kafka&
也可以引入spring-cloud-stream-binder-kafka,这个少依赖了web和actuater的功能,这两个功能根据项目实际情况定制更合理,不需要的情况下没必要依赖。同理你可以引入spring-cloud-stream-binder-redis/rabbit
配置binder参数
SpringBoot项目启动会扫描到classpath中的kafka binder,并会用默认参数去连接本地的kafka服务和zookeeper服务,如果本地没有默认配置启动的这两个服务,一定会启动失败。所以我们要指定配置。
spring.cloud.stream.kafka.binder.brokers=10.79.96.52:9092
spring.cloud.stream.kafka.binder.zk-nodes=10.79.96.52:2182
spring.cloud.stream.kafka.binder.minPartitionCount=1
spring.cloud.stream.kafka.binder.autoCreateTopics=true
spring.cloud.stream.kafka.binder.autoAddPartitions=false
本例中配置的后三项配置值和默认值一致,当然可根据自己的需求定义。
这种配置有些讨巧,这个是kafka binder提供的Binder-Specific Configuration,这种方式让配置更看上去更清爽一些,但如果按照Stream的配置语义,应该如下配置
spring.cloud.stream.bindings.&.binder=&
spring.cloud.stream.binders.&.type=kafka
spring.cloud.stream.binders.&.environment.spring.cloud.stream.kafka.binder.brokers=10.79.96.52:9092
spring.cloud.stream.binders.&.environment.spring.cloud.stream.kafka.binder.zk-nodes=10.79.96.52:2182
先为channel对应的binder设置一个&binderName&,再根据这个&binderName&设置binder的type和environment。如果我们的应用只连接一个kafka,那我们完全可以用上面的配置方法,看起来更简洁。如果我们的应用要连接多个kafka服务,那我们必须用下面的配置方案,通过&binderName&来完成不同kafka服务的识别与隔离。
Stream应用可以有任意数目的input和output通道,可通过@Input和@Output注解在接口中定义。注解默认通道名字为方法名 ,当然也可以自定义channel名字,@Input(&myinputchannel&),下面的例子就完成了通道的定义,Stream在运行时会自动生成这个接口的实现类。
public interface Barista {
SubscribableChannel orders();
MessageChannel hotDrinks();
MessageChannel coldDrinks();
Stream为了方便开发者,内置了三个接口,在简单业务背景下,我们不用如上所述的去定义通道,直接利用预置通道会更便捷。这三个接口分别是Source,Sink,Processor。
Source用于有单个输出(outbound)通道的应用,通道名称为output
public interface Source {
String OUTPUT = &output&;
@Output(Source.OUTPUT)
MessageChannel output();
Sink用于有单个输入(inbound)通道的应用,通道名称为input
public interface Sink {
String INPUT = &input&;
@Input(Sink.INPUT)
SubscribableChannel input();
Processor用于单个应用同时包含输入和输出通道的情况,通道名称分别为output和input。
public interface Processor extends Source, Sink {
配置通道绑定参数
输入通道的绑定,本例中使用Sink定义输入通道,根据上面所述&channelName&=input
spring.cloud.stream.bindings.input.destination=wsh-topic-01
spring.cloud.stream.bindings.input.group=s3
spring.cloud.stream.bindings.input.consumer.concurrency=1
spring.cloud.stream.bindings.input.consumer.partitioned=false
输出通道的绑定,本例中使用Source定义输出通道,根据上面所述&channelName&=output
spring.cloud.stream.bindings.output.destination=wsh-topic-01
spring.cloud.stream.bindings.output.content-type=text/plain
spring.cloud.stream.bindings.output.producer.partitionCount=1
通过@EnableBinding触发绑定
现在binder配置好了,channel也配置好了,需要做的就是将binder和channel在代码中绑定起来。
@EnableBinding(Source.class)
public class SendService {
@Autowired
public void sendMessage(String msg) {
source.output().send(MessageBuilder.withPayload(msg).build());
} catch (Exception e) {
e.printStackTrace();
消费者通过@StreamListener监听
@EnableBinding(Sink.class)
public class MsgSink {
@StreamListener(Sink.INPUT)
public void messageSink(Object payload) {
System.out.println(&Received: & + payload);
配置分区、分组信息
具体配置上文有提到,不重复描述,额外提一下spring.cloud.stream.kafka.binder.autoAddPartitions这个配置默认是false,通常情况下会产生无法启动的问题,强烈建议配置成true。
这里面的原理大致描述如下,比如你启动了一个生产者并配置producer.partitionCount=5,那么Stream底层是需要kafka提供5个kafka分区(注意Stream的5个分区
和 kafka的5个分区此时相等是巧合,请分开理解),如果kafka中如果没有目标topics,Stream会在启动的时候在kafka中创建5个分区,并成功启动,但是如果kafka中已经有了目标topics,并且目标topics不足5个分区,那么生产者启动失败。所以必须设置autoAddPartitions=true,生产者才能在启动的时候自动将kafka中的目标topics分区扩展成5个,方能启动成功。
如果此刻生产者启动成功,你会启动消费者,如果消费者你规划了5个实例,每个实例支持2个并发(concurrency=2),那么每个Stream底层需要5*2=10个kafka分区(而此时kafka的目标topics只有5个分区),消费者也会启动失败,这种情况下需要将消费者的autoAddPartitions=true。
autoAddPartitions=true
有得也有失。得到的上文已经描述,这里再提一下失去的。 还拿上文举例,生产者启动了5个kafka分区,所以生产者实例只会往这5个分区中输出,这样就导致消费者扩展出来的另外5个分区收不到数据,所以要重启生产者,用以重新计算生产者与底层kafka分区的关系。 官方文档提到使用SpringCloudDataFlow可以显著的简化过程,我还没有尝试。
六. Content Type
@StreamListener是Stream提供的注解,Spring
Integration也有一个类似功能的注解@ServiceActivator,两者都有监听通道功能,区别是@StreamListener可以根据contentType去解析数据,比如一个json格式的数据,@StreamListener可以自动解析成对象Vote
@EnableBinding(Processor.class)
public class TransformProcessor {
@Autowired
VotingService votingS
@StreamListener(Processor.INPUT)
@SendTo(Processor.OUTPUT)
public VoteResult handle(Vote vote) {
return votingService.record(vote);
七. 项目代码
文章出处:Spring Cloud Blog http://blog.spring-cloud.io/blog/sc-stream.html
访问:33360次
排名:千里之外
原创:15篇
转载:49篇
评论:13条
(2)(3)(8)(5)(11)(10)(11)(15)(1)

我要回帖

更多关于 spring stream kafka 的文章

 

随机推荐