springboot入门14 – Kafka应用

简述

这几天优化了一下之前写的一个springboot kafka组件。比较起原生的spring-kafka来,我希望能够简化kafka的使用,可以更聚焦于具体的消息处理逻辑。

接下来的内容是这个组件的用法。

使用方法

添加依赖

这个组件已经提交到了maven中央仓库,可以直接通过依赖的形式引入:

0.2.2是这两天刚发布的一个版本。

消费者Processor

kafka-spring-boot-starter这个组件已经完成了kafka消费者的主要功能。

对于开发者来说,可以不必关注KafkaConsumer的创建,只需要实现Processor接口并注入到容器中即可。

下面是一个简单的示例:

如示例中,通过@Component注解完成了Processor实现类的实例的注入,并为注入的Bean提供了一个名称:zhyyy。记住这个名称,在之后的配置文件中会用到。

使用生产者

kafka-spring-boot-starter会根据配置主动创建KafkaProducer。开发使用时可以直接从容器中获取ProducerTemplate实例来发送消息:

如果写入kafka的消息的key和value的序列化方案采用的都是默认的字符串(反)序列化方案(StringDeserializerStringSerializer),可以使用StringProducerTemplate实例:

发送消息时酌情调用不同的send()方法:

配置

下面是一个最简单的配置:

如上配置中:

  • test-group00既是配置项的ID,也是消费组ID
  • bootstrap-servers我想不需要多做解释。
  • topics对应的是一个数组结构,也可以写作[test-topic1][test-topic1,test-topic2],即支持同一个kafka集群上多个类似topic的统一处理
  • consumer是消费者相关配置,processor对应的是Processor实现类的Bean名称,count标识的是应用内消费线程的数量

默认的序列化方案采用的是字符串序列化方案。

虽然在配置中没有体现,但kafka-spring-boot-starter组件会基于已有的信息创建KafkaProducer,使用时可以通过ProducerTemplate执行消息发送。

比较完整的配置是这样子的:

其中common模块下是一些通用的配置,config模块下则是一或多组具体的配置(这里是两组)。common下的配置会被config下的配置覆盖。

此外还独立出来了一些常用的配置项,如autoOffsetResetkeyDeserializer等,以便在使用时进行配置。

其他

示例应用在 github / spring-boot-kafka

kafka-spring-boot-starter这个组件的源码也在 github 。如果有定制化的需求可以据此进行调整。

End!


已有2条评论 发表评论

  1. sss /

    看了下,消费呢

    1. 白42 / 本文作者

      消费逻辑在Processor实例中

回复给白42 取消回复

此站点使用Akismet来减少垃圾评论。了解我们如何处理您的评论数据