Let us take the same word-count example that we used earlier with shell commands. Here, we consider the same example as a Spark application.
The following text is the input data and the file is named in.txt.
people are not as beautiful as they look, as they walk or as they talk. they are only as beautiful as they love, as they care as they share.

Look at the following program −
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark._

object SparkWordCount {
   def main(args: Array[String]) {

      val sc = new SparkContext( "local", "Word Count", "/usr/local/spark", Nil, Map(), Map())
      /* local = master URL; Word Count = application name; */
      /* /usr/local/spark = Spark Home; Nil = jars; Map = environment */
      /* Map = variables to worker nodes */

      /* Create an input RDD that reads the text file (in.txt) through the Spark context */
      val input = sc.textFile("in.txt")

      /* Transform the input RDD into the count RDD */
      val count = input.flatMap(line => line.split(" "))
         .map(word => (word, 1))
         .reduceByKey(_ + _)

      /* saveAsTextFile is an action that materializes the RDD */
      count.saveAsTextFile("outfile")
      System.out.println("OK");
   }
}

Save the above program into a file named SparkWordCount.scala and place it in a user-defined directory named spark-application.
Note − While transforming the inputRDD into the countRDD, we use flatMap() to tokenize the lines of the text file into words, the map() method to pair each word with a count of 1, and the reduceByKey() method to sum the counts of each repeated word.
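As a minimal sketch of this data flow (assuming a running spark-shell, which predefines sc, with in.txt in the current directory), the intermediate RDDs can be inspected step by step:

val input = sc.textFile("in.txt")                   // one element per line of in.txt
val words = input.flatMap(line => line.split(" "))  // tokenize each line into words
val pairs = words.map(word => (word, 1))            // pair every word with a count of 1
val count = pairs.reduceByKey(_ + _)                // add up the counts per distinct word
count.collect().foreach(println)                    // prints pairs such as (as,8) and (they, 7)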
Follow the steps given below to submit the application. Perform all the steps from a terminal in the spark-application directory.
Step 1 − Download the Spark core JAR

The Spark core jar is required for compilation; therefore, download spark-core_2.10-1.3.0.jar from the following link and move the jar file into the spark-application directory.
Step 2 − Compile the program

Compile the above program using the command given below. This command should be executed from the spark-application directory. Here, /usr/local/spark/lib/spark-assembly-1.4.0-hadoop2.6.0.jar is a Hadoop support jar taken from the Spark library.
$ scalac -classpath "spark-core_2.10-1.3.0.jar:/usr/local/spark/lib/spark-assembly-1.4.0-hadoop2.6.0.jar" SparkWordCount.scala
Step 3 − Create a JAR

Create a jar file of the Spark application using the following command. Here, wordcount is the name of the jar file.

jar -cvf wordcount.jar SparkWordCount*.class spark-core_2.10-1.3.0.jar /usr/local/spark/lib/spark-assembly-1.4.0-hadoop2.6.0.jar

Step 4 − Submit the Spark application
Submit the Spark application using the following command −
spark-submit --class SparkWordCount --master local wordcount.jar
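The --master value above runs the job locally. If a standalone Spark master is available, the same jar can also be submitted against it; the host name and port below are placeholders, not part of the original example, and the rest of this walkthrough continues with the local submission shown above.

spark-submit --class SparkWordCount --master spark://<master-host>:7077 wordcount.jar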
If it is executed successfully, you will find the output given below. The OK in that output is for user identification; it is printed by the last line of the program. If you read the output carefully, you will come across different pieces of information, such as the port of the sparkDriver service, the MemoryStore capacity, the SparkUI address, the jar that was added, and the time taken by the stage and the job −
15/07/08 13:56:04 INFO Slf4jLogger: Slf4jLogger started
15/07/08 13:56:04 INFO Utils: Successfully started service 'sparkDriver' on port 42954.
15/07/08 13:56:04 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkDriver@192.168.1.217:42954]
15/07/08 13:56:04 INFO MemoryStore: MemoryStore started with capacity 267.3 MB
15/07/08 13:56:05 INFO HttpServer: Starting HTTP Server
15/07/08 13:56:05 INFO Utils: Successfully started service 'HTTP file server' on port 56707.
15/07/08 13:56:06 INFO SparkUI: Started SparkUI at http://192.168.1.217:4040
15/07/08 13:56:07 INFO SparkContext: Added JAR file:/home/hadoop/piapplication/count.jar at http://192.168.1.217:56707/jars/count.jar with timestamp 1436343967029
15/07/08 13:56:11 INFO Executor: Adding file:/tmp/spark-45a07b83-42ed-42b3b2c2-823d8d99c5af/userFiles-df4f4c20-a368-4cdd-a2a7-39ed45eb30cf/count.jar to class loader
15/07/08 13:56:11 INFO HadoopRDD: Input split: file:/home/hadoop/piapplication/in.txt:0+54
15/07/08 13:56:12 INFO Executor: Finished task 0.0 in stage 0.0 (TID 0). 2001 bytes result sent to driver
 (MapPartitionsRDD[5] at saveAsTextFile at SparkPi.scala:11), which is now runnable
15/07/08 13:56:12 INFO DAGScheduler: Submitting 1 missing tasks from ResultStage 1 (MapPartitionsRDD[5] at saveAsTextFile at SparkPi.scala:11)
15/07/08 13:56:13 INFO DAGScheduler: ResultStage 1 (saveAsTextFile at SparkPi.scala:11) finished in 0.566 s
15/07/08 13:56:13 INFO DAGScheduler: Job 0 finished: saveAsTextFile at SparkPi.scala:11, took 2.892996 s
OK
15/07/08 13:56:13 INFO SparkContext: Invoking stop() from shutdown hook
15/07/08 13:56:13 INFO SparkUI: Stopped Spark web UI at http://192.168.1.217:4040
15/07/08 13:56:13 INFO DAGScheduler: Stopping DAGScheduler
15/07/08 13:56:14 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
15/07/08 13:56:14 INFO Utils: path = /tmp/spark-45a07b83-42ed-42b3-b2c2823d8d99c5af/blockmgr-ccdda9e3-24f6-491b-b509-3d15a9e05818, already present as root for deletion.
15/07/08 13:56:14 INFO MemoryStore: MemoryStore cleared
15/07/08 13:56:14 INFO BlockManager: BlockManager stopped
15/07/08 13:56:14 INFO BlockManagerMaster: BlockManagerMaster stopped
15/07/08 13:56:14 INFO SparkContext: Successfully stopped SparkContext
15/07/08 13:56:14 INFO Utils: Shutdown hook called
15/07/08 13:56:14 INFO Utils: Deleting directory /tmp/spark-45a07b83-42ed-42b3b2c2-823d8d99c5af
15/07/08 13:56:14 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!

Step 5 − Checking the output
The following commands open the outfile directory and list the files it contains −

$ cd outfile
$ ls
Part-00000 part-00001 _SUCCESS

The following command is for checking the output from the part-00000 file −
$ cat part-00000
(people,1)
(are,2)
(not,1)
(as,8)
(beautiful,2)
(they, 7)
(look,1)

The following command is for checking the output from the part-00001 file −
$ cat part-00001
(walk, 1)
(or, 1)
(talk, 1)
(only, 1)
(love, 1)
(care, 1)
(share, 1)

Go through the following section to know more about the 'spark-submit' command.
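As an optional cross-check (a minimal sketch, assuming spark-shell is launched from the spark-application directory so that the outfile directory is visible), the saved result can also be read back as an RDD:

val saved = sc.textFile("outfile")   // reads all part-* files in the outfile directory as text
println(saved.count())               // number of (word, count) pairs that were written
saved.collect().foreach(println)     // prints the same lines shown by the cat commands above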
spark-submit [options] <app jar | python file> [app arguments]
S.No | Option | Description |
---|---|---|
1 | --master | spark://host:port, mesos://host:port, yarn, or local. |
2 | --deploy-mode | Whether to launch the driver program locally ("client") or on one of the worker machines inside the cluster ("cluster") (Default: client). |
3 | --class | Your application's main class (for Java / Scala apps). |
4 | --name | A name of your application. |
5 | --jars | Comma-separated list of local jars to include on the driver and executor classpaths. |
6 | --packages | Comma-separated list of Maven coordinates of jars to include on the driver and executor classpaths. |
7 | --repositories | Comma-separated list of additional remote repositories to search for the Maven coordinates given with --packages. |
8 | --py-files | Comma-separated list of .zip, .egg, or .py files to place on the Python path for Python apps. |
9 | --files | Comma-separated list of files to be placed in the working directory of each executor. |
10 | --conf (prop=val) | Arbitrary Spark configuration property. |
11 | --properties-file | Path to a file from which to load extra properties. If not specified, this will look for conf/spark-defaults. |
12 | --driver-memory | Memory for the driver (e.g. 1000M, 2G) (Default: 512M). |
13 | --driver-java-options | Extra Java options to pass to the driver. |
14 | --driver-library-path | Extra library path entries to pass to the driver. |
15 | --driver-class-path | Extra class path entries to pass to the driver. Note that jars added with --jars are automatically included in the classpath. |
16 | --executor-memory | Memory per executor (e.g. 1000M, 2G) (Default: 1G). |
17 | --proxy-user | User to impersonate when submitting the application. |
18 | --help, -h | Show this help message and exit. |
19 | --verbose, -v | Print additional debug output. |
20 | --version | Print the version of the current Spark. |
21 | --driver-cores NUM | Cores for the driver (Default: 1). |
22 | --supervise | If given, restarts the driver on failure. |
23 | --kill | If given, kills the driver specified. |
24 | --status | If given, requests the status of the driver specified. |
25 | --total-executor-cores | Total cores for all executors. |
26 | --executor-cores | Number of cores per executor (Default: 1 in YARN mode, or all available cores on the worker in standalone mode). |
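For instance, the wordcount.jar built above could be resubmitted while exercising a few of these options; the memory sizes and the local[2] thread count below are illustrative values only, not part of the original example:

spark-submit --class SparkWordCount --master local[2] --driver-memory 1G --executor-memory 1G --verbose wordcount.jar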