Spark快速入门

这是整理的Spark官网的《QuickStart》教程。在这篇教程里我们会先使用shell初步接触一下spark,然后再编写一个spark应用。我这里会优先使用scala来完成这些工作。如果想使用python或者java请直接移步原文

shell操作

基础

sprak shell可以让我们快速的熟悉相关的API,同时它也是一个强大的交互式数据分析工具。目前spark shell只支持scala和python两种语言,这里我们只使用scala。在spark根目录下执行如下语句:

./bin/spark-shell

spark最重要的抽象是一个分布式数据集合,叫做RDD( Resilient Distributed Dataset,弹性分布式数据集)。可以从hadoop的InputFormats(比如HDFS文件)中创建RDD,也可以转译其他的RDD为新的RDD。现在我们使用spark根目录下的README文件中的文本创建一个新的RDD:

scala> val textFile = sc.textFile("README.md")
textFile: org.apache.spark.rdd.RDD[String] = README.md MapPartitionsRDD[1] at textFile at <console>:24

RDD有两大类数据操作(也有人称之为算子,算子是RDD中定义的函数,可以对RDD中的数据进行转换和操作):分别是action(行为)和transformation(转换)。其中action返回的是值,transformation返回的是指向新的RDD的指针。

我们先从几个action开始:

scala> textFile.count()
res1: Long = 99                                                                 

scala> textFile.first()
res2: String = # Apache Spark

然后我们体验一下transformation,这里我们将会使用filter来返回README.md文件中所有包含“Spark”字样的行作为一个新的RDD。

scala> val linesWithSpark = textFile.filter(line => line.contains("Spark"))
linesWithSpark: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[2] at filter at <console>:26

我们也可以将transformation和action连起来用:

scala> textFile.filter(line => line.contains("Spark")).count() // How many lines contain "Spark"?
res3: Long = 19

关于RDD的更多操作

RDD的action和transformation还可以用来做一些比较复杂的计算。比如说我们想找出单词最多的一行:

scala> textFile.map(line => line.split(" ").size).reduce((a, b) => if (a > b) a else b)
res0: Int = 22

前面的map运算将计算出每行单词的个数,创建了一个新的RDD。然后在这个RDD上调用reduce方法来找出单词数最多的一个行。map和reduce的参数都是scala的函数值(闭包),可以使用任何语言特性或者是java或scala的库。举个例子说:我们可以简单调用其他地方声明的函数。下面我们使用Math.max()来让代码更容易理解些:

scala> textFile.map(line => line.split(" ").size).reduce((a, b) => Math.max(a, b))
res5: Int = 22

一个常见的数据流模型是MapReduce,也是因Hadoop而流行起来的。Spark可以很容易地实现MapReduce流:

scala> val wordCounts = textFile.flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey((a, b) => a + b)
wordCounts: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[8] at reduceByKey at <console>:26

这里我们结合了flatMap、map和reduceByKey三种transformation来计算文件中每个单词出现的次数,并生成了一个类型为(String, int)对的RDD。要获取RDD中每个单词出现的次数可以使用collect这个action:

scala> wordCounts.collect()
res6: Array[(String, Int)] = Array((package,1), (this,1), (Version"](http://spark.apache.org/docs/latest/building-spark.html#specifying-the-hadoop-version),1), (Because,1), (Python,2), (cluster.,1), (its,1), ([run,1), (general,2), (have,1), (pre-built,1), (YARN,,1), (locally,2), (changed,1), (locally.,1), (sc.parallelize(1,1), (only,1), (Configuration,1), (This,2), (basic,1), (first,1), (learning,,1), ([Eclipse](https://cwiki.apache.org/confluence/display/SPARK/Useful+Developer+Tools#UsefulDeveloperTools-Eclipse),1), (documentation,3), (graph,1), (Hive,2), (several,1), (["Specifying,1), ("yarn",1), (page](http://spark.apache.org/documentation.html),1), ([params]`.,1), ([project,2), (prefer,1), (SparkPi,2), (<http://spark.apache.org/>,1), (engine,1), (version,1), (file,1), (documentation...

Caching

Spark也支持从集群范围的内存缓存中获取数据。这在需要重复访问数据时是很有用的,比如需要频繁重复查询一个的小型数据集时,或者进行类似PageRank这样的迭代运算时。下面我们演示下如何缓存并使用之前获取的linesWithSpark数据集:

scala>  val linesWithSpark = textFile.filter(line => line.contains("Spark"))
linesWithSpark: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[9] at filter at <console>:26

scala> linesWithSpark.cache()
res8: linesWithSpark.type = MapPartitionsRDD[9] at filter at <console>:26

scala> linesWithSpark.count()
res9: Long = 19

scala> linesWithSpark.count()
res10: Long = 19

使用Spark来分析和缓存一个只有100行的文本文件看起来有点儿傻。真正重要的是:同样的函数也可以用在非常巨大的数据集上,即使是在由数十个或者在数百个节点的集群上也可以使用。可以按照这里的教程体验一下如何在集群上使用Spark的spark-shell。

自定义应用

这里我们会使用Spark API来编写一个自定义应用。示例程序使用Scala(使用了sbt)编写。这个应用会非常简单,实际上,应用名就叫做SimpleApp.scala:

import org.apache.spark.SparkContext
import org.apache.spark.SparkConf

object SimpleApp {
  def main(args: Array[String]) {
    val logFile = "YOUR_SPARK_HOME/README.md" // Should be some file on your system
    val conf = new SparkConf().setAppName("Simple Application")
    val sc = new SparkContext(conf)
    val logData = sc.textFile(logFile, 2).cache()
    val numAs = logData.filter(line => line.contains("a")).count()
    val numBs = logData.filter(line => line.contains("b")).count()
    println("Lines with a: %s, Lines with b: %s".format(numAs, numBs))
  }
}

注意:这里应该定义一个main()方法,不要使用继承scala.App这样的方式。scala.App的子类可能不会正常工作。

这个程序只是分别统计了Spark README文件中包含单词‘a’和单词‘b’的行的总数。还得记得需要将程序中的YOUR_SPARK_HOME替换为你计算机上Spark程序安装的位置。

不像前面说的那些使用Spark shell的例子,它们使用的是Spark shell的SparkContext实例,我们初始化了一个SparkContext实例作为程序的一部分。

这里我们先定义了一个SparkConf对象,在这个对象里包含我们的应用的一些信息。然后我们将这个SparkConf对象传递给SparkContext构造器。

我们的应用需要依赖Spark API,所以我们还需要添加一个sbt配置文件:simple.sbt。在这文件中添加了Spark依赖:

name := "Simple Project"

version := "1.0"

scalaVersion := "2.11.7"

libraryDependencies += "org.apache.spark" %% "spark-core" % "2.0.0"

为了让sbt正确工作,我们需要按照正确的目录结构放置SimpleApp.scala和simple.sbt这两个文件。当一切都安排妥当后,我么可以创建一个包含应用代码的JAR包,然后使用spark-submit脚本来运行我们的程序:

# Your directory layout should look like this
$ find .
.
./simple.sbt
./src
./src/main
./src/main/scala
./src/main/scala/SimpleApp.scala

# Package a jar containing your application
$ sbt package
...
[info] Packaging {..}/{..}/target/scala-2.11/simple-project_2.11-1.0.jar

# Use spark-submit to run your application
$ YOUR_SPARK_HOME/bin/spark-submit \
  --class "SimpleApp" \
  --master local[4] \
  target/scala-2.11/simple-project_2.11-1.0.jar
...
Lines with a: 46, Lines with b: 23	

参考文档

spark算子的作用:http://www.jianshu.com/p/4ff6afbbafe4

##########

发表评论

这个站点使用 Akismet 来减少垃圾评论。了解你的评论数据如何被处理