且构网

分享程序员开发的那些事...
且构网 - 分享程序员编程开发的那些事

将列总和添加为PySpark数据框中的新列

更新时间:2022-06-13 09:08:49

这并不明显.我看不到spark Dataframes API中定义的列的基于行的总和.

This was not obvious. I see no row-based sum of the columns defined in the spark Dataframes API.

这可以通过非常简单的方式完成:

This can be done in a fairly simple way:

newdf = df.withColumn('total', sum(df[col] for col in df.columns))

pyspark提供了

df.columns作为字符串列表,给出了Spark Dataframe中的所有列名称.对于不同的总和,您可以提供任何其他列名列表.

df.columns is supplied by pyspark as a list of strings giving all of the column names in the Spark Dataframe. For a different sum, you can supply any other list of column names instead.

我没有尝试将其作为第一个解决方案,因为我不确定它的行为.但这有效.

I did not try this as my first solution because I wasn't certain how it would behave. But it works.

这太复杂了,但效果也不错.

This is overly complicated, but works as well.

您可以执行以下操作:

  1. 使用df.columns获取列名的列表
  2. 使用该名称列表创建列的列表
  3. 将该列表传递给将在
  1. use df.columns to get a list of the names of the columns
  2. use that names list to make a list of the columns
  3. pass that list to something that will invoke the column's overloaded add function in a fold-type functional manner

借助python的 reduce ,一些关于运算符重载工作原理的知识,以及此处的列的pyspark代码:

With python's reduce, some knowledge of how operator overloading works, and the pyspark code for columns here that becomes:

def column_add(a,b):
     return  a.__add__(b)

newdf = df.withColumn('total_col', 
         reduce(column_add, ( df[col] for col in df.columns ) ))

请注意,这是python简化,而不是spark RDD简化,第二个参数reduce中的括号项需要括号,因为它是一个列表生成器表达式.

Note this is a python reduce, not a spark RDD reduce, and the parenthesis term in the second parameter to reduce requires the parenthesis because it is a list generator expression.

经测试,有效!

$ pyspark
>>> df = sc.parallelize([{'a': 1, 'b':2, 'c':3}, {'a':8, 'b':5, 'c':6}, {'a':3, 'b':1, 'c':0}]).toDF().cache()
>>> df
DataFrame[a: bigint, b: bigint, c: bigint]
>>> df.columns
['a', 'b', 'c']
>>> def column_add(a,b):
...     return a.__add__(b)
...
>>> df.withColumn('total', reduce(column_add, ( df[col] for col in df.columns ) )).collect()
[Row(a=1, b=2, c=3, total=6), Row(a=8, b=5, c=6, total=19), Row(a=3, b=1, c=0, total=4)]