# 10. PySpark案例实战 #### 1. PySpark定义 \*\*Spark:\*\*Apache Spark是用于大规模数据(large-scala data)处理的统一(unified)分析引擎。 简单来说,Spark是一款分布式的计算框架,用于调度成百上千的服务器集群,计算TB、PB乃至EB级别的海量数据。 Spark作为全球顶级的分布式计算框架,支持众多的编程语言进行开发。而Python语言,则是Spark重点支持的方向。 \*\*PySpark\*\*:Spark对Python语言的支持,重点体现在,Python第三方库:PySpark之上。 \* PySpark是由Spark官方开发的Python语言第三方库。 \* PySpark是Spark的Python实现,是Spark为Python开发者提供的编程入口,用于以Python代码完成Spark任务的开发。 \* PySpark不仅可以作为Python第三方库使用,也可以将程序提交的Spark集群环境中,调度大规模集群进行执行。 ![image-20240310172545395](https://hgh-typora-image.oss-cn-guangzhou.aliyuncs.com/img/image-20240310172545395.png) #### 2. 构建PySpark执行环境入口对象 想要使用PySpark库完成数据处理,首先需要构建一个执行环境入口对象。PySpark的执行环境入口对象是:类 SparkContext 的类对象 \`\`\`python """ 演示获取PySpark的执行环境入库对象:SparkContext 并通过SparkContext对象获取当前PySpark的版本 """ # 导包 from pyspark import SparkConf, SparkContext # 创建SparkConf类对象(链式调用) conf = SparkConf().setMaster("local\[\*\]").setAppName("test_spark_app") # 基于SparkConf类对象创建SparkContext对象 sc = SparkContext(conf=conf) # 打印PySpark的运行版本 print(sc.version) # 停止SparkContext对象的运行(停止PySpark程序) sc.stop() \`\`\` #### 3.PySpark的编程模型 SparkContext类对象,是PySpark编程中一切功能的入口。PySpark的编程,主要分为如下三大步骤: !\[image-20240310182315844\](https://hgh-typora-image.oss-cn-guangzhou.aliyuncs.com/img/image-20240310182315844.png) ![image-20240310182444465](https://hgh-typora-image.oss-cn-guangzhou.aliyuncs.com/img/image-20240310182444465.png) 通过SparkContext对象,完成数据输入;输入数据后得到RDD对象,对RDD对象进行迭代计算;最终通过RDD对象的成员方法,完成数据输出工作。 \* 数据输入:通过SparkContext完成数据读取 \* 数据计算:读取到的数据转换为RDD对象,调用RDD的成员方法完成计算 \* 数据输出:调用RDD的数据输出相关成员方法,将结果输出到list、元组、字典、文本文件、数据库等 #### 4. 数据输入------RDD对象 PySpark支持多种数据的输入,在输入完成后,都会得到一个:RDD类的对象RDD全称为:\*\*弹性分布式数据集(Resilient Distributed Datasets)\*\*PySpark针对数据的处理,都是以RDD对象作为载体,即: \* 数据存储在RDD内 \* 各类数据的计算方法,也都是RDD的成员方法 \* RDD的数据计算方法,返回值依旧是RDD对象 PySpark的编程模型可以归纳为:准备数据到RDD -\> RDD迭代计算 -\> RDD导出为list、文本文件等即:源数据 -\> RDD -\> 结果数据 \*\*Python数据容器转RDD对象\*\* PySpark支持通过SparkContext对象的parallelize成员方法,将:list、tuple、set、dict、str转换为PySpark的RDD对象。 \*\*注意:\*\* \* 字符串会被拆分出1个个的字符,存入RDD对象 \* 字典仅有key会被存入RDD对象 \*\*读取文件转RDD对象\*\* PySpark也支持通过SparkContext入口对象,用\*\*textFile方法\*\*来读取文件,用\*\*parallelize方法\*\*将Python对象加载到Spark内,来构建出RDD对象。 \`\`\`python """ 演示通过PySpark代码加载数据,即数据输入 """ from pyspark import SparkConf, SparkContext conf = SparkConf().setMaster("local\[\*\]").setAppName("test_spark") sc = SparkContext(conf=conf) # 通过parallelize方法将Python对象加载到Spark内,成为RDD对象 rdd1 = sc.parallelize(\[1, 2, 3, 4, 5\]) rdd2 = sc.parallelize((1, 2, 3, 4, 5)) rdd3 = sc.parallelize("abcdefg") rdd4 = sc.parallelize({1, 2, 3, 4, 5}) rdd5 = sc.parallelize({"key1": "value1", "key2": "value2"}) # 如果要查看RDD里面有什么内容,需要用collect()方法 print(rdd1.collect()) print(rdd2.collect()) print(rdd3.collect()) print(rdd4.collect()) print(rdd5.collect()) # 用过textFile方法,读取文件数据加载到Spark内,成为RDD对象 rdd = sc.textFile("D:/hello.txt") print(rdd.collect()) sc.stop() \`\`\` ### 5. 数据计算 PySpark的数据计算,都是基于RDD对象来进行的,那么如何进行呢?自然是依赖RDD对象内置丰富的:\*\*成员方法(算子)\*\* #### 5.1 map方法(map算子) 功能:map算子,是将RDD的数据一条条处理(处理的逻辑基于map算子中接收的处理函数),返回新的RDD。 map接受一个处理函数时,可用lambda表达式快速编写,对RDD内的元素逐个处理,并返回一个新的RDD。 \*\*链式调用:\*\*对于返回值是新RDD的算子,可以通过链式调用的方式多次调用算子。 !\[image-20240310214606934\](https://hgh-typora-image.oss-cn-guangzhou.aliyuncs.com/img/image-20240310214606934.png) #### 5.2 flatMap方法(flatMap算子) \*\*功能:\*\*对rdd执行map操作,然后进行解除嵌套操作。 计算逻辑和map一样,但比map多出解除一层嵌套的功能 ![image-20240310214841820](https://hgh-typora-image.oss-cn-guangzhou.aliyuncs.com/img/image-20240310214841820.png) #### 5.3 reduceByKey算子 \*\*功能:\*\*针对KV型RDD,自动按照key分组,然后根据你提供的聚合逻辑,完成组内数据(valve)的聚合操作。(接受一个处理函数,对数据进行两两计算)。语法如下:(参数为二元元组) !\[image-20240310221012653\](https://hgh-typora-image.oss-cn-guangzhou.aliyuncs.com/img/image-20240310221012653.png) !\[image-20240310221111054\](https://hgh-typora-image.oss-cn-guangzhou.aliyuncs.com/img/image-20240310221111054.png) 注意:reduceByKey中接收的函数,只负责聚合,不理会分组,分组是自动bykey来分组的。 \`\`\`python """ 演示RDD的reduceByKey成员方法的使用 """ from pyspark import SparkConf, SparkContext import os os.environ\['PYSPARK_PYTHON'\] = "D:/dev/python/python310/python.exe" conf = SparkConf().setMaster("local\[\*\]").setAppName("test_spark") sc = SparkContext(conf=conf) # 准备一个RDD rdd = sc.parallelize(\[('男', 99), ('男', 88), ('女', 99), ('女', 66)\]) # 求男生和女生两个组的成绩之和 rdd2 = rdd.reduceByKey(lambda a, b: a + b) print(rdd2.collect()) \`\`\` #### 5.4 综合案例1 \`\`\`python """ 完成练习案例:单词计数统计 """ # 1. 构建执行环境入口对象 from pyspark import SparkContext, SparkConf import os os.environ\['PYSPARK_PYTHON'\] = "D:/dev/python/python310/python.exe" conf = SparkConf().setMaster("local\[\*\]").setAppName("test_spark") sc = SparkContext(conf=conf) # 2. 读取数据文件 rdd = sc.textFile("D:/hello.txt") # 3. 取出全部单词 word_rdd = rdd.flatMap(lambda x: x.split(" ")) # 4. 将所有单词都转换成二元元组,单词为Key,value设置为1(用于计数) word_with_one_rdd = word_rdd.map(lambda word: (word, 1)) # 5. 分组并求和 result_rdd = word_with_one_rdd.reduceByKey(lambda a, b: a + b) # 6. 打印输出结果 print(result_rdd.collect()) \`\`\` #### 5.5 filter算子 功能:过滤想要的数据进行保留。语法如下: !\[image-20240310222800673\](https://hgh-typora-image.oss-cn-guangzhou.aliyuncs.com/img/image-20240310222800673.png) filter接受一个处理函数,可用lambda快速编写函数对RDD数据逐个处理,得到True的保留至返回值的RDD中。 #### 5.6 distinct算子 功能:对RDD数据进行去重,返回新RDD 语法:\*\*rdd.distinct ()\*\*无需传参 \`\`\`python """ 演示RDD的distinct成员方法的使用 """ from pyspark import SparkConf, SparkContext import os os.environ\['PYSPARK_PYTHON'\] = "D:/dev/python/python310/python.exe" conf = SparkConf().setMaster("local\[\*\]").setAppName("test_spark") sc = SparkContext(conf=conf) # 准备一个RDD rdd = sc.parallelize(\[1, 1, 3, 3, 5, 5, 7, 8, 8, 9, 10\]) # 对RDD的数据进行去重 rdd2 = rdd.distinct() print(rdd2.collect()) \`\`\` #### 5.7 sortBy算子 功能:对RDD数据进行排序,基于你指定的排序依据。语法: !\[image-20240310223603517\](C:/Users/23718/AppData/Roaming/Typora/typora-user-images/image-20240310223603517.png) sortBy可接收一个处理函数,可用lambda快速编写,函数表示用来决定排序的依据,可以控制升序或降序,全局排序需要设置分区数为1,通过单个分区进行排序 。 #### 5.8综合案例2 \`\`\`python """ 完成练习案例:JSON商品统计 需求: 1. 各个城市销售额排名,从大到小 2. 全部城市,有哪些商品类别在售卖 3. 北京市有哪些商品类别在售卖 """ from pyspark import SparkConf, SparkContext import os import json os.environ\['PYSPARK_PYTHON'\] = 'D:/dev/python/python310/python.exe' conf = SparkConf().setMaster("local\[\*\]").setAppName("test_spark") sc = SparkContext(conf=conf) # TODO 需求1: 城市销售额排名 # 1.1 读取文件得到RDD file_rdd = sc.textFile("D:/orders.txt") # 1.2 取出一个个JSON字符串 json_str_rdd = file_rdd.flatMap(lambda x: x.split("\|")) # 1.3 将一个个JSON字符串转换为字典 dict_rdd = json_str_rdd.map(lambda x: json.loads(x)) # 1.4 取出城市和销售额数据 # (城市,销售额) city_with_money_rdd = dict_rdd.map(lambda x: (x\['areaName'\], int(x\['money'\]))) # 1.5 按城市分组按销售额聚合 city_result_rdd = city_with_money_rdd.reduceByKey(lambda a, b: a + b) # 1.6 按销售额聚合结果进行排序 result1_rdd = city_result_rdd.sortBy(lambda x: x\[1\], ascending=False, numPartitions=1) print("需求1的结果:", result1_rdd.collect()) # TODO 需求2: 全部城市有哪些商品类别在售卖 # 2.1 取出全部的商品类别 # 2.2 对全部商品类别进行去重 category_rdd = dict_rdd.map(lambda x: x\['category'\]).distinct() print("需求2的结果:", category_rdd.collect()) # TODO 需求3: 北京市有哪些商品类别在售卖 # 3.1 过滤北京市的数据 beijing_data_rdd = dict_rdd.filter(lambda x: x\['areaName'\] == '北京') # 3.2 取出全部商品类别 # 3.3 进行商品类别去重 result3_rdd = beijing_data_rdd.map(lambda x: x\['category'\]).distinct() print("需求3的结果:", result3_rdd.collect()) \`\`\` ### 6. 数据输出 \*\*输出为Python对象的各类方法:\*\* #### 6.1 collect算子 功能:将RDD各个分区内的数据,统一收集到Driver中,形成一个List对象。(RDD-\>list) 用法:rdd.collect(),返回值是一个ist。 #### 6.2 reduce.算子 功能:对RDD数据集按照传入的逻辑进行聚合。语法: \> rdd.reduce(func) \> \\# func:(T,T)→T \> \\# 2参数传入1个返回值,返回值和参数要求类型一致。 代码:返回值等同于计算函数的返回值 \`\`\`python rdd sc.parallelize(range(1,10)) # 将rdd的数据进行累求和 print(rdd.reduce(lambda a,b:a b)) \`\`\` #### 6.3 take算子 功能:取RDD的前N个元素,组合成list返回给你(即返回前N个元素)。用法: \> sc.parallelize(\[3,2,1,4,5,6\]).take(5) \> \> 返回:\[3,2,1,4,5\] #### 6.4 count算子 功能:计算RDD有多少条数据,返回值是一个数字。用法: \> sc.parallelize(\[3,2,1,4,5,6\]).count() \> \> 返回:6 \`\`\`python """ 演示将RDD输出为Python对象 """ from pyspark import SparkConf, SparkContext import os import json os.environ\['PYSPARK_PYTHON'\] = 'D:/dev/python/python310/python.exe' conf = SparkConf().setMaster("local\[\*\]").setAppName("test_spark") sc = SparkContext(conf=conf) # 准备RDD rdd = sc.parallelize(\[1, 2, 3, 4, 5\]) # collect算子,输出RDD为list对象 rdd_list: list = rdd.collect() print(rdd_list) print(type(rdd_list)) # reduce算子,对RDD进行两两聚合 num = rdd.reduce(lambda a, b: a + b) print(num) # take算子,取出RDD前N个元素,组成list返回 take_list = rdd.take(3) print(take_list) # count,统计rdd内有多少条数据,返回值为数字 num_count = rdd.count() print(f"rdd内有{num_count}个元素") sc.stop() \`\`\` \*\*输出到文件的各类方法:\*\* #### 6.5 saveAsTextFile算子 功能:将RDD的数据写入文本文件中。支持本地写出,hdfs等文件系统 \`\`\`python rdd sc.parallelize(\[1,2,3,4,5\]) rdd.saveAsTextFile("../data/output/test.txt") \`\`\` \*\*注意事项\*\* 调用保存文件的算子,需要配置Hadoop依赖 \* 下载Hadoop安装包: http://archive.apache.org/dist/hadoop/common/hadoop-3.0.0/hadoop-3.0.0.tar.gz \* 解压到电脑任意位置 \* 在Python代码中使用os模块配置:os.environ\['HADOOP_HOME'\] = 'HADOOP解压文件夹路径' \* 下载winutils.exe,并放入Hadoop解压文件夹的bin目录内 \* https://raw.githubusercontent.com/steveloughran/winutils/master/hadoop-3.0.0/bin/winutils.exe \* 下载hadoop.dll,并放入:C:/Windows/System32 文件夹内 \* https://raw.githubusercontent.com/steveloughran/winutils/master/hadoop-3.0.0/bin/hadoop.dll \* 修改rdd分区为1个 ![image-20240311165457648](https://hgh-typora-image.oss-cn-guangzhou.aliyuncs.com/img/image-20240311165457648.png) \*\*总结:\*\* RDD输出到文件的方法:rdd.saveAsTextFile(路径) \* 输出的结果是一个文件夹 \* 有几个分区就输出多少个结果文件 如何修改RDD分区 \* SparkConf对象设置conf.set("spark.default.parallelism", "1")创 \* 建RDD的时候,sc.parallelize方法传入numSlices参数为1 \`\`\`python """ 演示将RDD输出到文件中 """ from pyspark import SparkConf, SparkContext import os import json os.environ\['PYSPARK_PYTHON'\] = 'D:/dev/python/python310/python.exe' os.environ\['HADOOP_HOME'\] = "D:/dev/hadoop-3.0.0" conf = SparkConf().setMaster("local\[\*\]").setAppName("test_spark") # conf.set("spark.default.parallelism", "1") sc = SparkContext(conf=conf) # 准备RDD1 rdd1 = sc.parallelize(\[1, 2, 3, 4, 5\], numSlices=1) # 准备RDD2 rdd2 = sc.parallelize(\[("Hello", 3), ("Spark", 5), ("Hi", 7)\], 1) # 准备RDD3 rdd3 = sc.parallelize(\[\[1, 3, 5\], \[6, 7, 9\], \[11, 13, 11\]\], 1) # 输出到文件中 rdd1.saveAsTextFile("D:/output1") rdd2.saveAsTextFile("D:/output2") rdd3.saveAsTextFile("D:/output3") \`\`\` ### 7. 综合案例------搜索引擎日志分析 读取文件转换成RDD,并完成: \* 打印输出:热门搜索时间段(小时精度)Top3 \* 打印输出:热门搜索词Top3 \* 打印输出:统计黑马程序员关键字在哪个时段被搜索最多 \* 将数据转换为JSON格式,写出为文件 \`\`\`python """ 演示PySpark综合案例 """ from pyspark import SparkConf, SparkContext import os import json os.environ\['PYSPARK_PYTHON'\] = 'D:/dev/python/python310/python.exe' os.environ\['HADOOP_HOME'\] = "D:/dev/hadoop-3.0.0" conf = SparkConf().setMaster("local\[\*\]").setAppName("test_spark") conf.set("spark.default.parallelism", "1") sc = SparkContext(conf=conf) # 读取文件转换成RDD file_rdd = sc.textFile("D:/search_log.txt") # TODO 需求1: 热门搜索时间段Top3(小时精度) # 1.1 取出全部的时间并转换为小时(切片) # 1.2 转换为(小时, 1) 的二元元组 # 1.3 Key分组聚合Value # 1.4 排序(降序) # 1.5 取前3 result1 = file_rdd.map(lambda x: x.split("\\t")).\\ map(lambda x: x\[0\]\[:2\]).\\ map(lambda x: (x, 1)).\\ reduceByKey(lambda a, b: a + b).\\ sortBy(lambda x: x\[1\], ascending=False, numPartitions=1).\\ take(3) print("需求1的结果:", result1) # TODO 需求2: 热门搜索词Top3 # 2.1 取出全部的搜索词 # 2.2 (词, 1) 二元元组 # 2.3 分组聚合 # 2.4 排序 # 2.5 Top3 result2 = file_rdd.map(lambda x: (x.split("\\t")\[2\], 1)).\\ reduceByKey(lambda a, b: a + b).\\ sortBy(lambda x: x\[1\], ascending=False, numPartitions=1).\\ take(3) print("需求2的结果:", result2) # TODO 需求3: 统计黑马程序员关键字在什么时段被搜索的最多 # 3.1 过滤内容,只保留黑马程序员关键词 # 3.2 转换为(小时, 1) 的二元元组 # 3.3 Key分组聚合Value # 3.4 排序(降序) # 3.5 取前1 result3 = file_rdd.map(lambda x: x.split("\\t")).\\ filter(lambda x: x\[2\] == '黑马程序员').\\ map(lambda x: (x\[0\]\[:2\], 1)).\\ reduceByKey(lambda a, b: a + b).\\ sortBy(lambda x: x\[1\], ascending=False, numPartitions=1).\\ take(1) print("需求3的结果:", result3) # TODO 需求4: 将数据转换为JSON格式,写出到文件中 # 4.1 转换为JSON格式的RDD # 4.2 写出为文件 file_rdd.map(lambda x: x.split("\\t")).\\ map(lambda x: {"time": x\[0\], "user_id": x\[1\], "key_word": x\[2\], "rank1": x\[3\], "rank2": x\[4\], "url": x\[5\]}).\\ saveAsTextFile("D:/output_json") \`\`\`