Spark内核是由Scala语言开发的,因此使用Scala语言开发Spark应用程序是自然而然的事情。如果你对Scala语言还不太熟悉,可以阅读网络教程A Scala Tutorial for Java Programmers或者相关Scala书籍进行学习。
本文将介绍3个Scala Spark编程实例,分别是WordCount、TopK和SparkJoin,分别代表了Spark的三种典型应用。
1. WordCount编程实例
WordCount是一个最简单的分布式应用实例,主要功能是统计输入目录中所有单词出现的总次数,编写步骤如下:
步骤1:创建一个SparkContext对象,该对象有四个参数:Spark master位置、应用程序名称,Spark安装目录和jar存放位置,对于Spark On YARN而言,最重要的是前两个参数,第一个参数指定为“yarn-standalone”,第二个参数是自定义的字符串,举例如下:
1
2
|
val sc = new SparkContext(args( 0 ), "WordCount" ,
System.getenv( "SPARK_HOME" ), Seq(System.getenv( "SPARK_TEST_JAR" )))
|
步骤2:读取输入数据。我们要从HDFS上读取文本数据,可以使用SparkContext中的textFile函数将输入文件转换为一个RDD,该函数采用的是Hadoop中的TextInputFormat解析输入数据,举例如下:
1
|
val textFile = sc.textFile(args( 1 ))
|
当然,Spark允许你采用任何Hadoop InputFormat,比如二进制输入格式SequenceFileInputFormat,此时你可以使用SparkContext中的hadoopRDD函数,举例如下:
1
2
|
val inputFormatClass = classOf[SequenceFileInputFormat[Text,Text]]
var hadoopRdd = sc.hadoopRDD(conf, inputFormatClass, classOf[Text], classOf[Text])
|
或者直接创建一个HadoopRDD对象:
1
2
|
var hadoopRdd = new HadoopRDD(sc, conf,
classOf[SequenceFileInputFormat[Text,Text, classOf[Text], classOf[Text])
|
步骤3:通过RDD转换算子操作和转换RDD,对于WordCount而言,首先需要从输入数据中每行字符串中解析出单词,然后将相同单词放到一个桶中,最后统计每个桶中每个单词出现的频率,举例如下:
1
2
3
|
val result = hadoopRdd.flatMap{
case (key, value) = > value.toString().split( "\\s+" );
}.map(word = > (word, 1 )). reduceByKey ( _ + _ )
|
其中,flatMap函数可以将一条记录转换成多条记录(一对多关系),map函数将一条记录转换为另一条记录(一对一关系),reduceByKey函数将key相同的数据划分到一个桶中,并以key为单位分组进行计算,这些函数的具体含义可参考:Spark Transformation。
步骤4:将产生的RDD数据集保存到HDFS上。可以使用SparkContext中的saveAsTextFile哈数将数据集保存到HDFS目录下,默认采用Hadoop提供的TextOutputFormat,每条记录以“(key,value)”的形式打印输出,你也可以采用saveAsSequenceFile函数将数据保存为SequenceFile格式等,举例如下:
1
|
result.saveAsSequenceFile(args( 2 ))
|
当然,一般我们写Spark程序时,需要包含以下两个头文件:
1
2
|
import org.apache.spark. _
import SparkContext. _
|
WordCount完整程序已在“Apache Spark学习:利用Eclipse构建Spark集成开发环境”一文中进行了介绍,在次不赘述。
需要注意的是,指定输入输出文件时,需要指定hdfs的URI,比如输入目录是hdfs://hadoop-test/tmp/input,输出目录是hdfs://hadoop-test/tmp/output,其中,“hdfs://hadoop-test”是由Hadoop配置文件core-site.xml中参数fs.default.name指定的,具体替换成你的配置即可。
2. TopK编程实例
TopK程序的任务是对一堆文本进行词频统计,并返回出现频率最高的K个词。如果采用MapReduce实现,则需要编写两个作业:WordCount和TopK,而使用Spark则只需一个作业,其中WordCount部分已由前面实现了,接下来顺着前面的实现,找到Top K个词。注意,本文的实现并不是最优的,有很大改进空间。
步骤1:首先需要对所有词按照词频排序,如下:
1
2
3
|
val sorted = result.map {
case (key, value) = > (value, key); //exchange key and value
}.sortByKey( true , 1 )
|
步骤2:返回前K个:
1
|
val topK = sorted.top(args( 3 ).toInt)
|
步骤3:将K各词打印出来:
1
|
topK.foreach(println) |
注意,对于应用程序标准输出的内容,YARN将保存到Container的stdout日志中。在YARN中,每个Container存在三个日志文件,分别是stdout、stderr和syslog,前两个保存的是标准输出产生的内容,第三个保存的是log4j打印的日志,通常只有第三个日志中有内容。
本程序完整代码、编译好的jar包和运行脚本可以从这里下载。下载之后,按照“Apache Spark学习:利用Eclipse构建Spark集成开发环境”一文操作流程运行即可。
3. SparkJoin编程实例
在推荐领域有一个著名的开放测试集是movielens给的,下载链接是:http://grouplens.org/datasets/movielens/,该测试集包含三个文件,分别是ratings.dat、sers.dat、movies.dat,具体介绍可阅读:README.txt,本节给出的SparkJoin实例则通过连接ratings.dat和movies.dat两个文件得到平均得分超过4.0的电影列表,采用的数据集是:ml-1m。程序代码如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
|
import org.apache.spark. _
import SparkContext. _
object SparkJoin {
def main(args : Array[String]) {
if (args.length ! = 4 ){
println( "usage is org.test.WordCount <master> <rating> <movie> <output>" )
return
}
val sc = new SparkContext(args( 0 ), "WordCount" ,
System.getenv( "SPARK_HOME" ), Seq(System.getenv( "SPARK_TEST_JAR" )))
// Read rating from HDFS file
val textFile = sc.textFile(args( 1 ))
//extract (movieid, rating)
val rating = textFile.map(line = > {
val fileds = line.split( "::" )
(fileds( 1 ).toInt, fileds( 2 ).toDouble)
})
val movieScores = rating
.groupByKey()
.map(data = > {
val avg = data. _ 2 .sum / data. _ 2 .size
(data. _ 1 , avg)
})
// Read movie from HDFS file
val movies = sc.textFile(args( 2 ))
val movieskey = movies.map(line = > {
val fileds = line.split( "::" )
(fileds( 0 ).toInt, fileds( 1 ))
}).keyBy(tup = > tup. _ 1 )
// by join, we get <movie, averageRating, movieName>
val result = movieScores
.keyBy(tup = > tup. _ 1 )
.join(movieskey)
.filter(f = > f. _ 2 . _ 1 . _ 2 > 4.0 )
.map(f = > (f. _ 1 , f. _ 2 . _ 1 . _ 2 , f. _ 2 . _ 2 . _ 2 ))
result.saveAsTextFile(args( 3 ))
}
} |
你可以从这里下载代码、编译好的jar包和运行脚本。
这个程序直接使用Spark编写有些麻烦,可以直接在Shark上编写HQL实现,Shark是基于Spark的类似Hive的交互式查询引擎,具体可参考:Shark。
4. 总结
Spark 程序设计对Scala语言的要求不高,正如Hadoop程序设计对Java语言要求不高一样,只要掌握了最基本的语法就能编写程序,且常见的语法和表达方式是很少的。通常,刚开始仿照官方实例编写程序,包括Scala、Java和Python三种语言实例。
相关推荐
使用IDEA开发spark scala程序,配置idea开发工具,使用hadoop进行文件搜索
spark程序一般使用scala开发,此代码是java开发spark的示例代码。
通过java/python/scala开发spark程序,由浅入深,注重理论,也有代码样例
idea开发spark程序的环境搭建,idea+maven+spark+scala,详细介绍过程的每一步!详细介绍过程的每一步!详细介绍过程的每一步!
随着spark兴起,scala作为spark项目的开发语言也越来越受到大家的关注。这种函数式与面向对象的编程语言极大的简化了代码的编写量,同时它就是为并发式编程而生的。这个打包文件中包含了《SCALA程序设计-JAVA虚拟机...
创建SparkContext WordCount程序
二、 实验目的 掌握使用IntelliJ IDEA开发Spark应用程序。 三、 实验要求 使用IntelliJ IDEA开发本地Spark应用程序。 部署分布式Spark应用程序
Spark集群及开发环境搭建,适合初学者,一步一步并配有截图。 目录 一、 软件及下载 2 二、 集群环境信息 2 ...八、 Scala开发 20 1、插件下载 20 2、插件安装 21 3、scala开发 22 4、程序执行 22
1、资源内容:Scala实现的基于熟悉的XML与SQL实现快速开发基于Spark的大数据ETL程序+源代码+文档说明 2、代码特点:内含运行结果,不会运行可私信,参数化编程、参数可方便更改、代码编程思路清晰、注释明细,都经过...
intellij上的scala插件,intellij上本地开发Scala程序或者是spark程序用到的插件,非常的实用
Apache Spark 是专为大规模数据处理而设计的快速通用的计算引擎。...Spark 由加州大学伯克利分校 AMP 实验室 (Algorithms, Machines, and People Lab) 开发,可用来构建大型的、低延迟的数据分析应用程序。
Apache Spark 是专为大规模数据处理而设计的快速通用的计算引擎。...Spark 由加州大学伯克利分校 AMP 实验室 (Algorithms, Machines, and People Lab) 开发,可用来构建大型的、低延迟的数据分析应用程序。
该项目包含Scala编程语言中的Spark程序。 Spark 2.1涵盖的主题 使用Spark-2.1实现自定义UDF,UDAF,Partitioner 使用数据框(ComplexSchema,DropDuplicates,DatasetConversion,GroupingAndAggregation) 使用...
本节将介绍如何实际动手进行 RDD 的转换与操作,以及如何编写、编译、打包和运行 Spark 应用程序。 启动 Spark Shell Spark 的交互式脚本是一种学习 API 的简单途径,也是分析数据集交互的有力工具。Spark 包含多种...
Sparkour为Spark新手提供了扩展的教程,以及较短的独立配方,可满足Java,Python,R和Scala中常见的开发人员需求。 整个产品都是根据Apache License 2.0授权的。 Sparkour源代码在Apache License 2.0下发布。 可以...
2.2 Spark程序与作业概念映射 55 2.3 Spark作业运行流程 55 3 Spark工作原理 55 3.1 作业调度简介 55 3.2 Application调度 56 3.3 Job调度 56 3.4 Tasks延时调度 56 第七章 Spark运行原理 57 1 Spark运行基本流程 57...
Apache Spark 是专为大规模数据处理而设计的快速通用的计算引擎。...Spark 由加州大学伯克利分校 AMP 实验室 (Algorithms, Machines, and People Lab) 开发,可用来构建大型的、低延迟的数据分析应用程序。
Apache Spark 是专为大规模数据处理而设计的快速通用的计算引擎。...Spark 由加州大学伯克利分校 AMP 实验室 (Algorithms, Machines, and People Lab) 开发,可用来构建大型的、低延迟的数据分析应用程序。
安装ScalaIDE搭建Scala语言开发环境很容易,ScalaIDE官网下载合适的版本并解压就可以完成安装,本文使用的版本是4.1.0安装Scala语言包如果下载的ScalaIDE自带的Scala语言包与Spark1.3.1使用的Scala版本(2.10.x)不...
这是用于开发Apache Spark应用程序的基础图像。 Docker集线器 使用以下方法可以轻松获取最新图像: docker pull enriquegrodrigo/docker-sparkdev:latest 建立形象 要构建图像: git clone ...