
How to compute the maximum of certain columns per row in PySpark

Updated: 2022-12-10 09:27:53

I think combining the values into a list and then finding the max of it would be the simplest approach.

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType, ArrayType
import pyspark.sql.functions as F

spark = SparkSession.builder.getOrCreate()

schema = StructType([
    StructField("ClientId", IntegerType(), True),
    StructField("m_ant21", IntegerType(), True),
    StructField("m_ant22", IntegerType(), True),
    StructField("m_ant23", IntegerType(), True),
    StructField("m_ant24", IntegerType(), True)
])

df = spark\
    .createDataFrame(
        data=[(0, None, None, None, None),
              (1, 23, 13, 17, 99),
              (2, 0, 0, 0, 1),
              (3, 0, None, 1, 0)],
        schema=schema)

# Collect the four column values into one array per row.
def agg_to_list(m21, m22, m23, m24):
    return [m21, m22, m23, m24]

u_agg_to_list = F.udf(agg_to_list, ArrayType(IntegerType()))

# Sort each array in descending order (nulls are placed last),
# then take the first element as the row-wise max.
df2 = df.withColumn('all_values', u_agg_to_list('m_ant21', 'm_ant22', 'm_ant23', 'm_ant24'))\
        .withColumn('max', F.sort_array("all_values", False)[0])\
        .select('ClientId', 'max')

df2.show()

Output:

+--------+----+
|ClientId|max |
+--------+----+
|0       |null|
|1       |99  |
|2       |1   |
|3       |1   |
+--------+----+
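
As an aside, the same per-row maximum can be computed without a UDF using the built-in greatest function, which skips nulls and returns null only when every input is null, matching the output above. A minimal sketch, assuming the same df as before (df3 is just an illustrative name):

import pyspark.sql.functions as F

# greatest() returns the row-wise maximum across the given columns,
# skipping nulls; it yields null only when all inputs are null.
df3 = df.select(
    'ClientId',
    F.greatest('m_ant21', 'm_ant22', 'm_ant23', 'm_ant24').alias('max')
)
df3.show()

On Spark 2.4+, F.array_max(F.array(...)) is another built-in option that avoids the UDF as well.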