Getting Started with Spark
Installing Spark
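Download a prebuilt package from the official Spark website and extract it.
Set the JAVA_HOME environment variable and add Spark's bin and sbin directories to the system PATH.
Add the following to ~/.bashrc, then run source ~/.bashrc to apply it: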
export SPARK_HOME=/data/download/spark-3.5.1-bin-hadoop3
export PATH=$PATH:$SPARK_HOME/bin:$SPARK_HOME/sbin
export JAVA_HOME=/usr/lib/jvm/TencentKona-8.0.17-402
export PATH=$PATH:$JAVA_HOME/bin
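Before starting any Spark shell, it can help to confirm that both variables point at real directories. The following is a minimal sketch; check_spark_env is a hypothetical helper written for these notes, not something Spark ships, and it only inspects the environment mapping it is given.

```python
import os

def check_spark_env(env=None):
    """Return a list of problems found with SPARK_HOME / JAVA_HOME.

    Hypothetical helper for illustration; not part of Spark.
    """
    env = os.environ if env is None else env
    problems = []
    for var in ("SPARK_HOME", "JAVA_HOME"):
        value = env.get(var)
        if not value:
            problems.append(f"{var} is not set")
        elif not os.path.isdir(value):
            problems.append(f"{var} points to a missing directory: {value}")
    return problems

# Check the current shell environment and report anything suspicious.
for problem in check_spark_env():
    print(problem)
```

An empty result means both variables are set and their directories exist; it does not prove that the Java or Spark versions are compatible.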
spark-shell
Start spark-shell:
spark-shell
Word count test:
val hFile = sc.textFile("hdfs://localhost:9000/user/wjrtest/input/capacity-scheduler.xml")
val wc = hFile.flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey(_ + _)
wc.take(5)
PySpark
PySpark Shell
Start PySpark:
pyspark
Startup failed with a segmentation fault.
Increase the core dump size limit:
ulimit -c unlimited
Run pyspark again. It crashes with the same segmentation fault, and this time a core file is written.
Debug with gdb:
gdb -c core
(gdb) bt
Output:
(base) [root@xxx-tencentos /data/download/spark-3.5.1-bin-hadoop3]# gdb -c core.17222
GNU gdb (GDB) Red Hat Enterprise Linux 7.6.1-120.tl2
Copyright (C) 2013 Free Software Foundation, Inc.
License GPLv3+: GNU GPL version 3 or later <http://gnu.org/licenses/gpl.html>
This is free software: you are free to change and redistribute it.
There is NO WARRANTY, to the extent permitted by law. Type "show copying"
and "show warranty" for details.
This GDB was configured as "x86_64-redhat-linux-gnu".
For bug reporting instructions, please see:
<http://www.gnu.org/software/gdb/bugs/>.
[New LWP 17222]
Core was generated by `python3'.
Program terminated with signal 11, Segmentation fault.
#0 0x00007ff3bfdf0f4f in ?? ()
(gdb) bt
#0 0x00007ff3bfdf0f4f in ?? ()
#1 0x00007ff3c0c2d049 in ?? ()
#2 0x0000000000000000 in ?? ()
(gdb)
The backtrace contains no symbols, so the root cause could not be determined from it.
Checking the Spark logs and deactivating the conda environment did not resolve the issue either.
The fix was to edit conf/spark-env.sh and point PySpark explicitly at the system Python 3:
export PYSPARK_DRIVER_PYTHON=/usr/bin/python3
export PYSPARK_PYTHON=/usr/bin/python3
After this change, pyspark starts normally.
This was an unusual bug, and I found no similar reports online. My hypothesis is that conda had made its newer Python 3.12 the default python3 for new processes, so the session spawned by PySpark picked up that interpreter. That would also explain why deactivating conda in the terminal did not help.
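One way to test a hypothesis like this is to print which interpreter is actually running and what the two PySpark variables are set to. The sketch below assumes nothing beyond the standard library; describe_python is a name made up for this note. Run it with the same python3 command that pyspark would find on the PATH.

```python
import os
import sys

def describe_python(env=None):
    """Collect the facts relevant to PySpark's interpreter choice."""
    env = os.environ if env is None else env
    return {
        "running_interpreter": sys.executable,
        "running_version": "%d.%d.%d" % sys.version_info[:3],
        "PYSPARK_PYTHON": env.get("PYSPARK_PYTHON", "<unset>"),
        "PYSPARK_DRIVER_PYTHON": env.get("PYSPARK_DRIVER_PYTHON", "<unset>"),
    }

# Print one fact per line for easy comparison between shells.
for key, value in describe_python().items():
    print(f"{key}: {value}")
```

If running_interpreter points into a conda directory while both variables are unset, the shell is still resolving python3 to the conda build.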
Reference: https://www.cnblogs.com/traditional/p/11724876.html
Spark job execution details and metrics are available via the web UI (port 4040).
Creating an RDD from an Existing Collection
# Call sc.parallelize to convert an existing collection into an RDD
>>> rdd1 = sc.parallelize(range(10))
# This displays an RDD object
>>> rdd1
PythonRDD[1] at RDD at PythonRDD.scala:53
# To view the actual contents, call the collect method
>>> rdd1.collect()
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
# Apply map to get rdd2
>>> rdd2 = rdd1.map(lambda x: x + 1)
>>> rdd2.collect()
[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
# Apply reduce
>>> rdd2.reduce(lambda x, y: x + y)
55
>>>
Specifying the number of partitions at creation:
>>> rdd1 = sc.parallelize(range(10), 5)
>>> rdd1.glom().collect()
[[0, 1], [2, 3], [4, 5], [6, 7], [8, 9]]
>>> rdd1.getNumPartitions() # Get partition count
5
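The glom() output above suggests that the elements are cut into contiguous chunks of nearly equal size. The following plain-Python model reproduces that slicing without Spark. It is an illustration under that assumption, not Spark's actual code, and slice_range is a hypothetical name.

```python
def slice_range(r, num_slices):
    """Split a range into num_slices contiguous chunks.

    A plain-Python model of the even slicing seen in the glom() output;
    an illustration, not Spark's implementation.
    """
    size = len(r)
    # Chunk i covers positions [i * size // n, (i + 1) * size // n).
    bounds = [i * size // num_slices for i in range(num_slices + 1)]
    return [list(r[bounds[i]:bounds[i + 1]]) for i in range(num_slices)]

print(slice_range(range(10), 5))  # [[0, 1], [2, 3], [4, 5], [6, 7], [8, 9]]
print(slice_range(range(10), 3))  # [[0, 1, 2], [3, 4, 5], [6, 7, 8, 9]]
```

When the element count is not divisible by the partition count, the model gives some chunks one extra element. Check the real layout with glom() before relying on it.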
SparkContext can also create RDDs by reading files from storage systems. The following examples read from the local filesystem and from HDFS.
# Use textFile to read a file by path; partition count can also be specified
# Read from local filesystem: prefix with "file://"
>>> rdd1 = sc.textFile("file:///root/data.txt")
>>> rdd1.collect()
['hello matsuri', 'hello koishi', 'hello matsuri', 'hello marisa']
# Read from HDFS: format is "hdfs://host:port/path", where port is the HDFS NameNode port
# A doubled slash before the path ("hdfs://host:port//path") is also accepted
>>> rdd2 = sc.textFile("hdfs://localhost:9000/user/wjrtest/input/capacity-scheduler.xml", 4)
>>> rdd2.collect()
# (sample output, from a text file with the same contents as data.txt, not from the XML file above)
['hello matsuri', 'hello koishi', 'hello matsuri', 'hello marisa']
# Apply map
>>> rdd3 = rdd1.map(lambda x: x.split())
>>> rdd3.collect()
[['hello', 'matsuri'], ['hello', 'koishi'], ['hello', 'matsuri'], ['hello', 'marisa']]
textFile also accepts a directory (textFile("/dir")), a glob pattern (textFile("/dir/*.txt")), and gzip-compressed files (.gz).
RDDs can also be saved to files using saveAsTextFile.
>>> rdd = sc.parallelize(range(8), 4)
>>> rdd = rdd.map(lambda x: f"No. {x}")
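# A path without a scheme resolves against the default filesystem (fs.defaultFS),
# which is the local one unless Hadoop is configured otherwise; use "file://" to be explicit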
>>> rdd.saveAsTextFile("/home/wjr/test/a.txt")
# Save to HDFS
>>> rdd.saveAsTextFile("hdfs://satori001:9000/a.txt")
Note: despite the name a.txt, the result is a directory containing one part-NNNNN file per partition, not a single text file.
An RDD is an abstract concept representing a collection of partitions that may be distributed across different nodes.
# An RDD whose elements are 2-tuples is called a key-value (KV) RDD
>>> rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 2), ("b", 2), ("c", 4)])
>>> rdd.reduceByKey(lambda x, y: x + y).collect()
[('b', 3), ('c', 4), ('a', 3)]
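To make the behaviour of reduceByKey concrete, here is a plain-Python model: values are first combined inside each partition, and the partial results for the same key are then merged. The two-partition split of the data is an assumption made for the example, and reduce_by_key is a hypothetical function, not the PySpark implementation.

```python
def reduce_by_key(partitions, func):
    """Model reduceByKey: combine inside each partition, then merge."""
    # Phase 1: local combine, one dict of partial results per partition.
    local = []
    for part in partitions:
        acc = {}
        for key, value in part:
            acc[key] = func(acc[key], value) if key in acc else value
        local.append(acc)
    # Phase 2: merge the partial results that share a key.
    merged = {}
    for acc in local:
        for key, value in acc.items():
            merged[key] = func(merged[key], value) if key in merged else value
    return merged

# Assumed split of the five pairs into two partitions.
pairs = [[("a", 1), ("b", 1)], [("a", 2), ("b", 2), ("c", 4)]]
print(reduce_by_key(pairs, lambda x, y: x + y))  # {'a': 3, 'b': 3, 'c': 4}
```

Because the function is applied in both phases and in no guaranteed order, it should be associative and commutative, as addition is here. That is also why the key order in the collect() output above is not sorted.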
Word Count
>>> rdd = sc.textFile("hdfs://localhost:9000/user/wjrtest/input/capacity-scheduler.xml")
# Split by whitespace
>>> rdd = rdd.flatMap(lambda x: x.split())
>>> rdd.collect()
>>> rdd = rdd.map(lambda x: (x, 1))
>>> rdd.collect()
# Group and sum within each group
>>> rdd = rdd.reduceByKey(lambda x, y: x + y)
>>> rdd.collect()
[('python', 1), ('rust', 1), ('hello', 3), ('golang', 1)]
Alternative approach:
>>> rdd = sc.textFile("hdfs://localhost:9000/user/wjrtest/input/capacity-scheduler.xml")
# Split and then group
>>> rdd = rdd.flatMap(lambda x: x.split()).groupBy(lambda x: x)
>>> rdd.collect()
[('python', <pyspark.resultiterable.ResultIterable object at 0x7fb58e91ad30>),
('rust', <pyspark.resultiterable.ResultIterable object at 0x7fb58e91aa90>),
('hello', <pyspark.resultiterable.ResultIterable object at 0x7fb58e91aa58>),
('golang', <pyspark.resultiterable.ResultIterable object at 0x7fb58e91a7b8>)]
# Elements that share a key (here, the word itself) are grouped together
>>> rdd = rdd.mapValues(lambda x: list(x))
>>> rdd.collect()
[('python', ['python']),
('rust', ['rust']),
('hello', ['hello', 'hello', 'hello']),
('golang', ['golang'])]
>>> rdd = rdd.map(lambda x: (x[0], len(x[1])))
>>> rdd.collect()
[('python', 1), ('rust', 1), ('hello', 3), ('golang', 1)]
>>>
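Both approaches give the same counts, but they move different amounts of data between partitions. The simplified model below counts how many values cross the shuffle in each case: with groupBy every word travels, while reduceByKey first collapses each partition to one partial count per distinct word. The partition layout is hypothetical, and the model ignores serialization and other real-world costs.

```python
def shuffled_values(partitions, map_side_combine):
    """Count the values each partition would send into the shuffle."""
    total = 0
    for words in partitions:
        if map_side_combine:
            # reduceByKey-style: one partial count per distinct word.
            total += len(set(words))
        else:
            # groupBy-style: every single element travels.
            total += len(words)
    return total

# Hypothetical two-partition split of the words counted above.
parts = [["hello", "python", "hello", "rust"], ["hello", "golang"]]
print(shuffled_values(parts, map_side_combine=False))  # 6
print(shuffled_values(parts, map_side_combine=True))   # 5
```

The gap is small for this tiny input. It grows with the number of repeated words per partition, which is why the reduceByKey version is usually preferred for counting.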
RDD Persistence
Calling transformation operators on an RDD builds an execution plan. Execution only begins when an action operator is called.
One of Spark's most important capabilities is persisting (caching) a dataset in memory across operations. When you persist an RDD, each node stores the partitions it computes and reuses them in later actions on that RDD or on RDDs derived from it. This can make later actions much faster (the Spark documentation says often by more than 10x), and caching is especially effective for iterative algorithms and interactive use.
An RDD is marked for persistence by calling persist() or cache(). The data is kept in executor memory the first time an action computes it. Spark's cache is also fault-tolerant: if a partition is lost, it is recomputed from the transformations that originally created it.
persist is lazy, so nothing is cached until an action runs. To release the cache, call unpersist, which takes effect immediately.
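The interplay of lazy transformations, actions, persist and unpersist can be sketched with a toy class in plain Python. ToyRDD is a made-up name and a deliberately tiny model (single machine, no partitions, no fault tolerance); it only mirrors the timing rules described above.

```python
class ToyRDD:
    """Toy model of lazy evaluation and persist(); not Spark code."""

    def __init__(self, source, funcs=()):
        self._source = source      # zero-argument callable producing data
        self._funcs = list(funcs)  # recorded transformations (the "plan")
        self._persist = False
        self._cache = None
        self.computations = 0      # how many times the plan actually ran

    def map(self, func):
        # Transformation: only records the step, nothing runs yet.
        return ToyRDD(self._source, self._funcs + [func])

    def persist(self):
        # Lazy: just sets a flag; data is cached by the first action.
        self._persist = True
        return self

    def unpersist(self):
        # Eager: the cached data is dropped right away.
        self._persist = False
        self._cache = None
        return self

    def collect(self):
        # Action: runs the plan unless a cached result is available.
        if self._cache is not None:
            return list(self._cache)
        self.computations += 1
        data = list(self._source())
        for func in self._funcs:
            data = [func(x) for x in data]
        if self._persist:
            self._cache = data
        return list(data)

rdd = ToyRDD(lambda: range(5)).map(lambda x: x * 2).persist()
print(rdd.computations)  # 0: nothing has run yet
rdd.collect()
rdd.collect()
print(rdd.computations)  # 1: the second action reused the cache
rdd.unpersist()
rdd.collect()
print(rdd.computations)  # 2: the plan ran again after unpersist
```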
Writing and Submitting a Python Script
Create a Python file:
# Filename: main.py
from pyspark import SparkContext
# SparkConf can be used to specify configuration, but the official recommendation
# is to pass config at job submission time rather than hardcoding it.
# Here we directly instantiate SparkContext, named sc.
sc = SparkContext()
rdd = sc.textFile("hdfs://localhost:9000/user/wjrtest/input/capacity-scheduler.xml")
# Split by whitespace
rdd = rdd.flatMap(lambda x: x.split())
# Pair each word with an initial count of 1
rdd = rdd.map(lambda x: (x, 1))
# Group and sum within each group
rdd = rdd.reduceByKey(lambda x, y: x + y)
# Outside the shell, print is required to see results
print(rdd.collect())
# Good practice: call stop() at the end to close the connection to the Spark cluster.
# Otherwise, creating another SparkContext will raise:
# Cannot run multiple SparkContexts at once; existing SparkContext(app=..., master=local[*])
sc.stop()
Submit the script to the Spark cluster using spark-submit. Multiple run modes are available.
Local Mode
The PySpark shell runs in local mode by default.
spark-submit --master "local[*]" --name firstTest main.py
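Since the script leaves configuration out of the code, the settings have to be supplied on the command line. The sketch below assembles such a command as an argument list; build_submit_command is a hypothetical helper, and the spark.ui.port value is only an example. It uses the --master, --name and --conf options of spark-submit.

```python
import shlex

def build_submit_command(script, master="local[*]", name=None, conf=None):
    """Assemble a spark-submit argument list (hypothetical helper)."""
    cmd = ["spark-submit", "--master", master]
    if name:
        cmd += ["--name", name]
    # Each configuration property becomes its own --conf key=value pair.
    for key, value in (conf or {}).items():
        cmd += ["--conf", f"{key}={value}"]
    cmd.append(script)
    return cmd

cmd = build_submit_command(
    "main.py",
    name="firstTest",
    conf={"spark.ui.port": "4050"},  # example value, an assumption
)
# shlex.join quotes local[*] so the shell does not treat it as a glob.
print(shlex.join(cmd))
```

The list can be passed directly to subprocess.run(cmd, check=True), which avoids shell quoting issues altogether.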