Kafka high level consumer

为什么选择High Level Consumer

很多时候用户只是想从kafka中读取数据,对于如何处理消息的offset则不怎么关注。在抽象了Kafka中消费事件的大部分细节后,High Level Consumer可以让用户使用起来更为简单。

首先需要知道的就是High Level Consumer保存了从zookeeper中读取某个partition最后的offset。这个offset基于读取进程一开始时提供给kafka的名称保存。这个名称可以理解为Consumer Group的名称。

Consumer Group的名称在kafka集群中是一个全局的属性,因此在启动新的Consumer程序前,注意要关掉“旧的”Consumer。当使用已有的Consumer Group名称启动一个新的Consumer的时候,Kafka将会把这个进程的线程添加到一个现有的线程组中来消费Topic,并触发一次“re-balance”。在“re-balance”中,kafka将会把可用的partition分配给可用的线程。这就有可能把一个partition移交给另一个进程。如果同时使用新的和旧的业务处理逻辑,就很有可能把一些消息导向旧的业务处理逻辑。

设计一个High Level Consumer

首先要说明的是使用High Level Consumer的可以(或者说应该)是一个多线程应用。围绕着topic中partition的数量定义的线程模型有如下几个明显的特征:

  • 如果提供的线程数量多于topic的partition的数量,一些线程将永远接收不到任何消息;
  • 如果提供的线程数量少于topic的partition数量,一些线程将会收到来自多个partition的数据;
  • 如果一个线程对应着多个partition,那么接收到的消息的有序性将会得不到保证,除非在partition内部的offset是序列化的。举例说,你可能会从partition10中获取5条消息,从partition11中获取6条消息,那么有可能在你从partition10中获取5条消息后,又继续从partition10中获取了5条消息,尽管此时partition11中的消息是可用的;
  • 添加进程或线程将会导致re-balance,这就有可能会导致线程对应的partition会重新分配。

现在就可以尝试从Kafka集群读取数据了,如果没有新数据的话,读取数据的进程可能会阻塞。

如下是一个非常简单的Kafka High Level Consumer线程实例:

程序中值得关注的部分是“while (it.hasNext()) ”这一句,程序就是通过这一句不停地从Kafka集群读取数据的——直到用户主动停止线程。

配置测试应用

和SimpleConsumer不同的是,High Level Consumer为我们做了大量的信息记录以及故障处理工作,然而我们还是需要告诉Kafka将一些信息存储在哪儿。在下面的方法中,定义了创建一个High Level Consumer最基本的配置信息:

简单说明下这里的配置参数:

  • “zookeeper.connect”指明了如何在kafka集群中找到启动的Zookeeper实例。Kafka使用zookeeper保存了当前ConsumerGroup从指定topic中消费消息的偏移量以及对应partition信息;
  • “group.id”定义了当前进程所代表的Consumer Group;
  • “zookeeper.session.timeout.ms”定义了kafka等待Zookeeper响应请求(读或者写请求)的时间,时间单位是毫秒,如果超过时间,Kafka就会放弃并继续消费消息;
  • “zookeeper.sync.time.ms”表示了没有发生故障时,zookeeper的一个follower和master同步的时间间隔;
  • “auto.commit.interval.ms”定义了多久更新一次写入到zookeeper的消费offset信息。注意,因为这个提交频率是基于时间的而非基于消费的消息的,如果在提交更新时发生了错误,就有可能重新消费消息。

关于配置的更多信息可以查看这里:http://kafka.apache.org/08/configuration.html

创建线程池

在示例程序中使用java的“java.util.concurrent”包来管理线程,使用这个包可以很方便的创建一个线程池:

首先我们创建了一个Map用来告诉Kafka我们要为目标topic启动多少个线程。我们调用consumer.createMessageStreams方法来传递这个信息给Kafka。这个方法的返回值是一个Map对象,表示了topic和监听topic的KafkaStream的映射关系。注意,我们的示例程序中只向Kafka请求了一个topic,实际上我们可以请求多个topic的信息,只需要在topicCountMap加入对应的信息即可。

最后,我们成功创建了一个线程池,并为每个线程创建了一个ConsumerTest对象作为具体的业务逻辑。

安全退出和错误处理

前面已经提过,Kafka并不会在每次读取消息后就立即更新保存在Zookeeper中的消息offset,而是每隔一段时间更新一次。这就有可能产生一小段延迟,比如我们的程序已经消费了消息,但是实际上此时仍未同步到Zookeeper。如果此时客户端退出了或者崩溃了,那么此前我们消费过的消息可能会再次出现。

还要注意,有时候Broker故障或者其他事件导致的Partition的leader的改变也有可能导致消息的重复消费。

为了避免这种情况的发生,需要尽可能保证安全退出,不要使用“kill -9”这种指令。

在我们的示例程序中,主线程执行到最后sleep了10秒钟。这样后台消费线程就有了10秒钟时间消费stream中的数据。因为已经开启了自动提交,Kafka将会每秒钟提交一次offset。最后,主线程调用了shutdown方法,这个方法会先调用每个消费者线程的shutdown方法,而后才会调用ExecutorService的shutdown方法,最后会等待ExecutorService完成所有未完成的工作。这给了消费者线程一些时间来处理完成仍在stream中的少量未处理消息。如果消费者线程已经处理完了所有来自server的消息,此时关闭消费者线程,Stream的迭代器的hasNext()方法将会返回false。这样消费者线程也可以优雅的退出。另外,如果开启了自动提交,调用消费者线程的consumer方法将会在退出前提交最终的offset。

在实际工作中,通常采用的工作模式是让主线程无限期的睡眠,通过shutdown hook的方式实现安全退出。(有必要了解一下java的hook机制)。

运行示例程序

运行示例程序需要如下命令行参数:

  • 包含端口号的Zookeeper连接字符串;
  • 这次消费进程要使用的Consumer Group名称;
  • 消费的消息所属的Topic;
  • 此次消费进程启动的线程数目。

例如:

这个命令表示将会通过连接主机server01.myco.com1的2181端口与其上的Zookeeper进行通信,请求了名为“myTopic”的Topic的全部partition,并启动了4个线程来消费这些partition上的消息。这个示例中使用的Consumer Group是“group3”。

完整的代码如下(做了折叠):

参考文档

##############################

发表评论

This site uses Akismet to reduce spam. Learn how your comment data is processed.