Let us take the same word count example that we used earlier with shell commands. Here, we consider the same example as a Spark application.
The following text is the input data, and the file is named in.txt.
people are not as beautiful as they look,
as they walk or as they talk.
they are only as beautiful as they love,
as they care as they share.

Save the following program as SparkWordCount.scala in the spark-application directory −
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark._

object SparkWordCount {
  def main(args: Array[String]) {
    val sc = new SparkContext("local", "Word Count", "/usr/local/spark", Nil, Map(), Map())
    /* local = master URL; Word Count = application name;            */
    /* /usr/local/spark = Spark home; Nil = jars; Map = environment; */
    /* Map = variables passed to worker nodes                        */

    /* Create an input RDD by reading the text file (in.txt) through the Spark context */
    val input = sc.textFile("in.txt")

    /* Transform the input RDD into a count RDD */
    val count = input.flatMap(line => line.split(" "))
      .map(word => (word, 1))
      .reduceByKey(_ + _)

    /* saveAsTextFile is an action applied on the RDD; it writes the result to disk */
    count.saveAsTextFile("outfile")
    System.out.println("OK")
  }
}
Note − While transforming inputRDD into countRDD, flatMap() tokenizes the lines of the text file into words, map() pairs each word with a count of 1, and reduceByKey() sums those counts to give the number of occurrences of each word.
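The same transformation pipeline can also be tried out interactively before packaging the application. A minimal sketch, assuming a running spark-shell (which already provides sc) started from the directory that contains in.txt −

// Read the input file into an RDD of lines
val input = sc.textFile("in.txt")

// Tokenize each line, pair every word with 1, and sum the counts per word
val counts = input.flatMap(line => line.split(" "))
  .map(word => (word, 1))
  .reduceByKey(_ + _)

// Collect the small result to the driver and print it
counts.collect().foreach(println)

Here collect() is used instead of saveAsTextFile() so the result is printed directly in the shell; this is only suitable for small outputs.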
Use the following steps to submit the application. Execute all of them from a terminal in the spark-application directory.
Spark needs the spark-core jar for compilation. Download spark-core_2.10-1.3.0.jar (for example, from the Maven repository) and move the downloaded jar file into the spark-application directory.
Compile the above program using the command given below, executed from the spark-application directory. Here, /usr/local/spark/lib/spark-assembly-1.4.0-hadoop2.6.0.jar is the Hadoop support jar taken from the Spark library.
$ scalac -classpath "spark-core_2.10-1.3.0.jar:/usr/local/spark/lib/spark-assembly-1.4.0-hadoop2.6.0.jar" SparkWordCount.scala
Create a jar file of the application using the following command; here, wordcount is the name of the jar file.

$ jar -cvf wordcount.jar SparkWordCount*.class spark-core_2.10-1.3.0.jar /usr/local/spark/lib/spark-assembly-1.4.0-hadoop2.6.0.jar
Submit the Spark application using the following command −
$ spark-submit --class SparkWordCount --master local wordcount.jar
If it executes successfully, you will see the output given below. The OK in that output is printed by the last line of the program and serves as a marker for the user among the log messages. If you read the output carefully, you will also notice several other things, such as the services that were started and the stages that were run −
15/07/08 13:56:04 INFO Slf4jLogger: Slf4jLogger started
15/07/08 13:56:04 INFO Utils: Successfully started service 'sparkDriver' on port 42954.
15/07/08 13:56:04 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkDriver@192.168.1.217:42954]
15/07/08 13:56:04 INFO MemoryStore: MemoryStore started with capacity 267.3 MB
15/07/08 13:56:05 INFO HttpServer: Starting HTTP Server
15/07/08 13:56:05 INFO Utils: Successfully started service 'HTTP file server' on port 56707.
15/07/08 13:56:06 INFO SparkUI: Started SparkUI at http://192.168.1.217:4040
15/07/08 13:56:07 INFO SparkContext: Added JAR file:/home/hadoop/piapplication/count.jar at http://192.168.1.217:56707/jars/count.jar with timestamp 1436343967029
15/07/08 13:56:11 INFO Executor: Adding file:/tmp/spark-45a07b83-42ed-42b3b2c2-823d8d99c5af/userFiles-df4f4c20-a368-4cdd-a2a7-39ed45eb30cf/count.jar to class loader
15/07/08 13:56:11 INFO HadoopRDD: Input split: file:/home/hadoop/piapplication/in.txt:0+54
15/07/08 13:56:12 INFO Executor: Finished task 0.0 in stage 0.0 (TID 0). 2001 bytes result sent to driver
(MapPartitionsRDD[5] at saveAsTextFile at SparkPi.scala:11), which is now runnable
15/07/08 13:56:12 INFO DAGScheduler: Submitting 1 missing tasks from ResultStage 1 (MapPartitionsRDD[5] at saveAsTextFile at SparkPi.scala:11)
15/07/08 13:56:13 INFO DAGScheduler: ResultStage 1 (saveAsTextFile at SparkPi.scala:11) finished in 0.566 s
15/07/08 13:56:13 INFO DAGScheduler: Job 0 finished: saveAsTextFile at SparkPi.scala:11, took 2.892996 s
OK
15/07/08 13:56:13 INFO SparkContext: Invoking stop() from shutdown hook
15/07/08 13:56:13 INFO SparkUI: Stopped Spark web UI at http://192.168.1.217:4040
15/07/08 13:56:13 INFO DAGScheduler: Stopping DAGScheduler
15/07/08 13:56:14 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
15/07/08 13:56:14 INFO Utils: path = /tmp/spark-45a07b83-42ed-42b3-b2c2823d8d99c5af/blockmgr-ccdda9e3-24f6-491b-b509-3d15a9e05818, already present as root for deletion.
15/07/08 13:56:14 INFO MemoryStore: MemoryStore cleared
15/07/08 13:56:14 INFO BlockManager: BlockManager stopped
15/07/08 13:56:14 INFO BlockManagerMaster: BlockManagerMaster stopped
15/07/08 13:56:14 INFO SparkContext: Successfully stopped SparkContext
15/07/08 13:56:14 INFO Utils: Shutdown hook called
15/07/08 13:56:14 INFO Utils: Deleting directory /tmp/spark-45a07b83-42ed-42b3b2c2-823d8d99c5af
15/07/08 13:56:14 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
After successful execution, you will find a directory named outfile inside the spark-application directory; it contains the output files. The following commands open that directory and list the files in it −

$ cd outfile
$ ls
part-00000 part-00001 _SUCCESS

The following command checks the output in the part-00000 file −

$ cat part-00000
(people,1)
(are,2)
(not,1)
(as,8)
(beautiful,2)
(they,7)
(look,1)

The following command checks the output in the part-00001 file −

$ cat part-00001
(walk,1)
(or,1)
(talk,1)
(only,1)
(love,1)
(care,1)
(share,1)
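The saved output can also be read back into an RDD for further processing. A minimal sketch, assuming spark-shell is started from the spark-application directory where the outfile directory was created −

// textFile on a directory reads all part-* files inside it
val saved = sc.textFile("outfile")

// Each line is the string form of a (word, count) pair, e.g. "(as,8)"
saved.collect().foreach(println)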
The general syntax of the spark-submit command, and the options it accepts, are given below −

spark-submit [options] <app jar | python file> [app arguments]
S.No | Option | Description
---|---|---
1 | --master | spark://host:port, mesos://host:port, yarn, or local.
2 | --deploy-mode | Whether to launch the driver program locally ("client") or on one of the worker machines inside the cluster ("cluster") (Default: client).
3 | --class | Your application's main class (for Java / Scala apps).
4 | --name | A name for your application.
5 | --jars | Comma-separated list of local jars to include on the driver and executor classpaths.
6 | --packages | Comma-separated list of Maven coordinates of jars to include on the driver and executor classpaths.
7 | --repositories | Comma-separated list of additional remote repositories to search for the Maven coordinates given with --packages.
8 | --py-files | Comma-separated list of .zip, .egg, or .py files to place on the PYTHONPATH for Python apps.
9 | --files | Comma-separated list of files to be placed in the working directory of each executor.
10 | --conf (prop=val) | Arbitrary Spark configuration property.
11 | --properties-file | Path to a file from which to load extra properties. If not specified, this will look for conf/spark-defaults.conf.
12 | --driver-memory | Memory for the driver (e.g. 1000M, 2G) (Default: 512M).
13 | --driver-java-options | Extra Java options to pass to the driver.
14 | --driver-library-path | Extra library path entries to pass to the driver.
15 | --driver-class-path | Extra class path entries to pass to the driver. Note that jars added with --jars are automatically included in the classpath.
16 | --executor-memory | Memory per executor (e.g. 1000M, 2G) (Default: 1G).
17 | --proxy-user | User to impersonate when submitting the application.
18 | --help, -h | Show this help message and exit.
19 | --verbose, -v | Print additional debug output.
20 | --version | Print the version of the current Spark.
21 | --driver-cores NUM | Cores for the driver (Default: 1).
22 | --supervise | If given, restart the driver on failure.
23 | --kill | If given, kill the driver specified.
24 | --status | If given, request the status of the driver specified.
25 | --total-executor-cores | Total cores for all executors.
26 | --executor-cores | Number of cores per executor (Default: 1 in YARN mode, or all available cores on the worker in standalone mode).
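As an illustration of how several of these options combine, the word-count jar built earlier could also be submitted as shown below. The option values here (local[2], 1G, the application name) are only example choices, not requirements −

$ spark-submit \
  --class SparkWordCount \
  --master local[2] \
  --name "Word Count" \
  --driver-memory 1G \
  --verbose \
  wordcount.jar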