• springboot入门14 – Kafka应用

    简述 这几天优化了一下之前写的一个springboot kafka组件。比较起原生的spring-kafka来,我希望能够简化kafka的使用,可以更聚焦于具体的消息处理逻辑。 接下来的内容是这个组件的用法。 使用方法 添加依赖 这个组件已经提交到了maven中央仓库,可以直接通过依赖的形式引入: 0.2.2是这两天刚发布的一个版本。 消费者Processor kafka-spring-boot-starter这个组件已经完成了kafka消费者的主要功能。 对于开发者来说,可以不必关注KafkaConsumer的创建,只需要实现Processor接口并注入到容器中即可。 下面是一个简单的示例: 如示例中,通过@Component注解完成了Processor实现类的实例的注入,并为注入的Bean提供了一个名称:zhyyy。记住这个名称,在之后的配置文件中会用到。 使用生产者 kafka-spring-boot-starter会根据配置主动创建KafkaProducer。开发使用时可以直接从容器中获取ProducerTemplate实例来发送消息: 如果写入kafka的消息的key和value的序列化方案采用的都是默认的字符串(反)序列化方案(StringDeserializer和StringSerializer),可以使用StringProducerTemplate实例: 发送消息时酌情调用不同的send()方法: 配置 下面是一个最简单的配置: 如上配置中: test-group00既是配置项的ID,也是消费组ID bootstrap-servers我想不需要多做解释。 topics对应的是一个数组结构,也可以写作[test-topic1]或[test-topic1,test-topic2],即支持同一个kafka集群上多个类似topic的统一处理 consumer是消费者相关配置,processor对应的是Processor实现类的Bean名称,count标识的是应用内消费线程的数量 默认的序列化方案采用的是字符串序列化方案。 虽然在配置中没有体现,但kafka-spring-boot-starter组件会基于已有的信息创建KafkaProducer,使用时可以通过ProducerTemplate执行消息发送。 比较完整的配置是这样子的: 其中common模块下是一些通用的配置,config模块下则是一或多组具体的配置(这里是两组)。common下的配置会被config下的配置覆盖。 此外还独立出来了一些常用的配置项,如autoOffsetReset,keyDeserializer等,以便在使用时进行配置。 其他 示例应用在 github / spring-boot-kafka。 kafka-spring-boot-starter这个组件的源码也在 github 。如果有定制化的需求可以据此进行调整。 End!

    [阅读更多...]
  • Kafka java.net.SocketTimeoutException

    手上有一个消费Kafka的服务,这个服务每隔一段时间使用SimpleConsumer从kafka集群获取数据。Kafka的版本是0.8.21。今天这个服务一直在报错:SocketTimeoutException。异常信息如下: 从异常信息上来看是网络连接超时。翻看了一下任务的运行日志发现是在连接Kafka集群的k3节点时出的错。 尝试telnet连接k3节点的broker是可以的。这样子还报错就比较奇怪了。 使用glances查看了一下消费节点的本地运行环境,发现各项指标也都挺正常的。 登录到k3节点,查看kafka的运行日志也能够看到消费节点的连接记录: 查看了一下使用high-level consumer的其他服务也是正常的。 百思不得其解。但是问题还得解决,随手查看了一下k3节点的句柄数,发现一个进程的句柄数高得有点儿离谱哎: k3节点每个进程可以打开的最大句柄数: 53466就是k3节点上的kafka broker进程: 此时有必要看下这个进程的网络连接了: 发现有大量的CLOSE_WAIT和TIME_WAIT连接: 统计了下CLOSE_WAIT和TIME_WAIT连接的数量,好像有点儿多: 可以看出来所有连接中CLOSE_WAIT的记录占了绝大部分,和CLOSE_WAIT相关的连接又多是在j0节点上。 到j0节点查找和端口51436相关的进程,发现是一个kafka-manager服务。不是生产服务,直接关掉了事。 回到k3节点,重启该节点的broker。再进行测试,异常消失。持续观察几天,kafka的连接数是正常的,说明问题已经得到修复。 ##########

    [阅读更多...]
  • Kafka high level consumer

    为什么选择High Level Consumer 很多时候用户只是想从kafka中读取数据,对于如何处理消息的offset则不怎么关注。在抽象了Kafka中消费事件的大部分细节后,High Level Consumer可以让用户使用起来更为简单。 首先需要知道的就是High Level Consumer保存了从zookeeper中读取某个partition最后的offset。这个offset基于读取进程一开始时提供给kafka的名称保存。这个名称可以理解为Consumer Group的名称。 Consumer Group的名称在kafka集群中是一个全局的属性,因此在启动新的Consumer程序前,注意要关掉“旧的”Consumer。当使用已有的Consumer Group名称启动一个新的Consumer的时候,Kafka将会把这个进程的线程添加到一个现有的线程组中来消费Topic,并触发一次“re-balance”。在“re-balance”中,kafka将会把可用的partition分配给可用的线程。这就有可能把一个partition移交给另一个进程。如果同时使用新的和旧的业务处理逻辑,就很有可能把一些消息导向旧的业务处理逻辑。 设计一个High Level Consumer 首先要说明的是使用High Level Consumer的可以(或者说应该)是一个多线程应用。围绕着topic中partition的数量定义的线程模型有如下几个明显的特征: 如果提供的线程数量多于topic的partition的数量,一些线程将永远接收不到任何消息; 如果提供的线程数量少于topic的partition数量,一些线程将会收到来自多个partition的数据; 如果一个线程对应着多个partition,那么接收到的消息的有序性将会得不到保证,除非在partition内部的offset是序列化的。举例说,你可能会从partition10中获取5条消息,从partition11中获取6条消息,那么有可能在你从partition10中获取5条消息后,又继续从partition10中获取了5条消息,尽管此时partition11中的消息是可用的; 添加进程或线程将会导致re-balance,这就有可能会导致线程对应的partition会重新分配。 现在就可以尝试从Kafka集群读取数据了,如果没有新数据的话,读取数据的进程可能会阻塞。 如下是一个非常简单的Kafka High Level Consumer线程实例: 程序中值得关注的部分是“while (it.hasNext()) ”这一句,程序就是通过这一句不停地从Kafka集群读取数据的——直到用户主动停止线程。 配置测试应用 和SimpleConsumer不同的是,High Level Consumer为我们做了大量的信息记录以及故障处理工作,然而我们还是需要告诉Kafka将一些信息存储在哪儿。在下面的方法中,定义了创建一个High Level Consumer最基本的配置信息: 简单说明下这里的配置参数: “zookeeper.connect”指明了如何在kafka集群中找到启动的Zookeeper实例。Kafka使用zookeeper保存了当前ConsumerGroup从指定topic中消费消息的偏移量以及对应partition信息; “group.id”定义了当前进程所代表的Consumer Group; “zookeeper.session.timeout.ms”定义了kafka等待Zookeeper响应请求(读或者写请求)的时间,时间单位是毫秒,如果超过时间,Kafka就会放弃并继续消费消息; “zookeeper.sync.time.ms”表示了没有发生故障时,zookeeper的一个follower和master同步的时间间隔; “auto.commit.interval.ms”定义了多久更新一次写入到zookeeper的消费offset信息。注意,因为这个提交频率是基于时间的而非基于消费的消息的,如果在提交更新时发生了错误,就有可能重新消费消息。 关于配置的更多信息可以查看这里:http://kafka.apache.org/08/configuration.html。 创建线程池 在示例程序中使用java的“java.util.concurrent”包来管理线程,使用这个包可以很方便的创建一个线程池: 首先我们创建了一个Map用来告诉Kafka我们要为目标topic启动多少个线程。我们调用consumer.createMessageStreams方法来传递这个信息给Kafka。这个方法的返回值是一个Map对象,表示了topic和监听topic的KafkaStream的映射关系。注意,我们的示例程序中只向Kafka请求了一个topic,实际上我们可以请求多个topic的信息,只需要在topicCountMap加入对应的信息即可。 最后,我们成功创建了一个线程池,并为每个线程创建了一个ConsumerTest对象作为具体的业务逻辑。 安全退出和错误处理 前面已经提过,Kafka并不会在每次读取消息后就立即更新保存在Zookeeper中的消息offset,而是每隔一段时间更新一次。这就有可能产生一小段延迟,比如我们的程序已经消费了消息,但是实际上此时仍未同步到Zookeeper。如果此时客户端退出了或者崩溃了,那么此前我们消费过的消息可能会再次出现。 还要注意,有时候Broker故障或者其他事件导致的Partition的leader的改变也有可能导致消息的重复消费。 为了避免这种情况的发生,需要尽可能保证安全退出,不要使用“kill -9”这种指令。 在我们的示例程序中,主线程执行到最后sleep了10秒钟。这样后台消费线程就有了10秒钟时间消费stream中的数据。因为已经开启了自动提交,Kafka将会每秒钟提交一次offset。最后,主线程调用了shutdown方法,这个方法会先调用每个消费者线程的shutdown方法,而后才会调用ExecutorService的shutdown方法,最后会等待ExecutorService完成所有未完成的工作。这给了消费者线程一些时间来处理完成仍在stream中的少量未处理消息。如果消费者线程已经处理完了所有来自server的消息,此时关闭消费者线程,Stream的迭代器的hasNext()方法将会返回false。这样消费者线程也可以优雅的退出。另外,如果开启了自动提交,调用消费者线程的consumer方法将会在退出前提交最终的offset。 在实际工作中,通常采用的工作模式是让主线程无限期的睡眠,通过shutdown hook的方式实现安全退出。(有必要了解一下java的hook机制)。 运行示例程序 运行示例程序需要如下命令行参数: 包含端口号的Zookeeper连接字符串; 这次消费进程要使用的Consumer Group名称; 消费的消息所属的Topic; 此次消费进程启动的线程数目。 例如: 这个命令表示将会通过连接主机server01.myco.com1的2181端口与其上的Zookeeper进行通信,请求了名为“myTopic”的Topic的全部partition,并启动了4个线程来消费这些partition上的消息。这个示例中使用的Consumer Group是“group3”。 完整的代码如下(做了折叠): 参考文档 https://cwiki.apache.org/confluence/display/KAFKA/Index https://cwiki.apache.org/confluence/display/KAFKA/Consumer+Group+Example http://www.cnblogs.com/fxjwind/p/3794255.html?utm_source=tuicool&utm_medium=referral http://www.open-open.com/lib/view/open1434551761926.html http://my.oschina.net/ielts0909/blog/110280 http://my.oschina.net/infiniteSpace/blog/312890?p=1 http://www.cnblogs.com/airwindow/archive/2012/06/24/2559754.html ##############################

    [阅读更多...]
  • Kafka Simple Consumer

    Kafka的Simple Consumer并不简单。相反的,它是直接在对Kafka的partition和offset进行操作,是一种比较接近底层的方案。 为什么要使用Simple Consumer 使用SimpleConsumer最主要的原因是想在消费消息时获取更大的权限。比如说要做下面这些事情: 多次读取同一条消息; 在一个处理过程中,只消费topic中partition的子集; 进行事务管理,保证消息被消费了一次且只消费了一次。 使用Simple Consumer的负面影响 较之ConsumerGroup,使用SimpleConsumer需要做大量额外的工作: 在应用中需要跟踪offset以便知道消费到哪里了; 需要指明topic和partition对应的leader Broker; 需要对leader Broker的改变做出应对。 使用SimpleBroker的步骤 找到一个活跃Broker,并找出要消费的Topic和Partition的leader Broker; 决定哪个Broker是要消费的Topic和Partition的副本Broker; 建立请求,并定义要抓取的数据; 抓取数据; 确认并还原leader的变化。 找到一个Topic和Partition的Leader Broker 要找到Leader Broker最简单的解决方案就是传送一组已知的Broker到处理程序中,这可以通过配置信息或者命令行来完成。 这里没必要传递集群中全部的Broker给处理程序,只要提供少量的活跃Broker,而后程序可以通过这些Broker得到Leader Broker的信息。程序如下: 在上面的程序中调用了topicsMetadata()方法,通过这个方法,程序可以向已经连接的Broker请求关于目标topic的全部细节。 对partitionsMetadata进行迭代循环会遍历所有的partitions,直到找到我们想要的partition。一旦我们找了想要的partition,将会立即跳出全部循环。 在代码中后面还记录了topic所有副本所在的broker。如果需要重新找出新的leader这些记录就可以派上用场了。 找到消费起始的offset 现在定义从哪儿开始读取数据。Kafka有两个常量可以派上用场: kafka.api.OffsetRequest.EarliestTime():从日志中找到数据最开始的位置,并从该位置开始读取数据; kafka.api.OffsetRequest.LatestTime():这个只传递新的消息。 假使已经有数据了,第一个方法会从头开始读取历史数据;第二个方法则不会读取历史数据,只读取新数据。 不要假设起始offset是0,因为随着时间推移,分区中的消息可能会被删除。 如果要读取最早的数据,在调用getLastOffset方法时,可以为whichTime赋值为kafka.api.OffsetRequest.EarliestTime();如果要读取最新的数据,可以为whichTime赋值为kafka.api.OffsetRequest.LatestTime()。 错误处理 因为SimpleConsumer不会处理关于Leader Broker的错误,需要写一些代码来解决这个问题: 如果fetchResponse.hasError()返回true,即出现了错误,我们会在日志上记录原因,并关闭consumer,然后尝试找出新的leader。 在这个方法中,我们调用了早些时候定义的findLeader()方法来找出新的leader。如果我们尝试连接的只是topic或partition的一个副本,程序将无法尝试找出新的leader。这样我们也无法再从Broker中读取到需要的数据,然后放弃读取任务并抛出异常退出。 Zookeeper需要一小段时间才能发现leader不存在了并尝试重新指定一个leader,因此处理线程在得不到回复的情况下会先sleep一小段时间。事实上,Zookeeper执行错误恢复的速度非常快,通常不需要sleep等待。 读取数据 最后从partition中读取topic数据并输出: 注意readOffset请求的是在读取消息后需要的下一个offset。这样当程序处理完当前的消息块以后就可以知道从哪里开始继续抓取消息了。 还有一点需要注意,就是在程序中我们特意判断了读取到的offset是否比readOffset的值小。这是操作是有必要的。如果Kafka正在对消息进行压缩,抓取请求将会返回一个完全压缩后的消息块,尽管最开始readOffset返回的值不是这个压缩后的消息块的起始位置。因此,我们之前曾经见到过的消息也有可能会被再次返回。还要注意的是我们请求的fetchSize的长度是100000 bytes。如果此时Kafka的Producer正在大批量写入,这个长度可能就不够,也就有可能返回空的消息集合。在这种情况下,需要调整fetchSize的值,直到不再继续返回空消息集。 最后,我们会持续记录读取的消息的数量。如果在上一次请求中没有读取到任何数据,读取线程将会sleep一秒钟。这样程序就不会在没有数据的情况下还反复向Kafka发起请求。 运行示例程序 运行示例程序需要如下参数: 要读取的消息的最大总数(一个整型值,这样我们的程序不会一直循环下去); 要读取的Topic(一个字符串,比如dmp_xxx); 要读取的Partition(一个整型值,即Partition ID); 一个用来查找Metadata的Broker(Broker IP,如127.0.0.1); Broker监听的端口(如 9092)。 源代码 程序源代码如下,这里我暂时做了折叠: ##########

    [阅读更多...]
  • Kafka Producer程序示例

    这次是要写一个Producer示例程序。使用的kafka版本是0.8.2。开发语言是java。 程序主体是一个Producer类。这个Producer类主要是用来为指定的topic创建消息。 首先需要引入一些支持类: 然后就是定义一些属性来告诉Producer如何找到Kafka集群、怎样对消息进行序列化以及怎样恰当地引导消息到指定的分区。这些属性是以Java Properties对象的形式定义的: 第一个属性“metadata.broker.list”定义了一个或者多个broker。Producer会为每个topic选择一个broker作为Leader。没必要将集群中所有的broker都添加到这个属性中,但是建议最少设置两个,以防止第一个broker不可用。不用考虑Kafka如何指明哪个broker作为topic(和partition)的Leader,kafka知道怎样与broker建立连接、请求元数据,并最终连接到正确的Broker。 第二个属性“serializer.class”定义了在准备传递消息给Broker时要使用的序列化类。在我们的示例程序中使用了Kafka提供的一个简单的序列化类StringEncoder。注意这里使用的encoder必须能够处理下一步在KeyedMessage中定义的类型。 也可以单独调整消息的Key使用的序列化类,这个可以通过恰当地定义“key.serializer.class”来实现。默认情况下,这个属性和“serializer.class”的值一致。 第三个属性“partitioner.class”定义了要使用哪个类来判断将消息发送给Topic中的哪个Partition。这个属性是可选的。不过在一些特殊的应用中,用户可能想自定义实现一个partition方案。稍后再讨论如何实现Partition。如果消息的key值不为null,可是又没有定义一个“partitioner.class”属性,Kafka将会使用默认的partitioner。如果key值为null,Producer将会把消息分配给一个随机的Partition。 最后一个属性“request.required.acks”告诉Kafka您希望broker在接收到消息后能发送一个确认信号给您的Producer。不设置这个属性的话,Producer将会“fire and forget(放弃并遗忘)”(消息),这有可能会导致数据的丢失。要了解更多,可以参考这个网页:http://kafka.apache.org/08/configuration.html。 然后就是定义Producer对象: 这里使用了java的泛型,您需要指明Producer两个参数的类型。第一个参数就是Partition的key的类型,第二个是消息的类型。在这个示例程序中,这两个属性都是String类型。还需要注意这两个属性需要和前面定义的的配置属性“serializer.class”和“partitioner.class”呼应。 现在开始构建要发送的消息: 这里我们伪造了一系列网站访问的IP信息。将消息以逗号分隔,第一部分是事件发生时的时间戳,第二部分是网址,第三部分是请求来源IP。在这里,我们使用java Random类来保证IP地址的最后八位字节不同,以便我们观察Partitioner是怎样工作的。 然后将消息发送到broker: “page_visits”就是将要把消息写入的Topic。这里我们将IP作为partition的key。需要注意,如果不设置partition的key的话,即使已经定义了一个Partitioner类,Kafka仍然会将消息分配给一个随机的partition。 完整的代码如下: 自定义的Partitioner类: 在自定义的Partitioner类中,我们使用IP地址作为key。我们取出IP地址的最后八位字节与Kafka中topic的partition总数进行模运算。这种partition方案的好处就是所有相同IP的访问记录都会被放置到相同的partition。此外,Consumer处理逻辑也要知道怎样对之进行处理。 在运行我们的程序前,需要确认是否创建了名为”page_visits”的Topic。新建topic的指令如下: 就这样. 参考文档 https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+Producer+Example ######

    [阅读更多...]
  • kafka简介

    简介 kafka是一个分布式的、可分区的、可复制的日志提交服务。它提供了消息传递的功能,但是有着独特的设计。 首先,先了解一些基础概念: Kafka将消息源的分类称为topic; 向Kafka的topic发送消息的进程被称为producer; 订阅并消费消息的进程被称为consumer; Kafka运行在由一个或多个服务组成的集群上,其中的每个服务被称为broker。 从整体上来看,producer负责通过网络将消息发送给Kafka集群,Kafka集群将消息提供给consumer,过程如下图: 客户端和服务器之间通过TCP协议进行通信。Kafka提供了java的客户端。但是客户端也可以用多种语言实现。 1. topic和日志 我们先来研究一下Kafka的一个抽象概念——topic。一个topic是一组消息源的分类或订阅名称。针对每个topic,Kafka集群都维护了一组分区日志(partition)。分区日志即是一个topic的一个partition对应的所有segment文件。如下图: 每个topic都是由一系列有序的,不可变的消息组成。这些消息被不断的追加到分区中。分区中的每个消息都被分配了一个序列ID号。这个序列ID号被称为offset,可以作为在分区中消息的唯一标识。 在一个可配置的时间段内,Kafka集群会保存所有已发布的消息——不管它们是否已被消费。比如说如果日志的保留期被设置为两天,那么在消息发布后的两天内它都是可消费的。两天后,消息会被清除以释放空间。数据规模大小并不影响Kafka的性能,保存大量的数据对Kafka来说并不是个问题。 实际上,每个consumer唯一需要维护的数据就是consumer在日志中消费到的位置,也就是offset。offset是由consumer来维护的。一般情况下,consumer每读取一条消息,offset的值就向前增加一次。但是事实上consumer可以自由控制offset信息,它可以按任意的顺序读取消息。比如,consumer可以将offset的值重置为一个比较旧的位置来进行重新处理。 这些特性意味着kafka的consumer可以非常灵活——它们可以随意进出而不影响集群或者其它的consumer。举例说,用户可以在命令行中使用tail命令来追踪任意topic的内容,而不用担心影响正在访问topic的consumer。 对日志进行分区有这样几点作用: 首先,可以使日志的规模保持在一个指定的范围内,以能保存到单独的服务上。每个独立的分区必须能够适配它所在的服务节点。一个topic可以有多个分区,从而可以承载任意规模的数据。 第二,可以作为并行处理的单元(这点稍后讨论)。 2. 分布式 日志的分区分布式地保存在kafka集群的服务节点上。这样每个服务节点都可以请求数据管理数据。每个分区在一定数量的服务节点上都有副本以进行容错,并且这个数量是可配置的。 对于每个分区,都有一个服务节点扮演着leader的角色,其他的零个或者多个服务扮演着follower的角色。leader负责处理分区的读写请求,而follower只是被动地维护leader的副本。一旦leader出现了故障,一个follower就会自动成为新的leader。每个服务节点都可能同时扮演leader和follower两种角色。即一个服务节点可能是它上面一些分区的leader,同时也可能是其他分区的follower。就是这样kafka在集群内部实现了负载均衡。 3. Producer producer负责发布数据到其所选择的topic中,并选择将数据分配给topic的哪个分区。分区选择可以简单地由负载均衡以轮流制的方式实现,也可以通过一些特定的分区函数(取决于消息中的一些key)实现。通常使用的是第二种方式。 4. Comsumer 消息发布通常有两种模式:队列模式和发布订阅模式。在队列模式中,多个consumer(这里有个consumer pool的概念)从一个服务读取消息,每个消息只能被其中的一个consumer读到。在发布订阅模式中,消息会广播给所有的consumer。kafka提供了一个抽象的consumer概念即consumer group,从而同时实现了这两种方案。 每个consumer可以加入一个consumer group。consumer group订阅发布到topic的消息,而后将消息传送给其下面的一个consumer实例。所有的consumer实例可以运行在不同的进程上,也可以在不同的独立的机器上。 如果所有的consumer实例属于同一个consumer group,这样的工作模式在consumer中实现了负载均衡,类似常规的消息队列模式。 如果所有的consumer实例分别属于不同的consumer group,每个消息会被广播给所有的consumer,这样的模式类似于发布订阅模式。 不过更常见的是,每个topic都会有若干consumer group,一个consumer group就是一个逻辑上的“订阅者”。每个consumer group都是由多个consumer实例组成,从而获得了更好的稳定性和容错能力。这其实也是一个发布订阅模式,只不过订阅者不只是一个进程,而是由多个consumer组成的集群。 上图展示了一个kafka集群,其中包含两台服务器、四个分区(P0~P3)、以及两个consumer group。其中group A有两个consumer实例,B有四个。 相比常规的消息系统,kafka可以更好的保证有序性。 常规的队列模式在服务器上有序的保存消息。当多个consumer需要从队列中获取消息的时候,服务器会按照消息保存的顺序将消息取出。然而,尽管服务器是按顺序将消息取出的,消息却是异步地被分发至consumer的,当消息到达不同的consumer的时候可能已经失去了顺序。 这意味着在并发消费时将导致消息的顺序错乱。为了解决这个问题,消息系统通常会采用“exclusive consumer(独有消费)”的概念,即只允许一个进程从队列读取消息,当然这就意味着失去了并发性。 kafka提供了一个很好的解决方案。通过topic内部的分区概念,在处理多个consumer并发时,kafka可以实现有序性和负载均衡。 kafka将topic的每个分区只分发给一个consumer group,  这样一个分区就只能被这个group中的一个consumer消费,并且可以保证消费的顺序性。因为有很多分区,所以仍然可以在多个consumer实例中实现负载均衡。请注意consumer实例的数量不应该比分区的数量多。 kafka只是在一个分区内提供了消息的有序性,而在一个topic不同的分区间是不能保证有序性的。然而对于大部分的应用程序来说这已经足够了。如果要保证topic中所有消息的有序性,那么只能让一个topic中只有一个分区,当然这也意味着只能有一个消费者进程。 5. 保证 在某个层面上 ,kafka提供了如下保证: kafka会保证producer发给topic指定分区的消息按照发送顺序追加到日志中。即是说,如果消息M1和M2先后被同一个producer发送,如果M1先发送,那么在分区日志中M1的offset要比M2小,并且更早出现在分区日志中; kafka可以保证consumer消费消息的顺序和消息在分区日志中存储的顺序一致; 如果一个topic的“replication factor”是N,那么Kafka可以保证在N-1个服务器失效后仍不丢失提交给分区的消息。 更多细节可以在文档的“设计原理”这一章查看。 使用场景 这里列出了Kafka的几种常见使用场景。如果想对kafka在实战中的使用有个概括的了解,可以看一下这篇博客。 1. 消息处理 kafka可以作为常规的MessageBroker。MessageBroker有多种用途(将数据从producer方解耦,缓存未处理的消息等等)。相比大多数消息系统,kafka有更好的吞吐能力、内置的分区机制、冗余备份机制、以及不错的容错能力,因此kafka适用于大部分消息处理需求。 在我们的经验中,一般消息处理的吞吐量相对不大,但是要求较低的端到端延迟,并且需要kafka提供的良好的稳定性保障。 在消息处理这个领域,kafka可以比得上常规的消息系统,比如ActiveMQ和RabbitMQ。 2. 网站活动跟踪 kafka最早的用处就是作为一个实时发布订阅系统来构建用户行为追踪管道。这意味着网站上的活动(网页浏览、搜索以及其他用户行为)都会被发布到中央topic集合,每个topic代表了一种行为类型。收集的这些数据可以在订阅后支持多种用途比如实时处理、实时监控、加载到hadoop系统或者离线数据仓库系统作离线处理或者生成报表。 活动追踪通常需要很大的空间容量,因为用户每访问一个网页都会产生大量的活动数据。 3. Metrics(运营评估) kafka还经常会被用来处理运营监控数据。这涉及到聚合多个分布式应用上的数据来生成运营分析数据。 4. 日志聚合 很多人使用kafka作为日志聚合解决方案。日志聚合通常就是从各个服务器收集物理日志文件并将之集中到一个地方(比如文件服务器或者HDFS)去做处理。kafka并不关注文件的细节,它提供了一个更简洁的消息数据流的抽象概念来处理日志或者事件数据。这就实现了低延迟处理、简单的多数据源支持以及分布式数据消费。相比一些日志收集系统比如Scribe或者Flume,kafka有着不错的性能,更低的端到端延迟,并通过冗余复制提供了更健壮的持久性保障。 5. 流处理 用户经常需要对数据做一些阶段性的处理:从topic获取原始数据,而后经过汇总、丰富或者以其他方式生成新的topic为进一步的数据消费做准备。举个例子:一个文章推荐工作流程首先需要从RSS订阅源抓取文章内容并发布为一个名为“article”的topic;而后对从“article”中获取的所有文章进行整理并作去重处理后发布为新的topic,最后的工作就是尝试进行内容匹配并推荐给用户。这里描述了一个实时数据处理流程图。Storm 和 Samza 是两个用来做这些工作的常见框架。 6. Event Sourcing(事件溯源) 事件溯源是一种软件设计模式。在这种模式下,每一次数据状态的变化都会被记录到一个时间顺序的记录中。kafka支持海量数据的特性为这种设计模式提供了一个不错的选择。 7. Commit Log 对于分布式系统,kafka可以被用于提供额外的日志提交服务。日志可以被用来在节点间复制数据,也可以作为重新同步机制以在节点故障后恢复数据。kafka的日志压缩特性支持了这方面的应用。在这一点上,kafka和Apache BookKeeper项目有些类似。 三. 生态圈 在kafka的主项目外,还有大量的支持工具。在ecosystem page上,列出了这些工具,包括流处理系统、hadoop聚合接口、监控工具、部署工具等。

    [阅读更多...]