在 PySpark 中的 GroupedData 上应用 UDF(带有可运行的 Python 示例)

2025-02-21 08:50:00
admin
原创
5
摘要:问题描述:我有一段在 pandas 数据框中本地运行的 python 代码:df_result = pd.DataFrame(df .groupby('A') .apply(lambda x: myFunc...

问题描述:

我有一段在 pandas 数据框中本地运行的 python 代码:

df_result = pd.DataFrame(df
                          .groupby('A')
                          .apply(lambda x: myFunction(zip(x.B, x.C), x.name))

我想在 PySpark 中运行它,但在处理 pyspark.sql.group.GroupedData 对象时遇到了问题。

我尝试了以下方法:

sparkDF
 .groupby('A')
 .agg(myFunction(zip('B', 'C'), 'A')) 

返回

KeyError: 'A'

我推测是因为“A”不再是一列,而且我找不到 x.name 的等效项。

进而

sparkDF
 .groupby('A')
 .map(lambda row: Row(myFunction(zip('B', 'C'), 'A'))) 
 .toDF()

但出现以下错误:

AttributeError: 'GroupedData' object has no attribute 'map'

如有任何建议我将非常感谢!


解决方案 1:

从 Spark 2.3 开始,您可以使用pandas_udf.GROUPED_MAP获取Callable[[pandas.DataFrame], pandas.DataFrame]或换句话说,一个函数,该函数DataFrame从与输入相同形状的 Pandas 映射到输出DataFrame

例如,如果数据如下所示:

df = spark.createDataFrame(
    [("a", 1, 0), ("a", -1, 42), ("b", 3, -1), ("b", 10, -2)],
    ("key", "value1", "value2")
)

并且您想要计算之间的成对最小值的平均值value1 value2,您必须定义输出模式:

from pyspark.sql.types import *

schema = StructType([
    StructField("key", StringType()),
    StructField("avg_min", DoubleType())
])

pandas_udf

import pandas as pd

from pyspark.sql.functions import pandas_udf
from pyspark.sql.functions import PandasUDFType

@pandas_udf(schema, functionType=PandasUDFType.GROUPED_MAP)
def g(df):
    result = pd.DataFrame(df.groupby(df.key).apply(
        lambda x: x.loc[:, ["value1", "value2"]].min(axis=1).mean()
    ))
    result.reset_index(inplace=True, drop=False)
    return result

并应用它:

df.groupby("key").apply(g).show()
+---+-------+
|key|avg_min|
+---+-------+
|  b|   -1.5|
|  a|   -0.5|
+---+-------+

除模式定义和装饰器外,您当前的 Pandas 代码可以按原样应用。

自 Spark 2.4.0 起,还有一个GROUPED_AGG变体,它采用Callable[[pandas.Series, ...], T],其中T是原始标量:

import numpy as np

@pandas_udf(DoubleType(), functionType=PandasUDFType.GROUPED_AGG)
def f(x, y):
    return np.minimum(x, y).mean()

可与标准group_by/agg构造一起使用:

df.groupBy("key").agg(f("value1", "value2").alias("avg_min")).show()
+---+-------+
|key|avg_min|
+---+-------+
|  b|   -1.5|
|  a|   -0.5|
+---+-------+

请注意, 和的行为方式与或GROUPED_MAP都不同,它更接近于或 具有无界框架的窗口函数。首先对数据进行混洗,然后才应用 UDF。GROUPPED_AGG pandas_udf`UserDefinedAggregateFunctionAggregatorgroupByKey`

为了优化执行,您应该实现 ScalaUserDefinedAggregateFunction并添加 Python 包装器。

另请参阅在 PySpark 中应用于窗口的用户定义函数?

解决方案 2:

您尝试编写的是 UDAF(用户定义聚合函数),而不是 UDF(用户定义函数)。UDAF 是处理按键分组数据的函数。具体来说,它们需要定义如何在单个分区中合并组中的多个值,然后如何跨按键分区合并结果。目前 Python 中没有办法实现 UDAF,只能在 Scala 中实现。

但是,您可以在 Python 中解决这个问题。您可以使用 collect set 来收集分组值,然后使用常规 UDF 对它们执行所需的操作。唯一需要注意的是 collect_set 仅适用于原始值,因此您需要将它们编码为字符串。

from pyspark.sql.types import StringType
from pyspark.sql.functions import col, collect_list, concat_ws, udf

def myFunc(data_list):
    for val in data_list:
        b, c = data.split(',')
        # do something

    return <whatever>

myUdf = udf(myFunc, StringType())

df.withColumn('data', concat_ws(',', col('B'), col('C'))) \n  .groupBy('A').agg(collect_list('data').alias('data'))
  .withColumn('data', myUdf('data'))

如果您想要重复数据删除,请使用 collect_set。此外,如果您的某些键有很多值,那么这会很慢,因为键的所有值都需要收集到集群中某个分区中。如果您的最终结果是通过某种方式组合每个键的值(例如对它们求和)构建的值,那么使用RDD 的aggregateByKey方法实现它可能会更快,该方法允许您在对数据进行混洗之前为分区中的每个键构建一个中间值。

编辑:2018 年 11 月 21 日

自从这个答案写出来后,pyspark 使用 Pandas 增加了对 UDAF 的支持。使用 Panda 的 UDF 和 UDAF 而不是使用 RDD 的直接 Python 函数时,性能会有一些不错的改进。在底层,它会对列进行矢量化(将来自多行的值分批处理以优化处理和压缩)。请查看此处以获得更好的解释,或查看下面user6910411的答案作为示例。

解决方案 3:

我将扩展上述答案。

因此,您可以使用@pandas_udf 在 pyspark 中实现与 pandas.groupby().apply 相同的逻辑,它是矢量化方法,并且比简单的 udf 更快。

from pyspark.sql.functions import pandas_udf, PandasUDFType
import pandas as pd

df3 = spark.createDataFrame([('a', 1, 0), ('a', -1, 42), ('b', 3, -1),
                            ('b', 10, -2)], ('key', 'value1', 'value2'))

from pyspark.sql.types import *

schema = StructType([StructField('key', StringType()),
                    StructField('avg_value1', DoubleType()),
                    StructField('avg_value2', DoubleType()),
                    StructField('sum_avg', DoubleType()),
                    StructField('sub_avg', DoubleType())])


@pandas_udf(schema, functionType=PandasUDFType.GROUPED_MAP)
def g(df):
    gr = df['key'].iloc[0]
    x = df.value1.mean()
    y = df.value2.mean()
    w = df.value1.mean() + df.value2.mean()
    z = df.value1.mean() - df.value2.mean()
    return pd.DataFrame([[gr] + [x] + [y] + [w] + [z]])

df3.groupby('key').apply(g).show()

您将得到以下结果:

+---+----------+----------+-------+-------+
|key|avg_value1|avg_value2|sum_avg|sub_avg|
+---+----------+----------+-------+-------+
|  b|       6.5|      -1.5|    5.0|    8.0|
|  a|       0.0|      21.0|   21.0|  -21.0|
+---+----------+----------+-------+-------+

因此,您可以在分组数据中的其他字段之间进行更多计算,并将它们以列表格式添加到数据框中。

解决方案 4:

PySpark 版本 3.0.0 中的另一个新扩展:
applyInPandas

df = spark.createDataFrame([(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)], 
                            ("id", "v"))  

def mean_func(key, pdf):
   # key is a tuple of one numpy.int64, which is the value
   # of 'id' for the current group
   return pd.DataFrame([key + (pdf.v.mean(),)])

df.groupby('id').applyInPandas(mean_func, schema="id long, v double").show() 

结果:

+---+---+
| id|  v|
+---+---+
|  1|1.5|
|  2|6.0|
+---+---+

有关更多详细信息,请参阅:https ://spark.apache.org/docs/3.2.0/api/python/reference/api/pyspark.sql.GroupedData.applyInPandas.html

相关推荐
  为什么项目管理通常仍然耗时且低效?您是否还在反复更新电子表格、淹没在便利贴中并参加每周更新会议?这确实是耗费时间和精力。借助软件工具的帮助,您可以一目了然地全面了解您的项目。如今,国内外有足够多优秀的项目管理软件可以帮助您掌控每个项目。什么是项目管理软件?项目管理软件是广泛行业用于项目规划、资源分配和调度的软件。它使项...
项目管理软件   1267  
  IPD(Integrated Product Development)即集成产品开发,是一套先进的、成熟的产品开发管理理念、模式和方法。随着市场竞争的日益激烈,企业对于提升产品开发效率、降低成本、提高产品质量的需求愈发迫切,IPD 项目管理咨询市场也迎来了广阔的发展空间。深入探讨 IPD 项目管理咨询的市场需求与发展,...
IPD集成产品开发流程   27  
  IPD(Integrated Product Development)产品开发流程是一套先进的、被广泛应用的产品开发管理体系,它涵盖了从产品概念产生到产品推向市场并持续优化的全过程。通过将市场、研发、生产、销售等多个环节紧密整合,IPD旨在提高产品开发的效率、质量,降低成本,增强企业的市场竞争力。深入了解IPD产品开发...
IPD流程中TR   31  
  IPD(Integrated Product Development)测试流程是确保产品质量、提升研发效率的关键环节。它贯穿于产品从概念到上市的整个生命周期,对企业的成功至关重要。深入理解IPD测试流程的核心要点,有助于企业优化研发过程,打造更具竞争力的产品。以下将详细阐述IPD测试流程的三大核心要点。测试策略规划测试...
华为IPD   26  
  华为作为全球知名的科技企业,其成功背后的管理体系备受关注。IPD(集成产品开发)流程作为华为核心的产品开发管理模式,在创新管理与技术突破方面发挥了至关重要的作用。深入剖析华为 IPD 流程中的创新管理与技术突破,对于众多企业探索自身发展路径具有重要的借鉴意义。IPD 流程概述IPD 流程是一种先进的产品开发管理理念和方...
TR评审   26  
热门文章
项目管理软件有哪些?
云禅道AD
禅道项目管理软件

云端的项目管理软件

尊享禅道项目软件收费版功能

无需维护,随时随地协同办公

内置subversion和git源码管理

每天备份,随时转为私有部署

免费试用