简述 这几天优化了一下之前写的一个springboot kafka组件。比较起原生的spring-kafka来,我希望能够简化kafka的使用,可以更聚焦于具体的消息处理逻辑。 接下来的内容是这个组件的用法。 使用方法 添加依赖 这个组件已经提交到了maven中央仓库,可以直接通过依赖的形式引入: 0.2.2是这两天刚发布的一个版本。 消费者Processor kafka-spring-boot-starter这个组件已经完成了kafka消费者的主要功能。 对于开发者来说,可以不必关注KafkaConsumer的创建,只需要实现Processor接口并注入到容器中即可。 下面是一个简单的示例: 如示例中,通过@Component注解完成了Processor实现类的实例的注入,并为注入的Bean提供了一个名称:zhyyy。记住这个名称,在之后的配置文件中会用到。 使用生产者 kafka-spring-boot-starter会根据配置主动创建KafkaProducer。开发使用时可以直接从容器中获取ProducerTemplate实例来发送消息: 如果写入kafka的消息的key和value的序列化方案采用的都是默认的字符串(反)序列化方案(StringDeserializer和StringSerializer),可以使用StringProducerTemplate实例: 发送消息时酌情调用不同的send()方法: 配置 下面是一个最简单的配置: 如上配置中: test-group00既是配置项的ID,也是消费组ID bootstrap-servers我想不需要多做解释。 topics对应的是一个数组结构,也可以写作[test-topic1]或[test-topic1,test-topic2],即支持同一个kafka集群上多个类似topic的统一处理 consumer是消费者相关配置,processor对应的是Processor实现类的Bean名称,count标识的是应用内消费线程的数量 默认的序列化方案采用的是字符串序列化方案。 虽然在配置中没有体现,但kafka-spring-boot-starter组件会基于已有的信息创建KafkaProducer,使用时可以通过ProducerTemplate执行消息发送。 比较完整的配置是这样子的: 其中common模块下是一些通用的配置,config模块下则是一或多组具体的配置(这里是两组)。common下的配置会被config下的配置覆盖。 此外还独立出来了一些常用的配置项,如autoOffsetReset,keyDeserializer等,以便在使用时进行配置。 其他 示例应用在 github / spring-boot-kafka。 kafka-spring-boot-starter这个组件的源码也在 github 。如果有定制化的需求可以据此进行调整。 End!
[阅读更多...]-
springboot入门14 – Kafka应用
-
Kafka java.net.SocketTimeoutException
手上有一个消费Kafka的服务,这个服务每隔一段时间使用SimpleConsumer从kafka集群获取数据。Kafka的版本是0.8.21。今天这个服务一直在报错:SocketTimeoutException。异常信息如下: 从异常信息上来看是网络连接超时。翻看了一下任务的运行日志发现是在连接Kafka集群的k3节点时出的错。 尝试telnet连接k3节点的broker是可以的。这样子还报错就比较奇怪了。 使用glances查看了一下消费节点的本地运行环境,发现各项指标也都挺正常的。 登录到k3节点,查看kafka的运行日志也能够看到消费节点的连接记录: 查看了一下使用high-level consumer的其他服务也是正常的。 百思不得其解。但是问题还得解决,随手查看了一下k3节点的句柄数,发现一个进程的句柄数高得有点儿离谱哎: k3节点每个进程可以打开的最大句柄数: 53466就是k3节点上的kafka broker进程: 此时有必要看下这个进程的网络连接了: 发现有大量的CLOSE_WAIT和TIME_WAIT连接: 统计了下CLOSE_WAIT和TIME_WAIT连接的数量,好像有点儿多: 可以看出来所有连接中CLOSE_WAIT的记录占了绝大部分,和CLOSE_WAIT相关的连接又多是在j0节点上。 到j0节点查找和端口51436相关的进程,发现是一个kafka-manager服务。不是生产服务,直接关掉了事。 回到k3节点,重启该节点的broker。再进行测试,异常消失。持续观察几天,kafka的连接数是正常的,说明问题已经得到修复。 ##########
[阅读更多...] -
Kafka报错:Error reading field ‘topics’
在kafka的server.log中发现了如下报错信息: 在StackOverflow上找到了类似的问题。知道报这个错的是因为kafka服务端的版本和kafka客户端的版本不一致导致的。 解决方案很简单: 调整kafka server端版本; 调整kafka客户端版本。 #######
[阅读更多...] -
Kafka警告:No checkpointed highwatermark is found for partition
测试环境使用的Kafka出现了点儿问题,服务器被停止,所有的topic日志不知怎的被清空。一番大清洗(清理zookeeper节点、kafka topic log)、重启Kafka、创建分区后发现报了如下的错误: 找了些资料知道了报警的原因。 日志中提到的high watermark指的是一个partition中上一次提交的消息的offset。报警是因为topic刚创建、消费已开始、但是还没有生产者推送消息过来、对应topic没有任何offset的记录导致的。 所以该警告可以忽略,待有消息推送过来即可。 相关资料 Intra-cluster Replication in Apache Kafka #####
[阅读更多...] -
Kafka报错:connection reset by peer
在kafka的server.log中发现持续爆出如下错误信息: 查了下原因,是因为kafka的客户端异常关闭,而Kafka仍在向其推送数据导致的。 没有找到太好的解决办法。最后使用了一个笨方法:先停掉所有的生产者和消费者服务,再停掉Kafka;而后重启Kafka、重启停掉的服务。这样异常就消失了。 后来又发现这个异常断续出现了几次,因为是测试环境,懒得去管它,神奇的是居然最后也自动恢复了。 ######
[阅读更多...] -
Kafka 调整partiton数目和replica factor
调整partiton 调整partition可以直接执行如下命令: 注意替换topicName、$ZK_HOST_NODE和partitionNum三个参数。 调整replica factor 调整replica-factor需要先创建一个json描述文件replica.json,大致如下: 在描述文件中说明分区和副本所在broker的Id的映射。 而后在replica.json所在的位置执行如下命令: 另外,kafka-manager是个好东西,可以直接在界面上完成partiton数目的调整。可惜不能调整replica-factor。 #####
[阅读更多...] -
Kafka java.nio.channels.ClosedChannelException
最近开始部署一个工程时遇到了Kafka消费的问题,报错信息如下: 相关的工程已经在测试环境测试过了,部署到新的环境上却报错了。所以这个问题应该是环境问题或配置问题。 进入Kafka源码可以发现这个问题是在连接Kafka Broker时出现的。所以这个问题应该是网络连接导致的。找运维部的同事看了一下,确认是相关的节点与Kafka集群的通信被屏蔽了。解除屏蔽后问题就得到修复了。 在StackOverflow上也查过相关的问题,多是类似的缘故,比如host的错误配置等。
[阅读更多...] -
kafka0.9 Consumer poll()方法阻塞
最近项目中用到了Kafka0.9,在使用0.9的Consumer API的时候遇到了poll()方法阻塞的问题。程序没有报任何错误,只是持续在poll()方法处阻塞。深入poll()方法可以看到是在AbstractCoordinator.ensureCoordinatorKnown()方法中出现了死循环。在循环中不停地输出如下DEBUG日志: 需要关注的是这处信息: 看样子是kafka的连接出了问题。不过我的Producer向kafka写数据是没问题的,使用kafka提供的消费工具kafka-console-consumer.sh执行消费也是没问题的。 在网上找到了一些关于这个现象的解释:在客户端进行消费之前会为ConsumerGroup向Kafka集群申请coordinater节点。kafka集群在配置或分配coordinater节点的时候可能会短暂的报这个错误。 我这里不是短暂的报错,而是陷入了死循环。目前可以想到的就是我的kafka集群配置出现问题了。在简单粗暴地将zookeeper上kafka的配置完全删掉再重启Kafka后,消费可以正常执行了。至于问题具体出在哪儿还没有找到。目前只能是持续关注,等问题再次出现了。 就这样。 还有一点,在查询解决方案的过程中看到:如果kafka是部署在Docker上,出现了这样的问题需要检查有没有配置环境变量ADVERTISE.HOST.NAME和ADVERTISE.HOST.PORT。 ######
[阅读更多...] -
获取Kafka Consumer的offset
从kafka的0.8.11版本开始,它会将consumer的offset提交给ZooKeeper。然而当offset的数量(consumer数量 * partition的数量)的很多的时候,ZooKeeper的适应性就可能会出现不足。幸运的是,Kafka现在提供了一种理想的机制来存储Consumer的offset。Kafka现在是将Consumer的offset写入到一个分布式持久化的、高可用的topic中。开发者可以通过消费这个topic的方式来获取Consumer的offset。为了提升访问速度,kafka还提供了offset的内存缓存。也就是说,现在提交offset是通过普通的生产者请求(代价并不高)来完成的,而获取offset则是通过在内存中的快速查询完成的。 Kafka的官方文档描述了这个特性是如何工作的,以及如何将offset从zookeeper迁移到kafka。下面的代码演示了如何利用基于kafka存储offset的特性。 第一步:通过发送consumer元数据请求到任意Broker来发现并连接offset manager: 第二步:发送OffsetCommitRequest 或者 OffsetFetchRequest到offset manager: 原文:https://cwiki.apache.org/confluence/display/KAFKA/Committing+and+fetching+consumer+offsets+in+Kafka ####
[阅读更多...] -
Kafka high level consumer
为什么选择High Level Consumer 很多时候用户只是想从kafka中读取数据,对于如何处理消息的offset则不怎么关注。在抽象了Kafka中消费事件的大部分细节后,High Level Consumer可以让用户使用起来更为简单。 首先需要知道的就是High Level Consumer保存了从zookeeper中读取某个partition最后的offset。这个offset基于读取进程一开始时提供给kafka的名称保存。这个名称可以理解为Consumer Group的名称。 Consumer Group的名称在kafka集群中是一个全局的属性,因此在启动新的Consumer程序前,注意要关掉“旧的”Consumer。当使用已有的Consumer Group名称启动一个新的Consumer的时候,Kafka将会把这个进程的线程添加到一个现有的线程组中来消费Topic,并触发一次“re-balance”。在“re-balance”中,kafka将会把可用的partition分配给可用的线程。这就有可能把一个partition移交给另一个进程。如果同时使用新的和旧的业务处理逻辑,就很有可能把一些消息导向旧的业务处理逻辑。 设计一个High Level Consumer 首先要说明的是使用High Level Consumer的可以(或者说应该)是一个多线程应用。围绕着topic中partition的数量定义的线程模型有如下几个明显的特征: 如果提供的线程数量多于topic的partition的数量,一些线程将永远接收不到任何消息; 如果提供的线程数量少于topic的partition数量,一些线程将会收到来自多个partition的数据; 如果一个线程对应着多个partition,那么接收到的消息的有序性将会得不到保证,除非在partition内部的offset是序列化的。举例说,你可能会从partition10中获取5条消息,从partition11中获取6条消息,那么有可能在你从partition10中获取5条消息后,又继续从partition10中获取了5条消息,尽管此时partition11中的消息是可用的; 添加进程或线程将会导致re-balance,这就有可能会导致线程对应的partition会重新分配。 现在就可以尝试从Kafka集群读取数据了,如果没有新数据的话,读取数据的进程可能会阻塞。 如下是一个非常简单的Kafka High Level Consumer线程实例: 程序中值得关注的部分是“while (it.hasNext()) ”这一句,程序就是通过这一句不停地从Kafka集群读取数据的——直到用户主动停止线程。 配置测试应用 和SimpleConsumer不同的是,High Level Consumer为我们做了大量的信息记录以及故障处理工作,然而我们还是需要告诉Kafka将一些信息存储在哪儿。在下面的方法中,定义了创建一个High Level Consumer最基本的配置信息: 简单说明下这里的配置参数: “zookeeper.connect”指明了如何在kafka集群中找到启动的Zookeeper实例。Kafka使用zookeeper保存了当前ConsumerGroup从指定topic中消费消息的偏移量以及对应partition信息; “group.id”定义了当前进程所代表的Consumer Group; “zookeeper.session.timeout.ms”定义了kafka等待Zookeeper响应请求(读或者写请求)的时间,时间单位是毫秒,如果超过时间,Kafka就会放弃并继续消费消息; “zookeeper.sync.time.ms”表示了没有发生故障时,zookeeper的一个follower和master同步的时间间隔; “auto.commit.interval.ms”定义了多久更新一次写入到zookeeper的消费offset信息。注意,因为这个提交频率是基于时间的而非基于消费的消息的,如果在提交更新时发生了错误,就有可能重新消费消息。 关于配置的更多信息可以查看这里:http://kafka.apache.org/08/configuration.html。 创建线程池 在示例程序中使用java的“java.util.concurrent”包来管理线程,使用这个包可以很方便的创建一个线程池: 首先我们创建了一个Map用来告诉Kafka我们要为目标topic启动多少个线程。我们调用consumer.createMessageStreams方法来传递这个信息给Kafka。这个方法的返回值是一个Map对象,表示了topic和监听topic的KafkaStream的映射关系。注意,我们的示例程序中只向Kafka请求了一个topic,实际上我们可以请求多个topic的信息,只需要在topicCountMap加入对应的信息即可。 最后,我们成功创建了一个线程池,并为每个线程创建了一个ConsumerTest对象作为具体的业务逻辑。 安全退出和错误处理 前面已经提过,Kafka并不会在每次读取消息后就立即更新保存在Zookeeper中的消息offset,而是每隔一段时间更新一次。这就有可能产生一小段延迟,比如我们的程序已经消费了消息,但是实际上此时仍未同步到Zookeeper。如果此时客户端退出了或者崩溃了,那么此前我们消费过的消息可能会再次出现。 还要注意,有时候Broker故障或者其他事件导致的Partition的leader的改变也有可能导致消息的重复消费。 为了避免这种情况的发生,需要尽可能保证安全退出,不要使用“kill -9”这种指令。 在我们的示例程序中,主线程执行到最后sleep了10秒钟。这样后台消费线程就有了10秒钟时间消费stream中的数据。因为已经开启了自动提交,Kafka将会每秒钟提交一次offset。最后,主线程调用了shutdown方法,这个方法会先调用每个消费者线程的shutdown方法,而后才会调用ExecutorService的shutdown方法,最后会等待ExecutorService完成所有未完成的工作。这给了消费者线程一些时间来处理完成仍在stream中的少量未处理消息。如果消费者线程已经处理完了所有来自server的消息,此时关闭消费者线程,Stream的迭代器的hasNext()方法将会返回false。这样消费者线程也可以优雅的退出。另外,如果开启了自动提交,调用消费者线程的consumer方法将会在退出前提交最终的offset。 在实际工作中,通常采用的工作模式是让主线程无限期的睡眠,通过shutdown hook的方式实现安全退出。(有必要了解一下java的hook机制)。 运行示例程序 运行示例程序需要如下命令行参数: 包含端口号的Zookeeper连接字符串; 这次消费进程要使用的Consumer Group名称; 消费的消息所属的Topic; 此次消费进程启动的线程数目。 例如: 这个命令表示将会通过连接主机server01.myco.com1的2181端口与其上的Zookeeper进行通信,请求了名为“myTopic”的Topic的全部partition,并启动了4个线程来消费这些partition上的消息。这个示例中使用的Consumer Group是“group3”。 完整的代码如下(做了折叠): 参考文档 https://cwiki.apache.org/confluence/display/KAFKA/Index https://cwiki.apache.org/confluence/display/KAFKA/Consumer+Group+Example http://www.cnblogs.com/fxjwind/p/3794255.html?utm_source=tuicool&utm_medium=referral http://www.open-open.com/lib/view/open1434551761926.html http://my.oschina.net/ielts0909/blog/110280 http://my.oschina.net/infiniteSpace/blog/312890?p=1 http://www.cnblogs.com/airwindow/archive/2012/06/24/2559754.html ##############################
[阅读更多...]