Contents
  1. Spark
  2. Spark on Yarn
  3. Test Code
  4. Accessing Spark from PyCharm
    1. Install py4j
    2. Configure environment variables
    3. Link Spark and Hadoop
    4. Issues on Windows

Spark

简单之美 | RDD: A Fault-Tolerant Abstraction for In-Memory Cluster Computing

Spark on Yarn

Spark officially provides three cluster deployment options: Standalone, Mesos, and YARN. They differ only in the resource management and scheduling platform used.

To use Spark on an existing Hadoop cluster, i.e. Spark on YARN, you only need to edit the configuration file (vi ./conf/spark-env.sh) and add the following:

export HADOOP_HOME=/share/apps/hadoop

export HADOOP_CONF_DIR=$HADOOP_HOME/etc/hadoop

Main parameters for job submission (used well, they let you make full use of cluster resources):

Parameter                    Meaning
--master MASTER_URL          spark://host:port, mesos://host:port, yarn, yarn-cluster, yarn-client, or local
--deploy-mode DEPLOY_MODE    Where the driver program runs: client or cluster
--class CLASS_NAME           Main class name, including the package name
--name NAME                  Application name
--jars JARS                  Third-party jars that the driver depends on
--py-files PY_FILES          Comma-separated list of .zip, .egg, or .py files to place on the PYTHONPATH of the Python application
--files FILES                Comma-separated list of files to place in the working directory of each executor
--executor-memory MEM        Memory per executor, default 1G
--total-executor-cores NUM   Total number of cores across all executors; Spark Standalone and Spark on Mesos only
--executor-cores NUM         Number of cores per executor, default 1; Spark on YARN only
--queue QUEUE_NAME           YARN queue to submit the application to, default is the default queue; Spark on YARN only
--num-executors NUM          Number of executors to launch, default 2; Spark on YARN only
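
Most of these flags also have spark.* configuration keys, so the same resources can be requested from inside a PySpark program. The sketch below is illustrative only (the file name and values are placeholders, and it assumes SPARK_HOME and HADOOP_CONF_DIR are already set so YARN is reachable); it maps a few of the flags above to their configuration equivalents.

#!/usr/bin/python
# -*- coding: UTF-8 -*-
# yarn_config_sketch.py (hypothetical file name; values are placeholders)
from pyspark.sql import SparkSession

spark = (SparkSession.builder
         .master("yarn")                               # --master yarn
         .appName("spark-test")                        # --name
         .config("spark.submit.deployMode", "client")  # --deploy-mode
         .config("spark.executor.memory", "1g")        # --executor-memory
         .config("spark.executor.cores", "1")          # --executor-cores
         .config("spark.executor.instances", "2")      # --num-executors
         .config("spark.yarn.queue", "default")        # --queue
         .getOrCreate())

# Print the YARN application id to confirm the job registered with the cluster
print(spark.sparkContext.applicationId)
spark.stop()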

For more details, see the blog post.

Test Code

./bin/spark-submit --master yarn --name spark-test --class org.apache.spark.examples.SparkPi lib/spark-examples*.jar 10
#!/usr/bin/python
# -*- coding: UTF-8 -*-
# test_spark.py
from pyspark import SparkContext

# Use a raw string so the backslashes in the Windows path are not treated as escapes
logFile = r"C:\spark-2.0.2-bin-hadoop2.6\README.md"
sc = SparkContext("local", "Simple App")

# Cache the file since it is scanned twice below
logData = sc.textFile(logFile).cache()
numAs = logData.filter(lambda s: 'a' in s).count()
numBs = logData.filter(lambda s: 'b' in s).count()
print("Lines with a: %i, lines with b: %i" % (numAs, numBs))
sc.stop()
#!/usr/bin/python
# -*- coding: UTF-8 -*-
# testDFrame.py
from pyspark.sql import SparkSession
from pyspark.sql import Row
from pyspark.sql.types import *
import sys

# Python 2 only: avoid encoding errors when handling UTF-8 strings
reload(sys)
sys.setdefaultencoding("utf-8")


def schema_inference_example(spark):
    sc = spark.sparkContext
    # Load a text file and convert each line to a Row.
    lines = sc.textFile("/user/jiangmanhua/Experiment/url_domain_labeled")
    parts = lines.map(lambda l: l.split("\t"))
    label = parts.map(lambda p: Row(label_name=p[0], type=p[1]))
    # Infer the schema, and register the DataFrame as a table.
    labelDF = spark.createDataFrame(label)
    labelDF.createOrReplaceTempView("label")
    # SQL can be run over DataFrames that have been registered as a table.
    counts = spark.sql("SELECT type, count(label_name) AS cc FROM label GROUP BY type")
    # The results of SQL queries are DataFrame objects.
    # rdd returns the content as an :class:`pyspark.RDD` of :class:`Row`.
    results = counts.rdd.map(lambda p: "type: " + p.type + ", cnt: " + str(p.cc)).collect()
    for line in results:
        print(line)


if __name__ == "__main__":
    spark = SparkSession \
        .builder \
        .appName("Python Spark SQL basic example") \
        .config("spark.some.config.option", "some-value") \
        .getOrCreate()
    schema_inference_example(spark)
    spark.stop()
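
The "from pyspark.sql.types import *" line above is only needed when the schema is declared explicitly instead of inferred from Row objects. A minimal sketch of that variant, reusing the same (hypothetical) HDFS path and column names as above:

from pyspark.sql.types import StructType, StructField, StringType

def explicit_schema_example(spark):
    sc = spark.sparkContext
    lines = sc.textFile("/user/jiangmanhua/Experiment/url_domain_labeled")
    # Each element becomes a plain tuple; the schema below names and types the columns.
    rows = lines.map(lambda l: l.split("\t")).map(lambda p: (p[0], p[1]))
    schema = StructType([
        StructField("label_name", StringType(), True),
        StructField("type", StringType(), True),
    ])
    df = spark.createDataFrame(rows, schema)
    df.createOrReplaceTempView("label")
    spark.sql("SELECT type, count(label_name) AS cc FROM label GROUP BY type").show()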

Accessing Spark from PyCharm

1. Install py4j

There are two main steps: install py4j, then configure the paths.

# Windows
pip install py4j
# Linux
sudo pip2 install py4j
2. Configure environment variables
HADOOP_HOME = C:\hadoop-2.6.5
SPARK_HOME = C:\spark-2.0.2-bin-hadoop2.6

Alternatively, in PyCharm choose "Run" -> "Edit Configurations" -> "Environment variables" and add the SPARK_HOME and HADOOP_HOME directories there.
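
If you would rather not touch system settings or the run configuration, the variables can also be set at the top of the script itself, before any Spark object is created. A minimal sketch, assuming the same install paths as above:

import os

# Must run before SparkContext/SparkSession is created;
# the paths are the example install locations used in this post.
os.environ.setdefault("SPARK_HOME", r"C:\spark-2.0.2-bin-hadoop2.6")
os.environ.setdefault("HADOOP_HOME", r"C:\hadoop-2.6.5")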

3. Link Spark and Hadoop

Create a pyspark.pth file under $PYTHON_HOME\lib\site-packages whose content is the path of Spark's python directory (e.g. D:\spark-2.0.2-bin-hadoop2.6\python).

# Linux
echo /home/manhua/app/spark/python > /home/manhua/.local/lib/python2.7/site-packages/pyspark.pth
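
A quick, optional check that both the py4j installation and the pyspark.pth entry are picked up is to import the packages from a plain Python shell:

# Both imports should succeed; if either fails, re-check the .pth file
# contents and the py4j installation.
import py4j
import pyspark

print(py4j.__file__)
print(pyspark.__file__)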
4. Issues on Windows

Download winutils.exe and place it in C:\hadoop-2.6.5\bin (the bin directory under HADOOP_HOME).