在 Spark 中,累加器(Accumulators)是一种可以用来在任务执行过程中进行累积的变量。它们主要用于计算全局的汇总值,如计数或求和。累加器是只加的变量(即只进行累加操作),并且是分布式的,适合于在多节点环境中进行汇总。
# -*- coding: utf-8 -*- """ ------------------------------------------------- File Name: 1.测试累加器 date: 2024/7/30 ------------------------------------------------- PRODUCT:PyCharm ------------------------------------------------- """ from pyspark import SparkContext # 初始化 SparkContext sc = SparkContext("local[*]", "测试累加器") # 创建累加器 accumulator = sc.accumulator(0) # 定义一个函数来增加累加器的值 def add_to_accumulator(x): global accumulator accumulator.add(x) # 创建一个 RDD rdd = sc.parallelize([1, 2, 3, 4]) # 使用 map 来应用函数,并累加值 rdd.foreach(lambda x: add_to_accumulator(x)) # 由于累加器的值在行动操作之后才会被更新,所以需要使用行动操作触发计算 rdd.count() # 触发计算 # 打印累加器的值 print("Accumulated value:", accumulator.value) 
# -*- coding: utf-8 -*- """ ------------------------------------------------- File Name: 4.自定义累加器测试 date: 2024/7/30 ------------------------------------------------- PRODUCT:PyCharm ------------------------------------------------- """ from pyspark import SparkContext from pyspark.accumulators import AccumulatorParam sc = SparkContext("local[*]", "自定义累加器测试") # 自定义累加器类 class ListAccumulatorParam(AccumulatorParam): def zero(self, value): return [] def addInPlace(self, acc1, acc2): return acc1 + acc2 list_accumulator = sc.accumulator([], ListAccumulatorParam()) def add_to_list_accumulator(x): global list_accumulator list_accumulator.add([x]) return x rdd = sc.parallelize([1, 2, 3, 4]) rdd.foreach(lambda x: add_to_list_accumulator(x)) rdd.count() # 触发计算 print("Accumulated list:", list_accumulator.value) 
ListAccumulatorParam 定义了一个自定义累加器,zero 方法返回一个空列表,addInPlace 方法合并两个列表。list_accumulator = sc.accumulator([], ListAccumulatorParam()) 创建了一个自定义的累加器实例。add_to_list_accumulator(x) 函数将每个元素作为列表加到累加器中。rdd.foreach(lambda x: add_to_list_accumulator(x)) 将 add_to_list_accumulator 函数应用到 RDD 的每个元素。rdd.count() 触发了 RDD 的计算,更新累加器的值。list_accumulator.value 获取累加器的最终值,即累加的列表。[1, 2, 3, 4] 被转换为单元素列表 [1], [2], [3], [4],并分别添加到累加器中。addInPlace 方法将这些列表合并成一个完整的列表。# -*- coding: utf-8 -*- """ ------------------------------------------------- File Name: 3.集合累加器测试 date: 2024/7/30 ------------------------------------------------- PRODUCT:PyCharm ------------------------------------------------- """ from pyspark import SparkContext from pyspark.accumulators import AccumulatorParam sc = SparkContext("local[*]", "集合累加器测试") # 自定义集合累加器类 class SetAccumulatorParam(AccumulatorParam): def zero(self, value): return set() def addInPlace(self, acc1, acc2): return acc1.union(acc2) set_accumulator = sc.accumulator(set(), SetAccumulatorParam()) def add_to_set_accumulator(x): global set_accumulator set_accumulator.add(set([x])) return x rdd = sc.parallelize([1, 2, 2, 3, 4, 4]) rdd.foreach(lambda x: add_to_set_accumulator(x)) rdd.count() # 触发计算 print("Unique elements:", len(set_accumulator.value)) 
SetAccumulatorParam 定义了一个自定义累加器,zero 方法返回一个空集合,addInPlace 方法合并两个集合。set_accumulator = sc.accumulator(set(), SetAccumulatorParam()) 创建了一个自定义的累加器实例。add_to_set_accumulator(x) 函数将每个元素作为集合添加到累加器中。rdd.foreach(lambda x: add_to_set_accumulator(x)) 将 add_to_set_accumulator 函数应用到 RDD 的每个元素。rdd.count() 触发了 RDD 的计算,更新累加器的值。len(set_accumulator.value) 获取累加器的最终值,即唯一元素的数量。[1, 2, 2, 3, 4, 4] 被转换为集合形式,分别是 {1}, {2}, {2}, {3}, {4}, {4}。下一篇:WPF中调用UWP API