最近项目中用到了Kafka0.9,在使用0.9的Consumer API的时候遇到了poll()方法阻塞的问题。程序没有报任何错误,只是持续在poll()方法处阻塞。深入poll()方法可以看到是在AbstractCoordinator.ensureCoordinatorKnown()方法中出现了死循环。在循环中不停地输出如下DEBUG日志: 需要关注的是这处信息: 看样子是kafka的连接出了问题。不过我的Producer向kafka写数据是没问题的,使用kafka提供的消费工具kafka-console-consumer.sh执行消费也是没问题的。 在网上找到了一些关于这个现象的解释:在客户端进行消费之前会为ConsumerGroup向Kafka集群申请coordinater节点。kafka集群在配置或分配coordinater节点的时候可能会短暂的报这个错误。 我这里不是短暂的报错,而是陷入了死循环。目前可以想到的就是我的kafka集群配置出现问题了。在简单粗暴地将zookeeper上kafka的配置完全删掉再重启Kafka后,消费可以正常执行了。至于问题具体出在哪儿还没有找到。目前只能是持续关注,等问题再次出现了。 就这样。 还有一点,在查询解决方案的过程中看到:如果kafka是部署在Docker上,出现了这样的问题需要检查有没有配置环境变量ADVERTISE.HOST.NAME和ADVERTISE.HOST.PORT。 ######
[阅读更多...]-
kafka0.9 Consumer poll()方法阻塞
-
获取Kafka Consumer的offset
从kafka的0.8.11版本开始,它会将consumer的offset提交给ZooKeeper。然而当offset的数量(consumer数量 * partition的数量)的很多的时候,ZooKeeper的适应性就可能会出现不足。幸运的是,Kafka现在提供了一种理想的机制来存储Consumer的offset。Kafka现在是将Consumer的offset写入到一个分布式持久化的、高可用的topic中。开发者可以通过消费这个topic的方式来获取Consumer的offset。为了提升访问速度,kafka还提供了offset的内存缓存。也就是说,现在提交offset是通过普通的生产者请求(代价并不高)来完成的,而获取offset则是通过在内存中的快速查询完成的。 Kafka的官方文档描述了这个特性是如何工作的,以及如何将offset从zookeeper迁移到kafka。下面的代码演示了如何利用基于kafka存储offset的特性。 第一步:通过发送consumer元数据请求到任意Broker来发现并连接offset manager: 第二步:发送OffsetCommitRequest 或者 OffsetFetchRequest到offset manager: 原文:https://cwiki.apache.org/confluence/display/KAFKA/Committing+and+fetching+consumer+offsets+in+Kafka ####
[阅读更多...] -
HBase连接异常:KeeperErrorCode = OperationTimeout
手上的一个HBase相关的服务在重启后开始报错(重启前运行良好),错误信息如下: 错误信息提示连接zookeeper时间超长。经检查是因为有一个zookeeper节点已经停止运行(zk3.com),和运维商量后将zookeeper故障节点删除,并移掉了另一个节点,剩下了3个节点。修改配置文件并重启后服务恢复。 #######
[阅读更多...] -
使用HBase Coprocessor
HBase的Coprocessor是模仿谷歌BigTable的Coprocessor模型实现的。 Coprocessor提供了一种机制可以让开发者直接在RegionServer上运行自定义代码来管理数据。 首先必须要指明使用Coprocessor还是存在一些风险的。Coprocessor是HBase的高级功能,本来是只为HBase系统开发人员准备的。因为Coprocessor的代码直接在RegionServer上运行,并直接接触数据,这样就带来了数据破坏的风险,比如“中间人攻击(Man-in-the-MiddleAttack,简称“MITM攻击”,见百度词条)”以及其他类型的恶意入侵。目前还没有任何机制来屏蔽Coprocessor导致的数据破坏。此外,因为没有资源隔离,一个即使不是恶意设计的但表现不佳的Coprocessor也会严重影响集群的性能和稳定性。 通常我们访问HBase的方式是使用scan或get获取数据,使用Filter过滤掉不需要的部分,最后在获取到的数据上进行业务运算。但是在数据量非常大的时候,比如一个有上亿行及十万个列的数据集,再按常用的方式移动获取数据就会在网络层面遇到瓶颈。客户端也需要有强大的计算能力以及足够的内存来处理这么多的数据。此外,这也会使客户端的代码变得庞大而复杂。 这种场景正是Coprocessor可以发挥作用的地方。我们可以将业务运算代码封装到Coprocessor中并在RegionServer上运行,即在数据实际存储位置执行,最后将运算结果返回到客户端。 如下的一些理论可以帮助我们理解Coprocessor是如何发挥作用的: 触发器和存储过程:一个Observer Coprocessor有些类似于关系型数据库中的触发器,通过它我们可以在一些事件(如Get或是Scan)发生前后执行特定的代码。Endpoint Coprocessor则类似于关系型数据库中的存储过程,因为它允许我们在RegionServer上直接对它存储的数据进行运算,而非是在客户端完成运算。 MapReduce:MapReduce的原则就是将运算移动到数据所处的节点。Coprocessor也是按照相同的原则去工作的。 AOP:如果熟悉AOP的概念的话,可以将Coprocessor的执行过程视为在传递请求的过程中对请求进行了拦截,并执行了一些自定义代码。 Coprocessor类型 Coprocessor可以分为两大类:Observer Coprocessors(观察者)和EndPoint Coprocessor(终端)。 Observer Coprocessors Observer Coprocessor在一个特定的事件发生前或发生后触发。在事件发生前触发的Coprocessor需要重写以pre作为前缀的方法,比如prePut。在事件发生后触发的Coprocessor使用方法以post作为前缀,比如postPut。 Observer Coprocessor的使用场景如下: 安全性:在执行Get或Put操作前,通过preGet或prePut方法检查是否允许该操作; 引用完整性约束:HBase并不直接支持关系型数据库中的引用完整性约束概念,即通常所说的外键。但是我们可以使用Coprocessor增强这种约束。比如根据业务需要,我们每次写入user表的同时也要向user_daily_attendance表中插入一条相应的记录,此时我们可以实现一个Coprocessor,在prePut方法中添加相应的代码实现这种业务需求。 二级索引:可以使用Coprocessor来维持一个二级索引。这里暂不展开,有时间会单独说明。 根据作用的对象,Observer Coprocessor有如下几种:RegionObserver、RegionServerObserver、MasterObserver和WalObserver。我们可以通过这些Observer来处理其观察的对象的操作,比如可以通过RegionObserver处理Region相关的事件,如Get和Put操作。 Endpoint Coprocessor Endpoint Coprocessor可以让开发者在数据本地执行运算。一个典型的案例:一个table有几百个Region,需要计算它的运行平均值或者总和。 Observer Coprocessor中代码的执行是相对透明的,而对于Endpoint Coprocessor,则需要显式的调用Table, HTableInterface或者HTable中的CoprocessorService()方法才能使之执行。 从0.96版本开始,HBase开始使用Google的protobuff。这对Endpoint Coprocessor的开发多少有一些影响。Endpoint Coprocessor不应该使用HBase内部成员,尽量只使用公共的API,最理想的情况应该是只依赖接口和数据结构。这样可以使开发的Endpoint Coprocessor更加健壮,不会受到HBase内核演进的干扰。注释为private或evolving的HBase内部API在删除前不必遵守关于deprecate的语义版本规则或相关的一般java规则。而使用protobuff生成的文件不会受到这些注释的影响,因为这些文件是用protoc工具自动生成的。在生成时这些文件时,protoc不知道也不会考虑HBase是如何工作的。 装载和卸载Coprocessor 要使用Coprocessor,就需要先完成对其的装载。这可以静态实现(通过HBase配置文件),也可以动态完成(通过shell或Java API)。 静态装载和卸载Coprocessor 按以下如下步骤可以静态装载自定义的Coprocessor。需要注意的是,如果一个Coprocessor是静态装载的,要卸载它就需要重启HBase。 静态装载步骤如下: 1. 在hbase-site.xml中使用<property>标签定义一个Coprocessor。<property>的子元素<name>的值只能从下面三个中选一个: hbase.coprocessor.region.classes 对应 RegionObservers和Endpoints; hbase.coprocessor.wal.classes 对应 WALObservers; hbase.coprocessor.master.classes 对应MasterObservers。 而<value>标签的内容则是自定义Coprocessor的全限定类名。 下面演示了如何装载一个自定义Coprocessor(这里是在SumEndPoint.java中实现的),需要在每个RegionServer的hbase-site.xml中创建如下的记录: 如果要装载多个类,类名需要以逗号分隔。HBase会使用默认的类加载器加载配置中的这些类,因此需要将相应的jar文件上传到HBase服务端的类路径下。 使用这种方式加载的Coprocessor将会作用在HBase所有表的全部Region上,因此这样加载的Coprocessor又被称为系统Coprocessor。在Coprocessor列表中第一个Coprocessor的优先级值为Coprocessor.Priority.SYSTEM,其后的每个Coprocessor的值将会按序加一(这意味着优先级会减降低,因为优先级是按整数的自然顺序降序排列的)。 当调用配置的Observer Coprocessor时,HBase将会按照优先级顺序依次调用它们的回调方法。 2. 将代码放到HBase的类路径下。一个简单的方法是将封装好的jar(包括代码和依赖)放到HBase安装路径下的/lib目录中。 3. 重启HBase。 静态卸载的步骤如下: 1. 移除在hbase-site.xml中的配置。 2. 重启HBase。 3. 这一步是可选的,将上传到HBase类路径下的jar包移除。 动态装载Coprocessor 动态装载Coprocessor的一个优势就是不需要重启HBase。不过动态装载的Coprocessor只是针对某个表有效。因此,动态装载的Coprocessor又被称为表级Coprocessor。 此外,动态装载Coprocessor是对表的一次schema级别的调整,因此在动态装载Coprocessor时,目标表需要离线。 动态装载Coprocessor有两种方式:通过HBase Shell和通过Java API。 在下面介绍关于动态装载的部分,假设已经封装好了一个coprocessor.jar的包,里面包含实现代码及所有的依赖,并且已经将这个jar上传到了HDFS中。 通过HBase Shell动态装载和卸载 装载步骤如下 1. 在HBase Shell中disable 掉目标表 2. 使用类似如下的命令加载Coprocessor 简单解释下这个命令。这条命令在一个表的table_att中添加了一个新的属性“Coprocessor”。使用的时候Coprocessor会尝试从这个表的table_attr中读取这个属性的信息。这个属性的值用管道符“|”分成了四部分: 文件路径:文件路径中需要包含Coprocessor的实现,并且对所有的RegionServer都是可达的。这个路径可以是每个RegionServer的本地磁盘路径,也可以是HDFS上的一个路径。通常建议是将Coprocessor实现存储到HDFS。HBASE-14548允许使用一个路径中包含的所有的jar,或者是在路径中使用通配符来指定某些jar,比如:hdfs://<namenode>:<port>/user/<hadoop-user>/ 或者 hdfs://<namenode>:<port>/user/<hadoop-user>/*.jar。需要注意的是如果是用路径来指定要加载的Coprocessor,这个路径下的所有jar文件都会被加载,不过该路径下的子目录中的jar不会被加载。另外,如果要用路径指定Coprocessor时,就不要再使用通配符了。这些特性在Java API中也得到了支持。 类名:Coprocessor的全限定类名。 优先级:一个整数。HBase将会使用优先级来决定在同一个位置配置的所有Observer Coprocessor的执行顺序。这个位置可以留白,这样HBase将会分配一个默认的优先级。 参数(可选的):这些值会被传递给要使用的Coprocessor实现。这个项是可选的。 3. enable这个表 4. 检验Coprocessor是否被加载 Coprocessor可以在TABLE_ATTRIBUTES中找到。 加载步骤就是这样。 卸载步骤如下 1. disbale目标表 2. 使用alter命令移除掉Coprocessor 3. enable目标表 使用Java API动态装载和卸载 装载方式如下 针对不同版本的HBase会有不同的JavaAPI。幸运的是有一个全版本的Java API。下面的代码演示了是如何使用Java API来装载Coprocessor的: 0.96及更高版本的HBase还有另一套API。在这套API里,HTableDescriptor的addCoprocessor()方法提供了一种更简单的方式来动态加载Coprocessor: 卸载方式如下: 卸载方式就是重新加载表定义信息。重新加载的时候就不需要再使用setValue()方法或者是addCoprocessor()方法设置表的Coprocessor信息了: 对于0.96及更高版本的HBase,可以使用HTableDescriptor类的removeCoprocessor()方法。 Coprocessor示例程序
[阅读更多...] -
使用BufferedMutator
org.apache.hadoop.hbase.client.BufferedMutator主要用来对HBase的单个表进行操作。它和Put类的作用差不多,但是主要用来实现批量的异步写操作。 BufferedMutator替换了HTable的setAutoFlush(false)的作用。 可以从Connection的实例中获取BufferedMutator的实例。在使用完成后需要调用close()方法关闭连接。对BufferedMutator进行配置需要通过BufferedMutatorParams完成。 MapReduce Job的是BufferedMutator使用的典型场景。MapReduce作业需要批量写入,但是无法找到恰当的点执行flush。BufferedMutator接收MapReduce作业发送来的Put数据后,会根据某些因素(比如接收的Put数据的总量)启发式地执行Batch Put操作,且会异步的提交Batch Put请求,这样MapReduce作业的执行也不会被打断。 BufferedMutator也可以用在一些特殊的情况上。MapReduce作业的每个线程将会拥有一个独立的BufferedMutator对象。一个独立的BufferedMutator也可以用在大容量的在线系统上来执行批量Put操作,但是这时需要注意一些极端情况比如JVM异常或机器故障,此时有可能造成数据丢失。 如下是几个使用BufferedMutator的实例。 实例一: 这段代码演示了创建BufferedMutator的一般过程。如前文所说,BufferedMutator的配置通常通过BufferedMutatorParams完成。获取BufferedMutator实例则是通过Connection对象。在这里还调整了一个writeBuffer的设置,这里等于是覆盖了HBase配置文件中的“hbase.client.write.buffer”设置。 实例二: 这段代码只看前半部分即可。与实例一的不同在于这里新建了一个ExceptionListener接口实现,并添加到BufferedMutatorParams中替换了BufferedMutatorParams的默认ExceptionListener实现。 实例三: 这里是一段BufferedMutator测试代码。 实例四: 实例五: 如有必要,稍后抽时间对代码做些简单说明。 ######
[阅读更多...] -
译文:HBase File Locality in HDFS
Hadoop中一个不明确的内容就是Block复制:它自动完成,通常不需要用户关心。HBase将数据保存到HDFS,并完全相信它的安全性。正是因为HDFS的Block复制对HBase来说是完全透明的,就产生了一个问题:HBase的效率会受到多大的影响?当我们开始写MapReduce作业访问HBase和Hadoop的时候,就难免会想到这个问题。尤为关键的是,当HBase中存储的数据很多的时候,它怎样使数据距离需要的地方更近一些?这就涉及到HBase的数据是怎样在HDFS上存储了。 首先,我们看看Hadoop是如何处理这个问题的。在MapReduce文档中说明了task通常运行在距离其处理的数据比较近的位置。这主要是通过将HDFS中的数据拆分成小块实现的,这些小块的数据被称为Block。HDFS中的Block的大小要比文件系统的Block大得多:默认是64M,但是通常会被设置为128M(这个值还可以设置得更大一些——如果确认所有文件的size都大于单个Block)。每个Block对应一个map task。这也就意味着Block的size设置得越大,Block的数量就越少,需要的map task的数量就越少。Hadoop知道每个Block的存储位置,它会在存储Block的节点上直接运行map task。每个Block都有两到三个副本。事实上,hadoop选择的节点通常是存储Block副本的节点。这样Hadoop保证了MapReduce作业总是在本地处理数据。 现在回到HBase。既然已经知道Hadoop是如何让它的每个map task处理本地数据的,就可以再进一步思考下HBase是如何做到Data Locality的。要是读过关于HBase存储结构的文章的话,就知道HBase是将相关的文件保存在HDFS上。这些文件包括数据文件(HFile)和日志文件(WAL)。在源码里也能看到HBase是调用了FileSystem.create(Path path)方法来创建这些文件的。现在可以想想两种常见的访问模型:1.直接随机访问;2.使用MapReduce扫描全表。我们会很好奇HBase是怎样就近读取HDFS上的Block,从而提升这两种访问方式的效率的。 话说回来,如果Hadoop和Hbase没有在同一个集群上,而是分隔开的,就不要再想Data Locality了——这根本不可能做到。这就如同运行一个独立的MapReduce集群,而它根本无法在DataNode上执行task。所以要实现Data Locality就必须让相关的服务在一个同一个集群上,包括Hadoop(或者说是HDFS)、MapReduce和HBase。就是这样。 好了,是不是弄清了所有的服务都在同一个集群(希望是一个比较大的集群)上?弄清了,那就继续。在HBase访问数据的时候,Hadoop是怎样找到数据在哪儿呢?这样又回到了前面提到的两种访问模型上。这涉及到一个概念:RegionServer。不管是直接随机访问还是扫描全表,都是通过相同的API实现的。正如之前所提到的,HBase仅仅是执行文件的保存,而文件的分布和Block的复制是通过HDFS的DatNode来实现的。假设一个场景:在向HBase中存储了大量的数据后停止HBase服务并连续地对其进行重启。RegionServer在重启后会被分配随机数量的Regions。在这个时候,HBase的Data Locality是无法保证的。为什么呢? 最重要的是HBase不会频繁重启,并且会定期进行内部维护。随着时间的推移,数据不停地增加,HBase会执行compact来重写文件。由于各种原因,文件一旦写入HDFS就是不可变的。因此,数据会不停地写入新的文件。随着数据文件越来越多,HBase会将这些数据文件压缩(compact)合并成另一组新的文件。这里是最让人惊奇的地方:HDFS足够聪明,它知道将数据放到被需要的地方。这是怎么做到的呢?我们需要深入Hadoop的源码来看看HBase调用的FileSystem.create(Path path)方法是怎么工作的。因为我们选择的HBase存储方案是HDFS,所以我们实际上调用的是DistributedFileSystem.create(Path path)方法,该方法的代码如下: 这个方法返回了一个FSDataOutputStream类型的实例。create这个实例的方法如下: 这个方法中使用了一个DFSClient实例作为纽带来连接client和NameNode: 最终返回的是一个DFSClient.DFSOutputStream实例。数据不停地写入到DFSOutputStream,DFSClient会将之收集起来打成包作为一个Block写入到DataNode。这个过程是由DFSClient.DFSOutputStream.DataStreamer实现的,它以守护进程的形式在后台运行。接下来我们一步一步的推断出这个过程具体是怎么实现的。首先我们看一下DataStreamer后台线程的run()方法,它获取了存储数据的DataNode的列表: 而这个方法又调用了如下的代码: 接下来看看locateFollowingBlocks()又调用了哪个方法: 这里就是关键了。这里使用namenode对象添加了一个新的Block,方法中使用的src参数表示要写入的文件,clientName表示DFSClient实例的名称。接下来跳过部分不太重要的内容,直接看看稍后一些关键的步骤: 最后也是最为核心的代码,就是replicator.chooseTarget() 方法的详情了: 接回上面的内容,我们已经启动了一个DFSClient实例并创建了一个写满数据的序列化文件。当要将数据文件存储为Block的时候,上面的代码首先检查是否可以将Block保存在client所在的本地主机(代码中的“writer”)——这是程序中“case 0”的内容。而“case 1”的部分表示程序尝试在另一个机架上保存一个远程备份。后面“case 2”优先选择一个与“case 0”中相同的机架,然后才考虑选择不同的机架。如果还需要更多的备份,则会随机选择一个机架的任意节点保存。 这就表示,在完成一次对所有表的major compaction(可以是手动触发也可以在配置文件中配置)以后,RegionServer运行的时间越长,它将数据保存在本地节点的概率就越大。和RegionServer在相同物理主机上的DataNode会保留一份这个RegionServer需要的所有数据的拷贝。这样可以保证在执行scan、get或其他任何操作时有最好的性能。 最后,要想全面地了解HDFS的设计和数据冗余备份的内容请移步Hadoop官网:HDFS Architecture。也请注意到HBase团队一直在重新设计HMaster分配Region给RegionServer的方案。新的方案将会把Regions分配给拥有Block最多的RegionServer。这调整对改善HBase重启后的DataLocality特别有用。 原文: HBase File Locality in HDFS:http://www.larsgeorge.com/2010/05/hbase-file-locality-in-hdfs.html #####
[阅读更多...] -
HBase Region Locality
因为DataNode和RegionServer通常会部署在相同的机器上,所以会产生Locality这样的概念。 HBase的Locality是通过HDFS的Block复制实现的。在复制Block时,HBase是这样选择副本的位置的: 第一个副本写到本地节点上; 第二个副本写到另一个机架的随机节点上; 第三个副本写到相同机架的一个随机选择的其他节点上; 如果还有更多的副本,这些副本将会写到集群上的随机节点上。 就是这样,在flush或compact后,HBase的Region实现了Locality。 当一个RegionServer处在failover的情况下(rebalance或重启)时,可能会分配到一些没有本地StoreFiles的Region(因为此时没有可用的本地副本)。然而,有新数据再写入这些Region的时候,或者是对表进行compact的时候,StoreFiles将会被重写,这些Region也会再次变成RegionServer的“local”Region。 有一个相关的指标“data locality”,即Region保存在本地的StoreFile的百分比。这个指标影响了major compact的执行。 #########
[阅读更多...] -
使用HBase总结
前段时间我们在项目中使用了HBase,在这里记一下使用经历或者说踩过的坑。 RowKey设计 我们读取数据的方式主要是批量查询,因此在最初的设计中就将大部分查询字段放在了RowKey上,目的是利用RowKey作为索引的特性。 关于RowKey的设计,通常有要求唯一性、散列性以及尽量短。我们在设计RowKey时也尽量地按照这三个原则去做。为了保证唯一性,也想过直接将一个32个字符的原始记录ID(UUID)放到RowKey里,但这样必然会导致行键特别长,所以我们选择使用“查询字段(3个)+服务器标记+机器时间(秒级)+顺序号”这样的行键。为了进一步压缩RowKey,又对“机器时间+顺序号”的数字组合做了32进制的换算。加上为了实现预分区而添加的分区提示符,最终得到的RowKey方案是“分区提示符+查询字段+服务器标记+机器时间(秒级)+顺序号”这样的组合,总长度是31位。 不过这样的设计是存在一个弊端的:因为没有将原始记录ID添加到RowKey里,所以无法在重写数据时保证唯一性,只好在查询完成后再做排重处理。如果直接使用原始记录ID作为RowKey,就无法再利用RowKey作为索引的能力。曾经考虑过再新建一个表专门用来维护RowKey和原始记录ID的关系,也就是作为二级索引。可是这样增加了写入数据的复杂度,后来就放弃了。 另一个问题就是这样设计出来的RowKey仍然是太长了,间接导致了后来的一个非常严重的问题。 再补充一点:假设查询字段为“用户ID+事件时间+标签ID”的组合。因为数据老化的速度比较快,可以考虑将第二个查询字段设置为(Long.MAX_VALUE – 事件时间)。因为RowKey是按字典顺序排序,这样做可以使同一用户的最新记录排在前面。 建表 我们的数据的特点是量非常大,老化速度比较快,需要长期保存。根据这个特点我们进行了按月分表。并在建表的同时做了预分区。 关于预分区,如何选择SplitKey是一个问题。根据一开始的RowKey设计方案无法计算出SplitKey,所以又在RowKey前面添了一个字符作为分区引导符(如1-9,A-Z,a-z)。分区引导符是用原始记录ID的hashcode做求余运算后得到的数值(注意:这样并不能做到很好的散列)。SplitKey自然就是这些分区引导符了。然而很快就发现使用一个字符做分区引导符有点不够用,在运行了一天后,开始出现了新的SplitKey,只好又为分区引导符添加了一个字符。 如果分区数量持续增加到两个字符的分区引导符也不够用的情况,就考虑将两个字符换成一个short型的值(最大为32767),虽然不是可视字符但能省下一些空间。 此外,为了减少空间占用,在建表时对所有列族设置了SNAPPY压缩。 列设计 现在使用的列设计方案是:将数据的字段分成三大类对应三个列族,列族名称是:cf1111,cf222222,cf33333(名称随手写的,但长度是对的)。而后每个字段作为一个qualifier,其中一个表有22个列(qualifier)。列名就是字段的原始名称。 看到这里有经验的朋友应该会指出问题了:列族和列的名称太长了。另外我这里的列也有些多,再加上RowKey的长度,然后想想HFile的结构——最终导致的问题是数据占用的空间膨胀了大约8倍(相对SequenceFile存储),如果再运行一个月就极有可能耗尽HBase的存储空间。 遇到这个问题后,开始重新考虑该如何设计了。我们的需求就是存储日志,并按要求查询导出日志。如果仅从这个需求出发,实在没必要分出这许多列族和列。更彻底一些的做法是:既然将查询条件都放到了RowKey上,存储数据的时候只需要使用单列族单列保存原始的记录即可。只不过这样的做法也太短视了一些。 写数据 为了提升写数据的效率,在写数据时做了如下操作: 关闭了auto flush,设置恰当的WriteBufferSize; 执行批量put; 做了预分区,避免写数据时产生split(split会造成region短暂的停止); 关闭定期major compact的功能,仅在必要的时候使用Java API手动执行major compact; 使用BulkLoad导入历史数据。 查询 为了提升查询效率做了如下事情: 优化RowKey设计,在查询时尽量做到面向行键查询,尽量利用StartRow和StopRow; 在scan时指明要查询的列; 将经常要一起查询的字段放到同一个列族里; 使用恰当的FIlter; 调大查询服务的xmx; 根据需求合理设置ScanCaching,通过这个参数决定client和HBase每次交互的数据量; 关闭BlockCache,我们的查询主要是用来导出数据,不存在反复查询某些记录的情况,所以可以考虑关闭BlockCache。 另外,在查询实现上做了一件极蠢的事情,就是设计实现了一个自定义Filter。不过这个自定义的Filter压根就没有发挥作用,因为不可能为了上传自定义Filter而重启生产环境的HBase。 ——— 好了,现就这样。以后想到了再继续写。 ##########
[阅读更多...] -
HBase Bulk Load
概述 BulkLoad是一种高效写入HBase的方式,适用于将数据批量迁移到HBase。 BulkLoad使用MapReduce作业直接生成HBase的StoreFile,并将生成的StoreFile直接装载入正在运行的HBase集群。较之使用HBase的API,使用BulkLoad耗费的CPU和网络资源都相对较少。 因为BulkLoad绕过了正常写数据的路径(WAL、MemStore、flush),尤其是WAL,通过WAL进行的Cluster Replication就不会处理BulkLoad装载的数据。这很像是调用HBase API时使用了Put.setDurability(SKIP_WAL)。一个解决方式是将原始文件或HFile移到Replication集群上再做其他处理。《Bulk Loaded HFile Replication》对这个问题做了讨论。 步骤 Bulk Load分成两步完成。 通过MapReduce作业准备数据 BulkLoad的第一步是在MapReduce作业中使用HFileOutputFormat2类生成HBase数据文件(StoreFile)。 为了使最终生成的每个HFile都能对应一个Region,需要在MapReduce作业中使用TotalOrderPartitioner类对map的输出结果进行partition,使之与Region的RowKey范围达到一致。幸运的是HFileOutputFormat2类的configureIncrementalLoad()已经做了这个工作,它会根据HBase表中现有的Region边界自动配置TotalOrderPartitioner。 载入数据到HBase集群 在准备好数据文件后,可以在命令行中使用completebulkload工具完成BulkLoad,命令如下: 也可以直接调用LoadIncrementalHFiles实例的doBulkLoad方法完成BulkLoad,: doBulkLoad方法也是completebulkload工具最终调用的方法。不同的是completebulkload工具会检查要写入的表是否存在,不存在的话会主动创建该表。直接调用doBulkLoad方法则需要手动做这些事情。 doBulkLoad方法会遍历MapReduce作业生成的每个数据文件,并决定将其分配给哪一个Region,随后联系接收数据的HRegionServer,将数据移动到HRegionServer上的存储目录,都做完后再通知client数据可用了。 如果在准备数据的时候或者是在装载数据到HBase集群的过程中,Region的边界发生了变化,LoadIncrementalHFiles会自动对数据文件进行split,并发送split后的文件到不同的Region。但是这样会影响导入数据的效率,尤其是在还有其他客户端同时写数据的时候。在执行BulkLoad的时候应当尽量避免这种情况发生。 ###########
[阅读更多...] -
hbase.fs.tmp.dir 导致的错误
在执行BulkLoad的时候报了如下的错误: 使用的HBase版本是1.1.2. 从错误日志中可以看到导致这个问题的是HFileOutputFormat2类中的这一行: 关键是“hbase.fs.tmp.dir”这个配置信息。注意这个配置不是“hbase.tmp.dir”。“hbase.tmp.dir”是本地文件系统上的一个目录,“hbase.fs.tmp.dir”是HDFS上的一个目录。 在hbase-default.xml中找到的对应配置信息如下: 问题在于这个配置对应的目录并不存在,根据一些建议手动设置了下这个配置: 就这样,问题修复了。 下面是一些关于这个问题的讨论: Set default value for hbase.fs.tmp.dir rather than fully depend on hbase-default.xml Use HDFS for HFileOutputFormat2 partitioner’s path ############
[阅读更多...]