为什么选择High Level Consumer
很多时候用户只是想从kafka中读取数据,对于如何处理消息的offset则不怎么关注。在抽象了Kafka中消费事件的大部分细节后,High Level Consumer可以让用户使用起来更为简单。
首先需要知道的就是High Level Consumer保存了从zookeeper中读取某个partition最后的offset。这个offset基于读取进程一开始时提供给kafka的名称保存。这个名称可以理解为Consumer Group的名称。
Consumer Group的名称在kafka集群中是一个全局的属性,因此在启动新的Consumer程序前,注意要关掉“旧的”Consumer。当使用已有的Consumer Group名称启动一个新的Consumer的时候,Kafka将会把这个进程的线程添加到一个现有的线程组中来消费Topic,并触发一次“re-balance”。在“re-balance”中,kafka将会把可用的partition分配给可用的线程。这就有可能把一个partition移交给另一个进程。如果同时使用新的和旧的业务处理逻辑,就很有可能把一些消息导向旧的业务处理逻辑。
设计一个High Level Consumer
首先要说明的是使用High Level Consumer的可以(或者说应该)是一个多线程应用。围绕着topic中partition的数量定义的线程模型有如下几个明显的特征:
- 如果提供的线程数量多于topic的partition的数量,一些线程将永远接收不到任何消息;
- 如果提供的线程数量少于topic的partition数量,一些线程将会收到来自多个partition的数据;
- 如果一个线程对应着多个partition,那么接收到的消息的有序性将会得不到保证,除非在partition内部的offset是序列化的。举例说,你可能会从partition10中获取5条消息,从partition11中获取6条消息,那么有可能在你从partition10中获取5条消息后,又继续从partition10中获取了5条消息,尽管此时partition11中的消息是可用的;
- 添加进程或线程将会导致re-balance,这就有可能会导致线程对应的partition会重新分配。
现在就可以尝试从Kafka集群读取数据了,如果没有新数据的话,读取数据的进程可能会阻塞。
如下是一个非常简单的Kafka High Level Consumer线程实例:
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 | import kafka.consumer.ConsumerIterator; import kafka.consumer.KafkaStream; public class ConsumerTest implements Runnable {     private KafkaStream m_stream;     private int m_threadNumber;     public ConsumerTest(KafkaStream a_stream, int a_threadNumber) {         m_threadNumber = a_threadNumber;         m_stream = a_stream;     }     public void run() {         ConsumerIterator<byte[], byte[]> it = m_stream.iterator();         while (it.hasNext())             System.out.println("Thread " + m_threadNumber + ": " + new String(it.next().message()));         System.out.println("Shutting down Thread: " + m_threadNumber);     } } | 
程序中值得关注的部分是“while (it.hasNext()) ”这一句,程序就是通过这一句不停地从Kafka集群读取数据的——直到用户主动停止线程。
配置测试应用
和SimpleConsumer不同的是,High Level Consumer为我们做了大量的信息记录以及故障处理工作,然而我们还是需要告诉Kafka将一些信息存储在哪儿。在下面的方法中,定义了创建一个High Level Consumer最基本的配置信息:
| 1 2 3 4 5 6 7 8 9 10 | private static ConsumerConfig createConsumerConfig(String a_zookeeper, String a_groupId) { 	Properties props = new Properties(); 	props.put("zookeeper.connect", a_zookeeper); 	props.put("group.id", a_groupId); 	props.put("zookeeper.session.timeout.ms", "400"); 	props.put("zookeeper.sync.time.ms", "200"); 	props.put("auto.commit.interval.ms", "1000"); 	return new ConsumerConfig(props); } | 
简单说明下这里的配置参数:
- “zookeeper.connect”指明了如何在kafka集群中找到启动的Zookeeper实例。Kafka使用zookeeper保存了当前ConsumerGroup从指定topic中消费消息的偏移量以及对应partition信息;
- “group.id”定义了当前进程所代表的Consumer Group;
- “zookeeper.session.timeout.ms”定义了kafka等待Zookeeper响应请求(读或者写请求)的时间,时间单位是毫秒,如果超过时间,Kafka就会放弃并继续消费消息;
- “zookeeper.sync.time.ms”表示了没有发生故障时,zookeeper的一个follower和master同步的时间间隔;
- “auto.commit.interval.ms”定义了多久更新一次写入到zookeeper的消费offset信息。注意,因为这个提交频率是基于时间的而非基于消费的消息的,如果在提交更新时发生了错误,就有可能重新消费消息。
关于配置的更多信息可以查看这里:http://kafka.apache.org/08/configuration.html。
创建线程池
在示例程序中使用java的“java.util.concurrent”包来管理线程,使用这个包可以很方便的创建一个线程池:
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 |     public void run(int a_numThreads) {         Map<String, Integer> topicCountMap = new HashMap<String, Integer>();         topicCountMap.put(topic, new Integer(a_numThreads));         Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);         List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);         // 创建多个线程         executor = Executors.newFixedThreadPool(a_numThreads);         // 创建一个对象消费消息         int threadNumber = 0;         for (final KafkaStream stream : streams) {             executor.execute(new ConsumerTest(stream, threadNumber));             threadNumber++;         }     } | 
首先我们创建了一个Map用来告诉Kafka我们要为目标topic启动多少个线程。我们调用consumer.createMessageStreams方法来传递这个信息给Kafka。这个方法的返回值是一个Map对象,表示了topic和监听topic的KafkaStream的映射关系。注意,我们的示例程序中只向Kafka请求了一个topic,实际上我们可以请求多个topic的信息,只需要在topicCountMap加入对应的信息即可。
最后,我们成功创建了一个线程池,并为每个线程创建了一个ConsumerTest对象作为具体的业务逻辑。
安全退出和错误处理
前面已经提过,Kafka并不会在每次读取消息后就立即更新保存在Zookeeper中的消息offset,而是每隔一段时间更新一次。这就有可能产生一小段延迟,比如我们的程序已经消费了消息,但是实际上此时仍未同步到Zookeeper。如果此时客户端退出了或者崩溃了,那么此前我们消费过的消息可能会再次出现。
还要注意,有时候Broker故障或者其他事件导致的Partition的leader的改变也有可能导致消息的重复消费。
为了避免这种情况的发生,需要尽可能保证安全退出,不要使用“kill -9”这种指令。
在我们的示例程序中,主线程执行到最后sleep了10秒钟。这样后台消费线程就有了10秒钟时间消费stream中的数据。因为已经开启了自动提交,Kafka将会每秒钟提交一次offset。最后,主线程调用了shutdown方法,这个方法会先调用每个消费者线程的shutdown方法,而后才会调用ExecutorService的shutdown方法,最后会等待ExecutorService完成所有未完成的工作。这给了消费者线程一些时间来处理完成仍在stream中的少量未处理消息。如果消费者线程已经处理完了所有来自server的消息,此时关闭消费者线程,Stream的迭代器的hasNext()方法将会返回false。这样消费者线程也可以优雅的退出。另外,如果开启了自动提交,调用消费者线程的consumer方法将会在退出前提交最终的offset。
| 1 2 3 4 5 | try {       Thread.sleep(10000); } catch (InterruptedException ie) { } example.shutdown(); | 
在实际工作中,通常采用的工作模式是让主线程无限期的睡眠,通过shutdown hook的方式实现安全退出。(有必要了解一下java的hook机制)。
运行示例程序
运行示例程序需要如下命令行参数:
- 包含端口号的Zookeeper连接字符串;
- 这次消费进程要使用的Consumer Group名称;
- 消费的消息所属的Topic;
- 此次消费进程启动的线程数目。
例如:
| 1 | server01.myco.com1:2181 group3 myTopic  4 | 
这个命令表示将会通过连接主机server01.myco.com1的2181端口与其上的Zookeeper进行通信,请求了名为“myTopic”的Topic的全部partition,并启动了4个线程来消费这些partition上的消息。这个示例中使用的Consumer Group是“group3”。
完整的代码如下(做了折叠):
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 | import kafka.consumer.ConsumerConfig; import kafka.consumer.KafkaStream; import kafka.javaapi.consumer.ConsumerConnector; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Properties; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; public class ConsumerGroupExample {     private final ConsumerConnector consumer;     private final String topic;     private ExecutorService executor;     public ConsumerGroupExample(String a_zookeeper, String a_groupId, String a_topic) {         consumer = kafka.consumer.Consumer.createJavaConsumerConnector(                 createConsumerConfig(a_zookeeper, a_groupId));         this.topic = a_topic;     }     public void shutdown() {         if (consumer != null) consumer.shutdown();         if (executor != null) executor.shutdown();         try {             if (!executor.awaitTermination(5000, TimeUnit.MILLISECONDS)) {                 System.out.println("Timed out waiting for consumer threads to shut down, exiting uncleanly");             }         } catch (InterruptedException e) {             System.out.println("Interrupted during shutdown, exiting uncleanly");         }     }     public void run(int a_numThreads) {         Map<String, Integer> topicCountMap = new HashMap<String, Integer>();         topicCountMap.put(topic, new Integer(a_numThreads));         Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);         List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);         // 加载所有的线程         executor = Executors.newFixedThreadPool(a_numThreads);         // 创建一个对象消费消息         int threadNumber = 0;         for (final KafkaStream stream : streams) {             executor.submit(new ConsumerTest(stream, threadNumber));             threadNumber++;         }     }     private static ConsumerConfig createConsumerConfig(String a_zookeeper, String a_groupId) {         Properties props = new Properties();         props.put("zookeeper.connect", a_zookeeper);         props.put("group.id", a_groupId);         props.put("zookeeper.session.timeout.ms", "400");         props.put("zookeeper.sync.time.ms", "200");         props.put("auto.commit.interval.ms", "1000");         return new ConsumerConfig(props);     }     public static void main(String[] args) {         String zooKeeper = args[0];         String groupId = args[1];         String topic = args[2];         int threads = Integer.parseInt(args[3]);         ConsumerGroupExample example = new ConsumerGroupExample(zooKeeper, groupId, topic);         example.run(threads);         try {             Thread.sleep(10000);         } catch (InterruptedException ie) {         }         example.shutdown();     } } | 
参考文档
- https://cwiki.apache.org/confluence/display/KAFKA/Index
- https://cwiki.apache.org/confluence/display/KAFKA/Consumer+Group+Example
- http://www.cnblogs.com/fxjwind/p/3794255.html?utm_source=tuicool&utm_medium=referral
- http://www.open-open.com/lib/view/open1434551761926.html
- http://my.oschina.net/ielts0909/blog/110280
- http://my.oschina.net/infiniteSpace/blog/312890?p=1
- http://www.cnblogs.com/airwindow/archive/2012/06/24/2559754.html
##############################
发表评论