spark 相关
一、安装 jdk
curl -O https://download.oracle.com/otn/java/jdk/8u221-b11/230deb18db3e4014bb8e3e8324f81b43/jdk-8u221-linux-x64.tar.gz?AuthParam=1571023421_c1f05abfd1ce9b457c29b0565494cc83
tar -zxvf jdk-8u221-linux-x64.tar.gz
vim /etc/profile
export JAVA_HOME=/root/jdk1.8.0_221
export PATH=$PATH:$JAVA_HOME/bin
source /etc/profile
二、hadoop安装
1、下载
curl -O https://www-us.apache.org/dist/hadoop/common/hadoop-3.2.1/hadoop-3.2.1-src.tar.gz
tar -zxvf hadoop-3.2.1-src.tar.gz
2、配置
cd /root/hadoop-3.2.1/etc/hadoop
mapred-site.xml
exprot JAVA_HOME=${JAVA_HOME}
hdfs-site.xml
<configuration>
<property>
<name>dfs.namenode.http-address</name>
<value>master:50070</value>
</property>
<property>
<name>dfs.replication</name>
<value>2</value>
</property>
<property>
<name>dfs.permissions</name>
<value>true</value>
</property>
<property>
<name>dfs.namenode.name.dir</name>
<value>file:/root/hadoop-3.2.1/dfs/name</value>
</property>
<property>
<name>dfs.datanode.data.dir</name>
<value>file:/root/hadoop-3.2.1/dfs/data</value>
</property>
</configuration>
yarn-env.sh
export JAVA_HOME=/root/jdk1.8.0_221
yarn-site.xml
<configuration>
<property>
<name>yarn.resoursemanager.hostname</name>
<value>master</value>
</property>
<property>
<name>yarn.nodemanager.aux-services</name>
<value>mapreduce_shuffle</value>
</property>
<property>
<name>yarn.nodemanager.aux-services.mapreduce.shuffle.class</name>
<value>org.apache.hadoop.mapred.ShuffleHandler</value>
</property>
<property>
<name>yarn.resourcemanager.address</name>
<value>master:8032</value>
</property>
<property>
<name>yarn.resourcemanager.scheduler.address</name>
<value>master:8030</value>
</property>
<property>
<name>yarn.resourcemanager.resource-tracker.address</name>
<value>master:8035</value>
</property>
<property>
<name>yarn.resourcemanager.admin.address</name>
<value>master:8033</value>
</property>
<property>
<name>yarn.resourcemanager.webapp.address</name>
<value>master:8088</value>
</property>
<property>
<name>yarn.nodemanager.resource.memory-mb</name>
<value>30720</value>
</property>
</configuration>
works
node1
node2
core-site.xml
<configuration>
<property>
<name>fs.defaultFS</name>
<value>hdfs://master:9000</value>
</property>
<property>
<name>hadoop.tmp.dir</name>
<value>file:/usr/local/hadoop/tmp</value>
<description>Abase for other temporary directories.</description>
</property>
</configuration>
mapred-site.xml
<configuration>
<property>
<name>mapreduce.framework.name</name>
<value>yarn</value>
</property>
<property>
<name>mapreduce.jobhistory.address</name>
<value>master:10020</value>
</property>
<property>
<name>mapreduce.jobhistory.webapp.address</name>
<value>master:19888</value>
</property>
</configuration>
3、启动
/root/hadoop-3.2.1/sbin/start-all.sh
三、安装spark
1、下载
curl -O https://www.apache.org/dyn/closer.lua/spark/spark-2.4.4/spark-2.4.4-bin-hadoop2.7.tgz
tar -zxvf spark-2.4.4-bin-hadoop2.7.tgz
2、配置
cd spark-2.4.4-bin-hadoop2.7/conf
cp spark-env.sh.template spark-env.sh
cp spark-defaults.conf.template spark-defaults.conf
cp slaves.template slaves
spark-env.sh
export JAVA_HOME=/root/jdk1.8.0_221
export SPARK_MASTER_IP=192.168.0.128
slaves
node1
node2
spark-defaults.conf
spark.master spark://master:7077
spark.eventLog.enabled true
spark.driver.cores 2
spark.driver.memory 2g
spark.executor.memory 7g
spark.rpc.message.maxSize 1024
3、启动
/root/spark-2.4.4-bin-hadoop2.7/sbin/start-all.sh
jps
四、使用
1、使用示例代码
spark-submit --class org.apache.spark.examples.JavaWordCount /usr/local/src/spark/examples/jars/spark-examples_2.11-2.4.4.jar /usr/local/src/spark/README.md
spark-submit --class org.apache.spark.examples.JavaSparkPi /usr/local/src/spark/examples/jars/spark-examples_2.11-2.4.4.jar /usr/local/src/spark/README.md
spark-submit /usr/local/src/spark/examples/src/main/python/pi.py 10
cd spark-2.4.4-bin-hadoop2.7/
./bin/spark-shell
[root@master spark-2.4.4-bin-hadoop2.7]# ./bin/pyspark
Python 2.7.5 (default, Oct 30 2018, 23:45:53)
[GCC 4.8.5 20150623 (Red Hat 4.8.5-36)] on linux2
Type "help", "copyright", "credits" or "license" for more information.
19/10/14 14:36:04 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
19/10/14 14:36:05 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/__ / .__/\_,_/_/ /_/\_\ version 2.4.4
/_/
Using Python version 2.7.5 (default, Oct 30 2018 23:45:53)
SparkSession available as 'spark'.
vim /root/spark-2.4.4-bin-hadoop2.7/examples/src/main/python/pi.py
from __future__ import print_function
import sys
from random import random
from operator import add
from pyspark.sql import SparkSession
if __name__ == "__main__":
"""
Usage: pi [partitions]
"""
spark = SparkSession\
.builder\
.appName("PythonPi")\
.getOrCreate()
partitions = int(sys.argv[1]) if len(sys.argv) > 1 else 2
n = 100000 * partitions
def f(_):
x = random() * 2 - 1
y = random() * 2 - 1
return 1 if x ** 2 + y ** 2 <= 1 else 0
count = spark.sparkContext.parallelize(range(1, n + 1), partitions).map(f).reduce(add)
print("Pi is roughly %f" % (4.0 * count / n))
result = [4.0*count/n]
spark.sparkContext.parallelize(result, 1).saveAsTextFile("/opt/hl")
spark.stop()
textFile = spark.read.text("README.md")
textFile.count()
textFile.first()
linesWithSpark = textFile.filter(textFile.value.contains("Spark"))
hadoop dfsadmin -safemode leave
spark-submit --executor-memory 8G --num-executors 2 --master yarn --deploy-mode cluster --driver-memory 2G pie.py 10
yarn logs -applicationId application_1571131648061_0016
8080 spark
8088 yarn
50070 handoop
五、资源配置
Spark任务的core,executor,memory资源配置方法
1、Spark应用当中术语的基本定义:
- Partitions : 分区是大型分布式数据集的一小部分。 Spark使用分区来管理数据,这些分区有助于并行化数据处理,并且使executor之间的数据交换最小化
- Task:任务是一个工作单元,可以在分布式数据集的分区上运行,并在单个Excutor上执行。并行执行的单位是任务级别。单个Stage中的Tasks可以并行执行
- Executor:在一个worker节点上为应用程序创建的JVM,Executor将巡行task的数据保存在内存或者磁盘中。每个应用都有自己的一些executors,单个节点可以运行多个Executor,并且一个应用可以跨多节点。Executor始终伴随Spark应用执行过程,并且以多线程方式运行任务。spark应用的executor个数可以通过SparkConf或者命令行 –num-executor进行配置
- Cores:CPU最基本的计算单元,一个CPU可以有一个或者多个core执行task任务,更多的core带来更高的计算效率,Spark中,cores决定了一个executor中并行task的个数
- Cluster Manager:cluster manager负责从集群中请求资源
2、cluster模式执行的Spark任务包含了如下步骤:
- driver端,SparkContext连接cluster manager(Standalone/Mesos/Yarn)
- Cluster Manager在其他应用之间定位资源,只要executor执行并且能够相互通信,可以使用任何Cluster Manager
- Spark获取集群中节点的Executor,每个应用都能够有自己的executor处理进程
- 发送应用程序代码到executor中
- SparkContext将Tasks发送到executors
以上步骤可以清晰看到executors个数和内存设置在spark中的重要作用。
3、例子1
硬件资源: 6 节点,每个节点16 cores, 64 GB 内存
每个节点在计算资源时候,给操作系统和Hadoop的进程预留1core,1GB,所以每个节点剩下15个core和63GB
内存。
core的个数,决定一个executor能够并发任务的个数。所以通常认为,一个executor越多的并发任务能够得到更好的性能,但有研究显示一个应用并发任务超过5,导致更差的性能。所以core的个数暂设置为5个。
5个core是表明executor并发任务的能力,并不是说一个系统有多少个core,即使我们一个CPU有32个core,也设置5个core不变。
executor个数,接下来,一个executor分配 5 core,一个node有15 core,从而我们计算一个node上会有3 executor(15 / 5),然后通过每个node的executor个数得到整个任务可以分配的executors个数。
我们有6个节点,每个节点3个executor,6 × 3 = 18个executors,额外预留1个executor给AM,最终要配置17个executors。
最后spark-submit启动脚本中配置 –num-executors = 17
memory,配置每个executor的内存,一个node,3 executor, 63G内存可用,所以每个executor可配置内存为63 / 3 = 21G
从Spark的内存模型角度,Executor占用的内存分为两部分:ExecutorMemory和MemoryOverhead,预留出MemoryOverhead的内存量之后,才是ExecutorMemory的内存。
MemoryOverhead的计算公式: max(384M, 0.07 × spark.executor.memory)
因此 MemoryOverhead值为0.07 × 21G = 1.47G > 384M
最终executor的内存配置值为 21G – 1.47 ≈ 19 GB
至此, Cores = 5, Executors= 17, Executor Memory = 19 GB
六、RDD
1、转换操作
RDD 的转换操作是返回新的 RDD 的操作。转换出来的 RDD 是惰性求值的,只有在行动操作中用到这些 RDD 时才会被计算。
rdd1={1, 2, 5, 3},rdd2={3,4,5}
rdd1.flatMap(x=>x.to(3))
1 -> {1,2,3}
2 -> {2,3}
3 -> {}
3 -> {3}
123233
spark-shell
var rdd1 = sc.makeRDD(Seq(10, 4, 2, 12, 3))
rdd1.flatMap(x=>x.to(4)).collect()
函数名 | 作用 | 示例 | 结果 |
---|---|---|---|
map() | 将函数应用于 RDD 的每个元素,返回值是新的 RDD,有返回值 | rdd1.map(x=>x+l) | {2,3,4,4} |
flatMap() | 将函数应用于 RDD 的每个元素,将元素数据进行拆分,变成迭代器,返回值是新的 RDD,压平操作,先map后flat,将函数应用于RDD中的每一项,对于每一项都产生一个集合,并将集合中的元素压扁成一个集合。 | rdd1.flatMap(x=>x.to(3)) | {1,2,3,2,3,3,3} |
filter() | 函数会过滤掉不符合条件的元素,返回值是新的 RDD | rdd1.filter(x=>x!=1) | {2,3,3} |
distinct() | 将 RDD 里的元素进行去重操作 | rdd1.distinct() | (1,2,3) |
union() | 生成包含两个 RDD 所有元素的新的 RDD,并集 | rdd1.union(rdd2) | {1,2,3,3,3,4,5} |
intersection() | 求出两个 RDD 的共同元素,交集 | rdd1.intersection(rdd2) | {3} |
subtract() | 将原 RDD 里和参数 RDD 里相同的元素去掉 | rdd1.subtract(rdd2) | {1,2} |
cartesian() | 求两个 RDD 的笛卡儿积 | rdd1.cartesian(rdd2) | {(1,3),(1,4)……(3,5)} |
groupBy() | 分组 | ||
sortBy() | 排序 |
2、行动操作
行动操作用于执行计算并按指定的方式输出结果。行动操作接受 RDD,但是返回非 RDD,即输出一个值或者结果。在 RDD 执行过程中,真正的计算发生在行动操作
rdd={1,2,3,3}
函数名 | 作用 | 示例 | 结果 |
---|---|---|---|
collect() | 返回 RDD 的所有元素 | rdd.collect() | {1,2,3,3} |
count() | RDD 里元素的个数 | rdd.count() | 4 |
countByValue() | 各元素在 RDD 中的出现次数 | rdd.countByValue() | {(1,1),(2,1),(3,2})} |
take(num) | 从 RDD 中返回 num 个元素 | rdd.take(2) | {1,2} |
top(num) | 从 RDD 中,按照默认(降序)或者指定的排序返回最前面的 num 个元素 | rdd.top(2) | {3,3} |
reduce() | 并行整合所有 RDD 数据,如求和操作 | rdd.reduce((x,y)=>x+y) | 9 |
reduceByKey() | |||
groupByKey() | 根据key进行分组 | ||
fold(zero)(func) | 和 reduce() 功能一样,但需要提供初始值 | rdd.fold(0)((x,y)=>x+y) | 9 |
foreach(func) | 对 RDD 的每个元素都使用特定函数,没有返回值 | rdd1.foreach(x=>printIn(x)) | 打印每一个元素 |
saveAsTextFile(path) | 将数据集的元素,以文本的形式保存到文件系统中 | rdd1.saveAsTextFile(file://home/test) | |
saveAsSequenceFile(path) | 将数据集的元素,以顺序文件格式保存到指 定的目录下 | saveAsSequenceFile(hdfs://home/test) | |
first | 返回第一个元素 | rdd1.first() | 1 |
cogroup() | 表示将多个rdd的同一个key对应的value组合到一起 |
七、示例代码
pi
from __future__ import print_function
import sys
from random import random
from operator import add
from pyspark.sql import SparkSession
if __name__ == "__main__":
"""
Usage: pi [partitions]
"""
spark = SparkSession\
.builder\
.appName("PythonPi")\
.getOrCreate()
partitions = int(sys.argv[1]) if len(sys.argv) > 1 else 2
n = 100000 * partitions
def f(_):
x = random() * 2 - 1
y = random() * 2 - 1
return 1 if x ** 2 + y ** 2 <= 1 else 0
count = spark.sparkContext.parallelize(range(1, n + 1), partitions).map(f).reduce(add)
print("Pi is roughly %f" % (4.0 * count / n))
spark.stop()
wordcount
from __future__ import print_function
import sys
from operator import add
from pyspark.sql import SparkSession
if __name__ == "__main__":
if len(sys.argv) != 2:
print("Usage: wordcount <file>", file=sys.stderr)
sys.exit(-1)
spark = SparkSession\
.builder\
.appName("PythonWordCount")\
.getOrCreate()
lines = spark.read.text(sys.argv[1]).rdd.map(lambda r: r[0])
counts = lines.flatMap(lambda x: x.split(' ')) \
.map(lambda x: (x, 1)) \
.reduceByKey(add)
output = counts.collect()
for (word, count) in output:
print("%s: %i" % (word, count))
spark.stop()
八、pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.hl</groupId>
<artifactId>SparkPi</artifactId>
<version>1.0-SNAPSHOT</version>
<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>2.4.4</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.11</artifactId>
<version>2.4.4</version>
</dependency>
</dependencies>
</project>
VM options:
-Dspark.master=local