Spark上手
JRQZ
安装Spark
官网下载,解压缩
配置环境变量JAVA_HOME,以及添加spark到系统路径
~/.bashrc
export SPARK_HOME=/data/download/spark-3.5.1-bin-hadoop3
export PATH=$PATH:$SPARK_HOME/bin:$SPARK_HOME/sbin
export JAVA_HOME=/usr/lib/jvm/TencentKona-8.0.17-402
export PATH=$PATH:$JAVA_HOME/bin
spark-shell
启动sparkshell
spark-shell
单词计数程序测试
var hFile = sc.textFile("hdfs://localhost:9000/user/wjrtest/input/capacity-scheduler.xml")
val wc = hFile.flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey(_ + _)
wc.take(5)
pyspark
pyspark shell
运行pyspark
pyspark
启动失败,出现segmentation fault
增加core文件的大小限制
ulimit -c unlimited
再次运行pyspark
,触发Segmentation fault
错误
使用gdb调试
gdb -c core
(gdb) bt
结果如下:
(base) [root@xxx-tencentos /data/download/spark-3.5.1-bin-hadoop3]# gdb -c core.17222
GNU gdb (GDB) Red Hat Enterprise Linux 7.6.1-120.tl2
Copyright (C) 2013 Free Software Foundation, Inc.
License GPLv3+: GNU GPL version 3 or later <http://gnu.org/licenses/gpl.html>
This is free software: you are free to change and redistribute it.
There is NO WARRANTY, to the extent permitted by law. Type "show copying"
and "show warranty" for details.
This GDB was configured as "x86_64-redhat-linux-gnu".
For bug reporting instructions, please see:
<http://www.gnu.org/software/gdb/bugs/>.
[New LWP 17222]
Core was generated by `python3'.
Program terminated with signal 11, Segmentation fault.
#0 0x00007ff3bfdf0f4f in ?? ()
(gdb) bt
#0 0x00007ff3bfdf0f4f in ?? ()
#1 0x00007ff3c0c2d049 in ?? ()
#2 0x0000000000000000 in ?? ()
(gdb)
显然无法确定具体原因
继续排查,如查找spark日志,关闭conda环境,均没有解决
最后确定需要修改 vim conf/spark-env.sh
,使用系统自带的python3
export PYSPARK_DRIVER_PYTHON=/usr/bin/python3
export PYSPARK_PYTHON=/usr/bin/python3
此时再运行pyspark
,问题解决
很神奇的坑,网上转了一圈没有碰到类似的,个人推测问题在于,本机配置的conda环境为python3.12,版本过高,且设定为系统默认版本;这样启动pyspark就会在spark启动的新会话中启动系统默认的python(也就是为什么在终端退出conda也不起作用)
参考:https://www.cnblogs.com/traditional/p/11724876.html
Spark 执行任务的相关细节和信息,可以通过 webUI 查看(端口是 4040)
从已存在的集合创建rdd
rdd1 = sc.parallelize(range(10))
# 调用 sc.parallelize 方法,可以将已经存在的集合转为 RDD
>>> rdd1 = sc.parallelize(range(10))
# 显示的是一个 RDD 对象
>>> rdd1
PythonRDD[1] at RDD at PythonRDD.scala:53
# 如果想查看具体内容,可以调用 collect 方法,这些后面会说
>>> rdd1.collect()
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
# 进行 map 操作得到 rdd2
>>> rdd2 = rdd1.map(lambda x: x + 1)
>>> rdd2.collect()
[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
# 进行 reduce 操作
>>> rdd2.reduce(lambda x, y: x + y)
55
>>>
创建时指定分区数量
>>> rdd1 = sc.parallelize(range(10), 5)
>>> rdd1.glom().collect()
[[0, 1], [2, 3], [4, 5], [6, 7], [8, 9]]
>>> rdd1.getNumPartitions() # 获取分区数
5
SparkContext 还可以读取存储系统里面的文件来创建 RDD,我们演示一下从本地读取文件、和从 HDFS 上读取文件。
# 读取文件使用 textFile 方法,传递一个文件路径,同时也可以指定分区
# 可以从本地读取,读取的格式为 "file://文件路径"
>>> rdd1 = sc.textFile("file:///root/data.txt")
>>> rdd1.collect()
['hello matsuri', 'hello koishi', 'hello matsuri', 'hello marisa']
# 从 HDFS 上读取,格式为 "hdfs://ip:port/文件路径",port 就是 HDFS 集群的 NameNode 端口
# 注意格式:"...//girl.txt" 和 ".../girl.txt" 都是合法的
>>> rdd2 = sc.textFile("hdfs://localhost:9000/user/wjrtest/input/capacity-scheduler.xml", 4)
>>> rdd2.collect()
['hello matsuri', 'hello koishi', 'hello matsuri', 'hello marisa']
# 执行 map 操作
>>> rdd3 = rdd1.map(lambda x: x.split())
>>> rdd3.collect()
[['hello', 'matsuri'], ['hello', 'koishi'], ['hello', 'matsuri'], ['hello', 'marisa']]
另外 textFile 不光可以读取文件,还可以读取目录:textFile("/dir"),模糊读取:textFile("/dir/*.txt"),以及读取 gz 压缩包等等
也可以将 RDD 保存为文件,通过 saveAsTextFile 方法
>>> rdd = sc.parallelize(range(8), 4)
>>> rdd = rdd.map(lambda x: f"{x}号")
# 默认保存在本地,当然也可以加上 file://
>>> rdd.saveAsTextFile("/home/wjr/test/a.txt")
# 保存到 HDFS
>>> rdd.saveAsTextFile("hdfs://satori001:9000/a.txt")
虽然我们保存的文件名是 a.txt,但它并不是一个文本文件,而是一个目录
RDD 其实是一个抽象的概念,它表示一系列分区的集合,而这些分区可以分布在不同的节点上
# 内部元素是二元元组的 RDD,我们称之为 KV 型 RDD
>>> rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 2), ("b", 2), ("c", 4)])
>>> rdd.reduceByKey(lambda x, y: x + y).collect()
[('b', 3), ('c', 4), ('a', 3)]
词频统计
>>> rdd = sc.textFile("hdfs://localhost:9000/user/wjrtest/input/capacity-scheduler.xml")
# 按照空格分隔
>>> rdd = rdd.flatMap(lambda x: x.split())
>>> rdd.collect()
>>> rdd = rdd.map(lambda x: (x, 1))
>>> rdd.collect()
# 分组,然后组内元素相加即可
>>> rdd = rdd.reduceByKey(lambda x, y: x + y)
>>> rdd.collect()
[('python', 1), ('rust', 1), ('hello', 3), ('golang', 1)]
另一种方法
>>> rdd = sc.textFile("hdfs://localhost:9000/user/wjrtest/input/capacity-scheduler.xml")
# 分隔之后,进行分组
>>> rdd = rdd.flatMap(lambda x: x.split()).groupBy(lambda x: x)
>>> rdd.collect()
[('python', <pyspark.resultiterable.ResultIterable object at 0x7fb58e91ad30>),
('rust', <pyspark.resultiterable.ResultIterable object at 0x7fb58e91aa90>),
('hello', <pyspark.resultiterable.ResultIterable object at 0x7fb58e91aa58>),
('golang', <pyspark.resultiterable.ResultIterable object at 0x7fb58e91a7b8>)]
# 相同标签的归为一组,这里的标签就是元素本身
>>> rdd = rdd.mapValues(lambda x: list(x))
>>> rdd.collect()
[('python', ['python']),
('rust', ['rust']),
('hello', ['hello', 'hello', 'hello']),
('golang', ['golang'])]
>>> rdd = rdd.map(lambda x: (x[0], len(x[1])))
>>> rdd.collect()
[('python', 1), ('rust', 1), ('hello', 3), ('golang', 1)]
>>>
RDD持久化
调用 RDD 的 transformation 算子,相当于在构建执行计划,调用 action 算子时才会开始执行
Spark 一个最重要的能力就是它可以通过一些操作来持久化(或者缓存)内存中的数据,当你持久化一个RDD,节点就会存储这个 RDD 的所有分区,以后可以直接在内存中计算、或者在其它的 action 操作时能够重用。这一特性使得之后的 action 操作能够变得更快(通常是 10 个数量级),因此缓存对于迭代式算法或者快速的交互式使用是一个非常有效的工具。
可以通过调用 persist() 或者 cache() 方法来持久化一个 RDD,当第一次 action 操作触发时,所有分区数据就会被保存到其他节点的内存当中。并且 Spark Cache 具有容错性:如果 RDD 的某个分区数据丢失了,那么会根据原来创建它的 transformation 操作重新计算
persist 是惰性的,只有在遇到一次 action 操作的时候,才会缓存 RDD 的分区数据。如果不想缓存了,可以调用 unpersist,而 unpersist 是立刻执行的
编写py文件方法
编写py文件
# 文件名:main.py
from pyspark import SparkContext
# SparkConf 可以用来指定相关的配置,但是官方不推荐这种硬编码的模式
# 而是通过提交任务的时候指定,因此可以不用创建 SparkConf 对象
# 这里我们直接实例化 SparkContext 对象,命名为 sc
sc = SparkContext()
rdd = sc.textFile("hdfs://localhost:9000/user/wjrtest/input/capacity-scheduler.xml")
# 按照空格分隔
rdd = rdd.flatMap(lambda x: x.split())
rdd.collect()
rdd = rdd.map(lambda x: (x, 1))
rdd.collect()
# 分组,然后组内元素相加即可
rdd = rdd.reduceByKey(lambda x, y: x + y)
# 不在 shell 里面了,我们需要 print 才能看到结果
print(rdd.collect())
# 好的习惯,编程结束之后 stop 掉,表示关闭与 Spark 集群连接
# 否则当你再次创建相同的 SparkContext 实例的时候就会报错
# 会提示你:Cannot run multiple SparkContexts at once; existing SparkContext(app=..., master=local[*])
sc.stop()
接下来就要把它提交到 Spark 集群上运行了,通过 spark-submit,但是运行模式可以有多种选择
local运行模式
pyspark shell,便是以 local 模式启动的
spark-submit --master local[*] --name firstTest main.py