$ spark-shell |
创建简单RDD
scala> val inputfile = sc.textFile(“input.txt”) |
inputfile: org.apache.spark.rdd.RDD[String] = input.txt MappedRDD[1] at textFile at <console>:12 |
RDD转换返回指向新的RDD,并允许创建RDDS之间的依赖关系。 在依赖关系链中的每个RDD(依赖关系的字串)具有这样的功能,用于计算其数据并具有一个指针(依赖性)到其父RDD。
Spark是懒惰的,所以什么都不会被执行,除非调用一些改造或行动将触发作业创建和执行。看单词计数示例,如下面的代码片段。
S.No |
转换&含义
|
---|---|
1 |
map(func) 返回一个新的分布式数据集,传递源的每个元素形成通过一个函数 func |
2 |
filter(func)
返回由选择在func返回true,源元素组成了一个新的数据集
|
3 |
flatMap(func)
类似映射,但每个输入项目可以被映射到0以上输出项目(所以func应返回seq而不是单一的项目)
|
4 |
mapPartitions(func) 类似映射,只不过是单独的每个分区(块)上运行RDD,因此 func 的类型必须是Iterator<T> ⇒ Iterator<U> 对类型T在RDD上运行时 |
5 |
mapPartitionsWithIndex(func) 类似映射分区,而且还提供func 来表示分区的索引的整数值,因此 func 必须是类型 (Int, Iterator<T>) ⇒ Iterator<U> 当类型T在RDD上运行时 |
6 |
sample(withReplacement, fraction, seed)
采样数据的一小部分,有或没有更换,利用给定的随机数发生器的种子
|
7 |
union(otherDataset)
返回一个新的数据集,其中包含源数据和参数元素的结合
|
8 |
intersection(otherDataset)
返回包含在源数据和参数元素的新RDD交集
|
9 |
distinct([numTasks])
返回一个新的数据集包含源数据集的不同元素
|
10 |
groupByKey([numTasks]) 当调用(K,V)数据集,返回(K, Iterable<V>) 对数据集 |
11 |
reduceByKey(func, [numTasks]) |
12 |
aggregateByKey(zeroValue)(seqOp, combOp, [numTasks]) |
13 |
sortByKey([ascending], [numTasks]) |
14 |
join(otherDataset, [numTasks]) |
15 |
cogroup(otherDataset, [numTasks]) |
16 |
cartesian(otherDataset)
当上调用类型T和U的数据集,返回(T,U)对数据集(所有元素对)
|
17 |
pipe(command, [envVars]) RDD通过shell命令每个分区,例如:一个Perl或bash脚本。RDD元素被写入到进程的标准输入和线路输出,标准输出形式返回一个字符串RDD |
18 |
coalesce(numPartitions)
减少RDD到numPartitions分区的数量。过滤大型数据集后,更高效地运行的操作
|
19 |
repartition(numPartitions)
打乱RDD数据随机创造更多或更少的分区,并在它们之间平衡。这总是打乱的所有数据在网络上
|
20 |
repartitionAndSortWithinPartitions(partitioner) 根据给定的分区重新分区RDD及在每个结果分区,排序键记录。这是调用重新分配排序在每个分区内,因为它可以推动分拣向下进入混洗机制效率更高。 |
S.No | 操作 & 含义 |
---|---|
1 |
reduce(func) 合计数据集的元素,使用函数 func (其中有两个参数和返回一行). 该函数应该是可交换和可结合,以便它可以正确地在并行计算。 |
2 |
collect() 返回数据集的所有作为数组在驱动程序的元素。这是一个过滤器或其它操作之后返回数据的一个足够小的子集,通常是有用的 |
3 |
count()
返回该数据集的元素数
|
4 |
first()
返回的数据集的第一个元素(类似于使用(1))
|
5 |
take(n)
返回与该数据集的前n个元素的阵列。
|
6 |
takeSample (withReplacement,num, [seed]) 返回数组的数据集num个元素,有或没有更换随机抽样,预指定的随机数发生器的种子可选 |
7 |
takeOrdered(n, [ordering])
返回RDD使用或者按其自然顺序或自定义比较的前第n个元素
|
8 |
saveAsTextFile(path) 写入数据集是一个文本文件中的元素(或一组文本文件),在给定的目录的本地文件系统,HDFS或任何其他的Hadoop支持的文件系统。Spark调用每个元素的 toString,将其转换为文件中的文本行 |
9 |
saveAsSequenceFile(path) (Java and Scala) 写入数据集,为Hadoop SequenceFile元素在给定的路径写入在本地文件系统,HDFS或任何其他Hadoop支持的文件系统。 这是适用于实现Hadoop可写接口RDDS的键 - 值对。在Scala中,它也可以在属于隐式转换为可写(Spark包括转换为基本类型,如 Int, Double, String 等等)类型。 |
10 |
saveAsObjectFile(path) (Java and Scala) 写入数据集的内容使用Java序列化为一个简单的格式,然后可以使用SparkContext.objectFile()加载。 |
11 |
countByKey() 仅适用于RDDS的类型 (K, V). 返回(K, Int)对与每个键的次数的一个HashMap。 |
12 |
foreach(func) 数据集的每个元素上运行函数func。这通常对于不良反应,例如更新累加器或与外部存储系统进行交互进行。 注 − 在 foreach()以外修改变量,其他累加器可能会导致不确定的行为。请参阅了解闭包的更多细节。 |
考虑一个单词计数的例子 − 它计算出现在文档中的每个单词。请看下面的文字为输入并保存在主目录中的 input.txt 文件。
input.txt − 作为输入文件
people are not as beautiful as they look, |
as they walk or as they talk. |
they are only as beautiful as they love, |
as they care as they share. |
下面的命令用来打开spark shell. 通常情况下,spark 使用Scala构建。因此,Spark 程序需要在 Scala 环境中运行。
$ spark-shell |
Spark assembly has been built with Hive, including Datanucleus jars on classpath |
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties |
15/06/04 15:25:22 INFO SecurityManager: Changing view acls to: hadoop |
15/06/04 15:25:22 INFO SecurityManager: Changing modify acls to: hadoop |
15/06/04 15:25:22 INFO SecurityManager: SecurityManager: authentication disabled; |
ui acls disabled; users with view permissions: Set(hadoop); users with modify permissions: Set(hadoop) |
15/06/04 15:25:22 INFO HttpServer: Starting HTTP Server |
15/06/04 15:25:23 INFO Utils: Successfully started service 'HTTP class server' on port 43292. |
Welcome to |
____ __ |
/ __/__ ___ _____/ /__ |
_\ \/ _ \/ _ `/ __/ '_/ |
/___/ .__/\_,_/_/ /_/\_\ version 1.4.0 |
/_/ |
Using Scala version 2.10.4 (Java HotSpot(TM) 64-Bit Server VM, Java 1.7.0_71) |
Type in expressions to have them evaluated. |
Spark context available as sc |
scala> |
下面的命令被用于从给定位置读出的文件。这里,新的 RDD 使用输入文件名创建。这是在 textFile(“”)方法的参数字符串是用于输入文件名的绝对路径。然而,如果仅给出文件名,那么它输入文件则在当前位置。
scala> val inputfile = sc.textFile("input.txt") |
我们的目标是计算一个文件中的字数。分裂每一行成词创建一个平面地图(flatMap(line ⇒ line.split(“ ”)).
接下来,读每个词作为一个键和值 ‘1’ (<key, value> = <word,1>) 使用映射函数 (map(word ⇒ (word, 1)).
最后,加入类似的键值降低这些键 (reduceByKey(_+_)).
下面的命令用于执行字数统计逻辑。执行此操作后,不会有任何输出,因为这不是一个动作,这是一个转换; 指向一个新的RDD或告诉spark,用给定的数据来做什么)。
scala> val counts = inputfile.flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey(_+_); |
同时用RDD工作,如果想了解当前的RDD,那么可使用下面的命令。 它会告诉你关于当前RDD及其依赖调试的描述。
scala> counts.toDebugString |
可以使用 persist() 或 cache() 方法标记一个RDD。在第一次计算的操作,这将被保存在存储器中的节点上。使用下面的命令来存储中间转换在内存中。
scala> counts.cache() |
应用动作(操作),比如存储所有的转换结果到一个文本文件中。saveAsTextFile(“”)方法字符串参数是输出文件夹的绝对路径。试试下面的命令来保存输出文本文件。在下面的例子中, ‘output’ 的文件夹为当前位置。
scala> counts.saveAsTextFile("output") |
打开另一个终端进入主目录(其中spark 在其他终端中执行)。下面的命令用于检查输出目录。
[hadoop@localhost ~]$ cd output/ |
[hadoop@localhost output]$ ls -1 |
part-00000 |
part-00001 |
_SUCCESS |
[hadoop@localhost output]$ cat part-00000 |
(people,1) |
(are,2) |
(not,1) |
(as,8) |
(beautiful,2) |
(they, 7) |
(look,1) |
[hadoop@localhost output]$ cat part-00001 |
(walk, 1) |
(or, 1) |
(talk, 1) |
(only, 1) |
(love, 1) |
(care, 1) |
(share, 1) |
Scala> counts.unpersist() |
15/06/27 00:57:33 INFO ShuffledRDD: Removing RDD 9 from persistence list |
15/06/27 00:57:33 INFO BlockManager: Removing RDD 9 |
15/06/27 00:57:33 INFO BlockManager: Removing block rdd_9_1 |
15/06/27 00:57:33 INFO MemoryStore: Block rdd_9_1 of size 480 dropped from memory (free 280061810) |
15/06/27 00:57:33 INFO BlockManager: Removing block rdd_9_0 |
15/06/27 00:57:33 INFO MemoryStore: Block rdd_9_0 of size 296 dropped from memory (free 280062106) |
res7: cou.type = ShuffledRDD[9] at reduceByKey at <console>:14 |