本文最后更新于 2024-07-06,文章内容可能已经过时。

10. PySpark案例实战

1. PySpark定义

Spark:Apache Spark是用于大规模数据(large-scala data)处理的统一(unified)分析引擎。

简单来说,Spark是一款分布式的计算框架,用于调度成百上千的服务器集群,计算TB、PB乃至EB级别的海量数据。

Spark作为全球顶级的分布式计算框架,支持众多的编程语言进行开发。而Python语言,则是Spark重点支持的方向。

PySpark:Spark对Python语言的支持,重点体现在,Python第三方库:PySpark之上。

  • PySpark是由Spark官方开发的Python语言第三方库。
  • PySpark是Spark的Python实现,是Spark为Python开发者提供的编程入口,用于以Python代码完成Spark任务的开发。
  • PySpark不仅可以作为Python第三方库使用,也可以将程序提交的Spark集群环境中,调度大规模集群进行执行。

image-20240310172545395

2. 构建PySpark执行环境入口对象

想要使用PySpark库完成数据处理,首先需要构建一个执行环境入口对象。PySpark的执行环境入口对象是:类 SparkContext 的类对象

"""
演示获取PySpark的执行环境入库对象:SparkContext
并通过SparkContext对象获取当前PySpark的版本
"""

# 导包
from pyspark import SparkConf, SparkContext
# 创建SparkConf类对象(链式调用)
conf = SparkConf().setMaster("local[*]").setAppName("test_spark_app")
# 基于SparkConf类对象创建SparkContext对象
sc = SparkContext(conf=conf)
# 打印PySpark的运行版本
print(sc.version)
# 停止SparkContext对象的运行(停止PySpark程序)
sc.stop()

3.PySpark的编程模型

SparkContext类对象,是PySpark编程中一切功能的入口。PySpark的编程,主要分为如下三大步骤:

image-20240310182315844

image-20240310182444465

通过SparkContext对象,完成数据输入;输入数据后得到RDD对象,对RDD对象进行迭代计算;最终通过RDD对象的成员方法,完成数据输出工作。

  • 数据输入:通过SparkContext完成数据读取
  • 数据计算:读取到的数据转换为RDD对象,调用RDD的成员方法完成计算
  • 数据输出:调用RDD的数据输出相关成员方法,将结果输出到list、元组、字典、文本文件、数据库等

4. 数据输入——RDD对象

PySpark支持多种数据的输入,在输入完成后,都会得到一个:RDD类的对象RDD全称为:弹性分布式数据集(Resilient Distributed Datasets)PySpark针对数据的处理,都是以RDD对象作为载体,即:

  • 数据存储在RDD内
  • 各类数据的计算方法,也都是RDD的成员方法
  • RDD的数据计算方法,返回值依旧是RDD对象

PySpark的编程模型可以归纳为:准备数据到RDD -> RDD迭代计算 -> RDD导出为list、文本文件等即:源数据 -> RDD -> 结果数据

Python数据容器转RDD对象

PySpark支持通过SparkContext对象的parallelize成员方法,将:list、tuple、set、dict、str转换为PySpark的RDD对象。

注意:

  • 字符串会被拆分出1个个的字符,存入RDD对象
  • 字典仅有key会被存入RDD对象

读取文件转RDD对象

PySpark也支持通过SparkContext入口对象,用textFile方法来读取文件,用parallelize方法将Python对象加载到Spark内,来构建出RDD对象。

"""
演示通过PySpark代码加载数据,即数据输入
"""
from pyspark import SparkConf, SparkContext

conf = SparkConf().setMaster("local[*]").setAppName("test_spark")
sc = SparkContext(conf=conf)

# 通过parallelize方法将Python对象加载到Spark内,成为RDD对象
rdd1 = sc.parallelize([1, 2, 3, 4, 5])
rdd2 = sc.parallelize((1, 2, 3, 4, 5))
rdd3 = sc.parallelize("abcdefg")
rdd4 = sc.parallelize({1, 2, 3, 4, 5})
rdd5 = sc.parallelize({"key1": "value1", "key2": "value2"})

# 如果要查看RDD里面有什么内容,需要用collect()方法
print(rdd1.collect())
print(rdd2.collect())
print(rdd3.collect())
print(rdd4.collect())
print(rdd5.collect())

# 用过textFile方法,读取文件数据加载到Spark内,成为RDD对象
rdd = sc.textFile("D:/hello.txt")
print(rdd.collect())
sc.stop()

5. 数据计算

PySpark的数据计算,都是基于RDD对象来进行的,那么如何进行呢?自然是依赖RDD对象内置丰富的:成员方法(算子)

5.1 map方法(map算子)

功能:map算子,是将RDD的数据一条条处理(处理的逻辑基于map算子中接收的处理函数),返回新的RDD。

map接受一个处理函数时,可用lambda表达式快速编写,对RDD内的元素逐个处理,并返回一个新的RDD。

链式调用:对于返回值是新RDD的算子,可以通过链式调用的方式多次调用算子。

image-20240310214606934

5.2 flatMap方法(flatMap算子)

功能:对rdd执行map操作,然后进行解除嵌套操作。

计算逻辑和map一样,但比map多出解除一层嵌套的功能

image-20240310214841820

5.3 reduceByKey算子

功能:针对KV型RDD,自动按照key分组,然后根据你提供的聚合逻辑,完成组内数据(valve)的聚合操作。(接受一个处理函数,对数据进行两两计算)。语法如下:(参数为二元元组)

image-20240310221012653

image-20240310221111054

注意:reduceByKey中接收的函数,只负责聚合,不理会分组,分组是自动bykey来分组的。

"""
演示RDD的reduceByKey成员方法的使用
"""
from pyspark import SparkConf, SparkContext
import os
os.environ['PYSPARK_PYTHON'] = "D:/dev/python/python310/python.exe"
conf = SparkConf().setMaster("local[*]").setAppName("test_spark")
sc = SparkContext(conf=conf)

# 准备一个RDD
rdd = sc.parallelize([('男', 99), ('男', 88), ('女', 99), ('女', 66)])
# 求男生和女生两个组的成绩之和
rdd2 = rdd.reduceByKey(lambda a, b: a + b)
print(rdd2.collect())

5.4 综合案例1

"""
完成练习案例:单词计数统计
"""
# 1. 构建执行环境入口对象
from pyspark import SparkContext, SparkConf
import os
os.environ['PYSPARK_PYTHON'] = "D:/dev/python/python310/python.exe"
conf = SparkConf().setMaster("local[*]").setAppName("test_spark")
sc = SparkContext(conf=conf)
# 2. 读取数据文件
rdd = sc.textFile("D:/hello.txt")
# 3. 取出全部单词
word_rdd = rdd.flatMap(lambda x: x.split(" "))
# 4. 将所有单词都转换成二元元组,单词为Key,value设置为1(用于计数)
word_with_one_rdd = word_rdd.map(lambda word: (word, 1))
# 5. 分组并求和
result_rdd = word_with_one_rdd.reduceByKey(lambda a, b: a + b)
# 6. 打印输出结果
print(result_rdd.collect())

5.5 filter算子

功能:过滤想要的数据进行保留。语法如下:

image-20240310222800673

filter接受一个处理函数,可用lambda快速编写函数对RDD数据逐个处理,得到True的保留至返回值的RDD中。

5.6 distinct算子

功能:对RDD数据进行去重,返回新RDD
语法:rdd.distinct ()无需传参

"""
演示RDD的distinct成员方法的使用
"""
from pyspark import SparkConf, SparkContext
import os
os.environ['PYSPARK_PYTHON'] = "D:/dev/python/python310/python.exe"
conf = SparkConf().setMaster("local[*]").setAppName("test_spark")
sc = SparkContext(conf=conf)

# 准备一个RDD
rdd = sc.parallelize([1, 1, 3, 3, 5, 5, 7, 8, 8, 9, 10])
# 对RDD的数据进行去重
rdd2 = rdd.distinct()

print(rdd2.collect())

5.7 sortBy算子

功能:对RDD数据进行排序,基于你指定的排序依据。语法:

image-20240310223603517

sortBy可接收一个处理函数,可用lambda快速编写,函数表示用来决定排序的依据,可以控制升序或降序,全局排序需要设置分区数为1,通过单个分区进行排序 。

5.8综合案例2

"""
完成练习案例:JSON商品统计
需求:
1. 各个城市销售额排名,从大到小
2. 全部城市,有哪些商品类别在售卖
3. 北京市有哪些商品类别在售卖
"""
from pyspark import SparkConf, SparkContext
import os
import json
os.environ['PYSPARK_PYTHON'] = 'D:/dev/python/python310/python.exe'
conf = SparkConf().setMaster("local[*]").setAppName("test_spark")
sc = SparkContext(conf=conf)

# TODO 需求1: 城市销售额排名
# 1.1 读取文件得到RDD
file_rdd = sc.textFile("D:/orders.txt")
# 1.2 取出一个个JSON字符串
json_str_rdd = file_rdd.flatMap(lambda x: x.split("|"))
# 1.3 将一个个JSON字符串转换为字典
dict_rdd = json_str_rdd.map(lambda x: json.loads(x))
# 1.4 取出城市和销售额数据
# (城市,销售额)
city_with_money_rdd = dict_rdd.map(lambda x: (x['areaName'], int(x['money'])))
# 1.5 按城市分组按销售额聚合
city_result_rdd = city_with_money_rdd.reduceByKey(lambda a, b: a + b)
# 1.6 按销售额聚合结果进行排序
result1_rdd = city_result_rdd.sortBy(lambda x: x[1], ascending=False, numPartitions=1)
print("需求1的结果:", result1_rdd.collect())

# TODO 需求2: 全部城市有哪些商品类别在售卖
# 2.1 取出全部的商品类别
# 2.2 对全部商品类别进行去重
category_rdd = dict_rdd.map(lambda x: x['category']).distinct()
print("需求2的结果:", category_rdd.collect())

# TODO 需求3: 北京市有哪些商品类别在售卖
# 3.1 过滤北京市的数据
beijing_data_rdd = dict_rdd.filter(lambda x: x['areaName'] == '北京')
# 3.2 取出全部商品类别
# 3.3 进行商品类别去重
result3_rdd = beijing_data_rdd.map(lambda x: x['category']).distinct()
print("需求3的结果:", result3_rdd.collect())

6. 数据输出

输出为Python对象的各类方法:

6.1 collect算子

功能:将RDD各个分区内的数据,统一收集到Driver中,形成一个List对象。(RDD->list)

用法:rdd.collect(),返回值是一个ist。

6.2 reduce.算子

功能:对RDD数据集按照传入的逻辑进行聚合。语法:

rdd.reduce(func)
# func:(T,T)→T
# 2参数传入1个返回值,返回值和参数要求类型一致。

代码:返回值等同于计算函数的返回值

rdd sc.parallelize(range(1,10))
# 将rdd的数据进行累求和
print(rdd.reduce(lambda a,b:a b))

6.3 take算子

功能:取RDD的前N个元素,组合成list返回给你(即返回前N个元素)。用法:

sc.parallelize([3,2,1,4,5,6]).take(5)

返回:[3,2,1,4,5]

6.4 count算子

功能:计算RDD有多少条数据,返回值是一个数字。用法:

sc.parallelize([3,2,1,4,5,6]).count()

返回:6

"""
演示将RDD输出为Python对象
"""
from pyspark import SparkConf, SparkContext
import os
import json
os.environ['PYSPARK_PYTHON'] = 'D:/dev/python/python310/python.exe'
conf = SparkConf().setMaster("local[*]").setAppName("test_spark")
sc = SparkContext(conf=conf)

# 准备RDD
rdd = sc.parallelize([1, 2, 3, 4, 5])

# collect算子,输出RDD为list对象
rdd_list: list = rdd.collect()
print(rdd_list)
print(type(rdd_list))
# reduce算子,对RDD进行两两聚合
num = rdd.reduce(lambda a, b: a + b)
print(num)
# take算子,取出RDD前N个元素,组成list返回
take_list = rdd.take(3)
print(take_list)
# count,统计rdd内有多少条数据,返回值为数字
num_count = rdd.count()
print(f"rdd内有{num_count}个元素")

sc.stop()

输出到文件的各类方法:

6.5 saveAsTextFile算子

功能:将RDD的数据写入文本文件中。支持本地写出,hdfs等文件系统

rdd sc.parallelize([1,2,3,4,5])
rdd.saveAsTextFile("../data/output/test.txt")

注意事项

调用保存文件的算子,需要配置Hadoop依赖

  • 下载Hadoop安装包: http://archive.apache.org/dist/hadoop/common/hadoop-3.0.0/hadoop-3.0.0.tar.gz
  • 解压到电脑任意位置
  • 在Python代码中使用os模块配置:os.environ[‘HADOOP_HOME’] = ‘HADOOP解压文件夹路径’
  • 下载winutils.exe,并放入Hadoop解压文件夹的bin目录内
  • https://raw.githubusercontent.com/steveloughran/winutils/master/hadoop-3.0.0/bin/winutils.exe
  • 下载hadoop.dll,并放入:C:/Windows/System32 文件夹内
  • https://raw.githubusercontent.com/steveloughran/winutils/master/hadoop-3.0.0/bin/hadoop.dll
  • 修改rdd分区为1个

image-20240311165457648

总结:

RDD输出到文件的方法:rdd.saveAsTextFile(路径)

  • 输出的结果是一个文件夹
  • 有几个分区就输出多少个结果文件

如何修改RDD分区

  • SparkConf对象设置conf.set(“spark.default.parallelism”, “1”)创
  • 建RDD的时候,sc.parallelize方法传入numSlices参数为1
"""
演示将RDD输出到文件中
"""
from pyspark import SparkConf, SparkContext
import os
import json
os.environ['PYSPARK_PYTHON'] = 'D:/dev/python/python310/python.exe'
os.environ['HADOOP_HOME'] = "D:/dev/hadoop-3.0.0"
conf = SparkConf().setMaster("local[*]").setAppName("test_spark")
# conf.set("spark.default.parallelism", "1")
sc = SparkContext(conf=conf)

# 准备RDD1
rdd1 = sc.parallelize([1, 2, 3, 4, 5], numSlices=1)
# 准备RDD2
rdd2 = sc.parallelize([("Hello", 3), ("Spark", 5), ("Hi", 7)], 1)
# 准备RDD3
rdd3 = sc.parallelize([[1, 3, 5], [6, 7, 9], [11, 13, 11]], 1)

# 输出到文件中
rdd1.saveAsTextFile("D:/output1")
rdd2.saveAsTextFile("D:/output2")
rdd3.saveAsTextFile("D:/output3")

7. 综合案例——搜索引擎日志分析

读取文件转换成RDD,并完成:

  • 打印输出:热门搜索时间段(小时精度)Top3
  • 打印输出:热门搜索词Top3
  • 打印输出:统计黑马程序员关键字在哪个时段被搜索最多
  • 将数据转换为JSON格式,写出为文件
"""
演示PySpark综合案例
"""
from pyspark import SparkConf, SparkContext
import os
import json
os.environ['PYSPARK_PYTHON'] = 'D:/dev/python/python310/python.exe'
os.environ['HADOOP_HOME'] = "D:/dev/hadoop-3.0.0"
conf = SparkConf().setMaster("local[*]").setAppName("test_spark")
conf.set("spark.default.parallelism", "1")
sc = SparkContext(conf=conf)

# 读取文件转换成RDD
file_rdd = sc.textFile("D:/search_log.txt")
# TODO 需求1: 热门搜索时间段Top3(小时精度)
# 1.1 取出全部的时间并转换为小时(切片)
# 1.2 转换为(小时, 1) 的二元元组
# 1.3 Key分组聚合Value
# 1.4 排序(降序)
# 1.5 取前3
result1 = file_rdd.map(lambda x: x.split("\t")).\
    map(lambda x: x[0][:2]).\
    map(lambda x: (x, 1)).\
    reduceByKey(lambda a, b: a + b).\
    sortBy(lambda x: x[1], ascending=False, numPartitions=1).\
    take(3)
print("需求1的结果:", result1)

# TODO 需求2: 热门搜索词Top3
# 2.1 取出全部的搜索词
# 2.2 (词, 1) 二元元组
# 2.3 分组聚合
# 2.4 排序
# 2.5 Top3
result2 = file_rdd.map(lambda x: (x.split("\t")[2], 1)).\
    reduceByKey(lambda a, b: a + b).\
    sortBy(lambda x: x[1], ascending=False, numPartitions=1).\
    take(3)
print("需求2的结果:", result2)

# TODO 需求3: 统计黑马程序员关键字在什么时段被搜索的最多
# 3.1 过滤内容,只保留黑马程序员关键词
# 3.2 转换为(小时, 1) 的二元元组
# 3.3 Key分组聚合Value
# 3.4 排序(降序)
# 3.5 取前1
result3 = file_rdd.map(lambda x: x.split("\t")).\
    filter(lambda x: x[2] == '黑马程序员').\
    map(lambda x: (x[0][:2], 1)).\
    reduceByKey(lambda a, b: a + b).\
    sortBy(lambda x: x[1], ascending=False, numPartitions=1).\
    take(1)
print("需求3的结果:", result3)

# TODO 需求4: 将数据转换为JSON格式,写出到文件中
# 4.1 转换为JSON格式的RDD
# 4.2 写出为文件
file_rdd.map(lambda x: x.split("\t")).\
    map(lambda x: {"time": x[0], "user_id": x[1], "key_word": x[2], "rank1": x[3], "rank2": x[4], "url": x[5]}).\
    saveAsTextFile("D:/output_json")