在Spark上通过自定义RDD访问HBase

这里介绍一个在Spark上使用自定义RDD获取HBase数据的方案。

这个方案的基础是我们的HBase表的行键设计。行键设计大概是这样子的:标签ID+时间戳+随机码。平时的需求主要是导出指定标签在某个时间范围内的全部记录。根据需求和行键设计确定下实现的大方向:使用行键中的时间戳进行partition并界定startRow和stopRow来缩小查询范围,使用HBase API创建RDD获取数据,在获取的数据的基础上使用SparkSQL来执行灵活查询。

创建Partition

这里我们要自定义的RDD主要的功能就是获取源数据,所以需要自定义实现Partition类:

前面我们说过,主要是依赖行键中的时间戳来进行partition的,所以在自定义的QueryPartition类中保留了两个长整型构造参数start和stop来表示起止时间。剩下的构造参数idx作用是标记分区的索引。

这样,自定义RDD中的getPartitions()方法该如何实现也就很清楚了:

在上面代码中的第六行可以看到getPartitions()方法是按每小时一个区间进行partition的。

代码中的unit是一个查询单元,封装了一些必要的查询参数,包括存储数据的表、要查询的标签ID以及起止时间。大致是这样的:

注意,QueryUnit这个类需要实现Serializable接口。

查询HBase

因为要实现灵活查询的需求,所以需要将HBase表中符合需求的数据的所有列都取出来。我们可以考虑使用List[Map[String, String]]这样一种结构来暂时保存数据。根据这个需求实现的自定义HBaseClient的代码如下:

在代码中使用了typesafe的Config类来记录一些信息,比如zookeeper连接信息。

查询HBase这块儿没什么好说的。继续好了。

自定义RDD

前面的两节为自定义实现RDD类做了一些铺垫,包括进行partition的方式,以及一个查询hbase的工具类HBaseClient。实际上我们已经完成实现了自定义RDD的一个抽象方法getPartitions。

自定义RDD需要继承Spark的一个抽象类RDD。

在继承抽象类RDD的同时,需要为它提供两个构造参数,一个是SparkContext实例,一个是父RDD列表。我们要自定义的RDD的是用来获取源数据的,没有父RDD,所以父RDD列表可以直接设置为Nil。

抽象类RDD还有两个抽象方法需要实现,分别是getPartitions()和compute()。getPartitions()方法用来对原始的任务进行分片,可以将原始任务切割成不同的partition,以便进行分布式处理。compute()方法则是实现了对切割出的partition进行处理的逻辑。

getPartitions()方法的实现前面已经提过了,现在看看compute()方法的实现:

可以看到,compute()方法就是在getPartitions()方法创建的时间区间QueryPartition上对HBase中的表进行查询,并将查询出的结果封装成json字符串列表。

编写驱动类

至此,工作已经完成了大半,可以看看驱动类是怎么写的了:

驱动类的任务是在创建的QueryRDD上使用SparkSQL执行查询,并将查询结果保存到HDFS上。

代码中的SparkUtil只是将经常使用的初始化SparkContext以及执行Spark任务的行为封装了一下,是这样实现的:

好了,就这样!!

已有2条评论 发表评论

  1. prince /

    ConfigFactory类怎么配置的啊,help!

    1. robin / 本文作者

      HBase自带的类

发表评论