Kafka的Simple Consumer并不简单。相反的,它是直接在对Kafka的partition和offset进行操作,是一种比较接近底层的方案。 为什么要使用Simple Consumer 使用SimpleConsumer最主要的原因是想在消费消息时获取更大的权限。比如说要做下面这些事情: 多次读取同一条消息; 在一个处理过程中,只消费topic中partition的子集; 进行事务管理,保证消息被消费了一次且只消费了一次。 使用Simple Consumer的负面影响 较之ConsumerGroup,使用SimpleConsumer需要做大量额外的工作: 在应用中需要跟踪offset以便知道消费到哪里了; 需要指明topic和partition对应的leader Broker; 需要对leader Broker的改变做出应对。 使用SimpleBroker的步骤 找到一个活跃Broker,并找出要消费的Topic和Partition的leader Broker; 决定哪个Broker是要消费的Topic和Partition的副本Broker; 建立请求,并定义要抓取的数据; 抓取数据; 确认并还原leader的变化。 找到一个Topic和Partition的Leader Broker 要找到Leader Broker最简单的解决方案就是传送一组已知的Broker到处理程序中,这可以通过配置信息或者命令行来完成。 这里没必要传递集群中全部的Broker给处理程序,只要提供少量的活跃Broker,而后程序可以通过这些Broker得到Leader Broker的信息。程序如下: 在上面的程序中调用了topicsMetadata()方法,通过这个方法,程序可以向已经连接的Broker请求关于目标topic的全部细节。 对partitionsMetadata进行迭代循环会遍历所有的partitions,直到找到我们想要的partition。一旦我们找了想要的partition,将会立即跳出全部循环。 在代码中后面还记录了topic所有副本所在的broker。如果需要重新找出新的leader这些记录就可以派上用场了。 找到消费起始的offset 现在定义从哪儿开始读取数据。Kafka有两个常量可以派上用场: kafka.api.OffsetRequest.EarliestTime():从日志中找到数据最开始的位置,并从该位置开始读取数据; kafka.api.OffsetRequest.LatestTime():这个只传递新的消息。 假使已经有数据了,第一个方法会从头开始读取历史数据;第二个方法则不会读取历史数据,只读取新数据。 不要假设起始offset是0,因为随着时间推移,分区中的消息可能会被删除。 如果要读取最早的数据,在调用getLastOffset方法时,可以为whichTime赋值为kafka.api.OffsetRequest.EarliestTime();如果要读取最新的数据,可以为whichTime赋值为kafka.api.OffsetRequest.LatestTime()。 错误处理 因为SimpleConsumer不会处理关于Leader Broker的错误,需要写一些代码来解决这个问题: 如果fetchResponse.hasError()返回true,即出现了错误,我们会在日志上记录原因,并关闭consumer,然后尝试找出新的leader。 在这个方法中,我们调用了早些时候定义的findLeader()方法来找出新的leader。如果我们尝试连接的只是topic或partition的一个副本,程序将无法尝试找出新的leader。这样我们也无法再从Broker中读取到需要的数据,然后放弃读取任务并抛出异常退出。 Zookeeper需要一小段时间才能发现leader不存在了并尝试重新指定一个leader,因此处理线程在得不到回复的情况下会先sleep一小段时间。事实上,Zookeeper执行错误恢复的速度非常快,通常不需要sleep等待。 读取数据 最后从partition中读取topic数据并输出: 注意readOffset请求的是在读取消息后需要的下一个offset。这样当程序处理完当前的消息块以后就可以知道从哪里开始继续抓取消息了。 还有一点需要注意,就是在程序中我们特意判断了读取到的offset是否比readOffset的值小。这是操作是有必要的。如果Kafka正在对消息进行压缩,抓取请求将会返回一个完全压缩后的消息块,尽管最开始readOffset返回的值不是这个压缩后的消息块的起始位置。因此,我们之前曾经见到过的消息也有可能会被再次返回。还要注意的是我们请求的fetchSize的长度是100000 bytes。如果此时Kafka的Producer正在大批量写入,这个长度可能就不够,也就有可能返回空的消息集合。在这种情况下,需要调整fetchSize的值,直到不再继续返回空消息集。 最后,我们会持续记录读取的消息的数量。如果在上一次请求中没有读取到任何数据,读取线程将会sleep一秒钟。这样程序就不会在没有数据的情况下还反复向Kafka发起请求。 运行示例程序 运行示例程序需要如下参数: 要读取的消息的最大总数(一个整型值,这样我们的程序不会一直循环下去); 要读取的Topic(一个字符串,比如dmp_xxx); 要读取的Partition(一个整型值,即Partition ID); 一个用来查找Metadata的Broker(Broker IP,如127.0.0.1); Broker监听的端口(如 9092)。 源代码 程序源代码如下,这里我暂时做了折叠: ##########
[阅读更多...]-
Kafka Simple Consumer
-
Kafka Producer程序示例
这次是要写一个Producer示例程序。使用的kafka版本是0.8.2。开发语言是java。 程序主体是一个Producer类。这个Producer类主要是用来为指定的topic创建消息。 首先需要引入一些支持类: 然后就是定义一些属性来告诉Producer如何找到Kafka集群、怎样对消息进行序列化以及怎样恰当地引导消息到指定的分区。这些属性是以Java Properties对象的形式定义的: 第一个属性“metadata.broker.list”定义了一个或者多个broker。Producer会为每个topic选择一个broker作为Leader。没必要将集群中所有的broker都添加到这个属性中,但是建议最少设置两个,以防止第一个broker不可用。不用考虑Kafka如何指明哪个broker作为topic(和partition)的Leader,kafka知道怎样与broker建立连接、请求元数据,并最终连接到正确的Broker。 第二个属性“serializer.class”定义了在准备传递消息给Broker时要使用的序列化类。在我们的示例程序中使用了Kafka提供的一个简单的序列化类StringEncoder。注意这里使用的encoder必须能够处理下一步在KeyedMessage中定义的类型。 也可以单独调整消息的Key使用的序列化类,这个可以通过恰当地定义“key.serializer.class”来实现。默认情况下,这个属性和“serializer.class”的值一致。 第三个属性“partitioner.class”定义了要使用哪个类来判断将消息发送给Topic中的哪个Partition。这个属性是可选的。不过在一些特殊的应用中,用户可能想自定义实现一个partition方案。稍后再讨论如何实现Partition。如果消息的key值不为null,可是又没有定义一个“partitioner.class”属性,Kafka将会使用默认的partitioner。如果key值为null,Producer将会把消息分配给一个随机的Partition。 最后一个属性“request.required.acks”告诉Kafka您希望broker在接收到消息后能发送一个确认信号给您的Producer。不设置这个属性的话,Producer将会“fire and forget(放弃并遗忘)”(消息),这有可能会导致数据的丢失。要了解更多,可以参考这个网页:http://kafka.apache.org/08/configuration.html。 然后就是定义Producer对象: 这里使用了java的泛型,您需要指明Producer两个参数的类型。第一个参数就是Partition的key的类型,第二个是消息的类型。在这个示例程序中,这两个属性都是String类型。还需要注意这两个属性需要和前面定义的的配置属性“serializer.class”和“partitioner.class”呼应。 现在开始构建要发送的消息: 这里我们伪造了一系列网站访问的IP信息。将消息以逗号分隔,第一部分是事件发生时的时间戳,第二部分是网址,第三部分是请求来源IP。在这里,我们使用java Random类来保证IP地址的最后八位字节不同,以便我们观察Partitioner是怎样工作的。 然后将消息发送到broker: “page_visits”就是将要把消息写入的Topic。这里我们将IP作为partition的key。需要注意,如果不设置partition的key的话,即使已经定义了一个Partitioner类,Kafka仍然会将消息分配给一个随机的partition。 完整的代码如下: 自定义的Partitioner类: 在自定义的Partitioner类中,我们使用IP地址作为key。我们取出IP地址的最后八位字节与Kafka中topic的partition总数进行模运算。这种partition方案的好处就是所有相同IP的访问记录都会被放置到相同的partition。此外,Consumer处理逻辑也要知道怎样对之进行处理。 在运行我们的程序前,需要确认是否创建了名为”page_visits”的Topic。新建topic的指令如下: 就这样. 参考文档 https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+Producer+Example ######
[阅读更多...] -
kafka简介
简介 kafka是一个分布式的、可分区的、可复制的日志提交服务。它提供了消息传递的功能,但是有着独特的设计。 首先,先了解一些基础概念: Kafka将消息源的分类称为topic; 向Kafka的topic发送消息的进程被称为producer; 订阅并消费消息的进程被称为consumer; Kafka运行在由一个或多个服务组成的集群上,其中的每个服务被称为broker。 从整体上来看,producer负责通过网络将消息发送给Kafka集群,Kafka集群将消息提供给consumer,过程如下图: 客户端和服务器之间通过TCP协议进行通信。Kafka提供了java的客户端。但是客户端也可以用多种语言实现。 1. topic和日志 我们先来研究一下Kafka的一个抽象概念——topic。一个topic是一组消息源的分类或订阅名称。针对每个topic,Kafka集群都维护了一组分区日志(partition)。分区日志即是一个topic的一个partition对应的所有segment文件。如下图: 每个topic都是由一系列有序的,不可变的消息组成。这些消息被不断的追加到分区中。分区中的每个消息都被分配了一个序列ID号。这个序列ID号被称为offset,可以作为在分区中消息的唯一标识。 在一个可配置的时间段内,Kafka集群会保存所有已发布的消息——不管它们是否已被消费。比如说如果日志的保留期被设置为两天,那么在消息发布后的两天内它都是可消费的。两天后,消息会被清除以释放空间。数据规模大小并不影响Kafka的性能,保存大量的数据对Kafka来说并不是个问题。 实际上,每个consumer唯一需要维护的数据就是consumer在日志中消费到的位置,也就是offset。offset是由consumer来维护的。一般情况下,consumer每读取一条消息,offset的值就向前增加一次。但是事实上consumer可以自由控制offset信息,它可以按任意的顺序读取消息。比如,consumer可以将offset的值重置为一个比较旧的位置来进行重新处理。 这些特性意味着kafka的consumer可以非常灵活——它们可以随意进出而不影响集群或者其它的consumer。举例说,用户可以在命令行中使用tail命令来追踪任意topic的内容,而不用担心影响正在访问topic的consumer。 对日志进行分区有这样几点作用: 首先,可以使日志的规模保持在一个指定的范围内,以能保存到单独的服务上。每个独立的分区必须能够适配它所在的服务节点。一个topic可以有多个分区,从而可以承载任意规模的数据。 第二,可以作为并行处理的单元(这点稍后讨论)。 2. 分布式 日志的分区分布式地保存在kafka集群的服务节点上。这样每个服务节点都可以请求数据管理数据。每个分区在一定数量的服务节点上都有副本以进行容错,并且这个数量是可配置的。 对于每个分区,都有一个服务节点扮演着leader的角色,其他的零个或者多个服务扮演着follower的角色。leader负责处理分区的读写请求,而follower只是被动地维护leader的副本。一旦leader出现了故障,一个follower就会自动成为新的leader。每个服务节点都可能同时扮演leader和follower两种角色。即一个服务节点可能是它上面一些分区的leader,同时也可能是其他分区的follower。就是这样kafka在集群内部实现了负载均衡。 3. Producer producer负责发布数据到其所选择的topic中,并选择将数据分配给topic的哪个分区。分区选择可以简单地由负载均衡以轮流制的方式实现,也可以通过一些特定的分区函数(取决于消息中的一些key)实现。通常使用的是第二种方式。 4. Comsumer 消息发布通常有两种模式:队列模式和发布订阅模式。在队列模式中,多个consumer(这里有个consumer pool的概念)从一个服务读取消息,每个消息只能被其中的一个consumer读到。在发布订阅模式中,消息会广播给所有的consumer。kafka提供了一个抽象的consumer概念即consumer group,从而同时实现了这两种方案。 每个consumer可以加入一个consumer group。consumer group订阅发布到topic的消息,而后将消息传送给其下面的一个consumer实例。所有的consumer实例可以运行在不同的进程上,也可以在不同的独立的机器上。 如果所有的consumer实例属于同一个consumer group,这样的工作模式在consumer中实现了负载均衡,类似常规的消息队列模式。 如果所有的consumer实例分别属于不同的consumer group,每个消息会被广播给所有的consumer,这样的模式类似于发布订阅模式。 不过更常见的是,每个topic都会有若干consumer group,一个consumer group就是一个逻辑上的“订阅者”。每个consumer group都是由多个consumer实例组成,从而获得了更好的稳定性和容错能力。这其实也是一个发布订阅模式,只不过订阅者不只是一个进程,而是由多个consumer组成的集群。 上图展示了一个kafka集群,其中包含两台服务器、四个分区(P0~P3)、以及两个consumer group。其中group A有两个consumer实例,B有四个。 相比常规的消息系统,kafka可以更好的保证有序性。 常规的队列模式在服务器上有序的保存消息。当多个consumer需要从队列中获取消息的时候,服务器会按照消息保存的顺序将消息取出。然而,尽管服务器是按顺序将消息取出的,消息却是异步地被分发至consumer的,当消息到达不同的consumer的时候可能已经失去了顺序。 这意味着在并发消费时将导致消息的顺序错乱。为了解决这个问题,消息系统通常会采用“exclusive consumer(独有消费)”的概念,即只允许一个进程从队列读取消息,当然这就意味着失去了并发性。 kafka提供了一个很好的解决方案。通过topic内部的分区概念,在处理多个consumer并发时,kafka可以实现有序性和负载均衡。 kafka将topic的每个分区只分发给一个consumer group, 这样一个分区就只能被这个group中的一个consumer消费,并且可以保证消费的顺序性。因为有很多分区,所以仍然可以在多个consumer实例中实现负载均衡。请注意consumer实例的数量不应该比分区的数量多。 kafka只是在一个分区内提供了消息的有序性,而在一个topic不同的分区间是不能保证有序性的。然而对于大部分的应用程序来说这已经足够了。如果要保证topic中所有消息的有序性,那么只能让一个topic中只有一个分区,当然这也意味着只能有一个消费者进程。 5. 保证 在某个层面上 ,kafka提供了如下保证: kafka会保证producer发给topic指定分区的消息按照发送顺序追加到日志中。即是说,如果消息M1和M2先后被同一个producer发送,如果M1先发送,那么在分区日志中M1的offset要比M2小,并且更早出现在分区日志中; kafka可以保证consumer消费消息的顺序和消息在分区日志中存储的顺序一致; 如果一个topic的“replication factor”是N,那么Kafka可以保证在N-1个服务器失效后仍不丢失提交给分区的消息。 更多细节可以在文档的“设计原理”这一章查看。 使用场景 这里列出了Kafka的几种常见使用场景。如果想对kafka在实战中的使用有个概括的了解,可以看一下这篇博客。 1. 消息处理 kafka可以作为常规的MessageBroker。MessageBroker有多种用途(将数据从producer方解耦,缓存未处理的消息等等)。相比大多数消息系统,kafka有更好的吞吐能力、内置的分区机制、冗余备份机制、以及不错的容错能力,因此kafka适用于大部分消息处理需求。 在我们的经验中,一般消息处理的吞吐量相对不大,但是要求较低的端到端延迟,并且需要kafka提供的良好的稳定性保障。 在消息处理这个领域,kafka可以比得上常规的消息系统,比如ActiveMQ和RabbitMQ。 2. 网站活动跟踪 kafka最早的用处就是作为一个实时发布订阅系统来构建用户行为追踪管道。这意味着网站上的活动(网页浏览、搜索以及其他用户行为)都会被发布到中央topic集合,每个topic代表了一种行为类型。收集的这些数据可以在订阅后支持多种用途比如实时处理、实时监控、加载到hadoop系统或者离线数据仓库系统作离线处理或者生成报表。 活动追踪通常需要很大的空间容量,因为用户每访问一个网页都会产生大量的活动数据。 3. Metrics(运营评估) kafka还经常会被用来处理运营监控数据。这涉及到聚合多个分布式应用上的数据来生成运营分析数据。 4. 日志聚合 很多人使用kafka作为日志聚合解决方案。日志聚合通常就是从各个服务器收集物理日志文件并将之集中到一个地方(比如文件服务器或者HDFS)去做处理。kafka并不关注文件的细节,它提供了一个更简洁的消息数据流的抽象概念来处理日志或者事件数据。这就实现了低延迟处理、简单的多数据源支持以及分布式数据消费。相比一些日志收集系统比如Scribe或者Flume,kafka有着不错的性能,更低的端到端延迟,并通过冗余复制提供了更健壮的持久性保障。 5. 流处理 用户经常需要对数据做一些阶段性的处理:从topic获取原始数据,而后经过汇总、丰富或者以其他方式生成新的topic为进一步的数据消费做准备。举个例子:一个文章推荐工作流程首先需要从RSS订阅源抓取文章内容并发布为一个名为“article”的topic;而后对从“article”中获取的所有文章进行整理并作去重处理后发布为新的topic,最后的工作就是尝试进行内容匹配并推荐给用户。这里描述了一个实时数据处理流程图。Storm 和 Samza 是两个用来做这些工作的常见框架。 6. Event Sourcing(事件溯源) 事件溯源是一种软件设计模式。在这种模式下,每一次数据状态的变化都会被记录到一个时间顺序的记录中。kafka支持海量数据的特性为这种设计模式提供了一个不错的选择。 7. Commit Log 对于分布式系统,kafka可以被用于提供额外的日志提交服务。日志可以被用来在节点间复制数据,也可以作为重新同步机制以在节点故障后恢复数据。kafka的日志压缩特性支持了这方面的应用。在这一点上,kafka和Apache BookKeeper项目有些类似。 三. 生态圈 在kafka的主项目外,还有大量的支持工具。在ecosystem page上,列出了这些工具,包括流处理系统、hadoop聚合接口、监控工具、部署工具等。
[阅读更多...]