PySpark groupby and max value selection

Updated: 2022-12-10 15:02:54
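
The steps below assume an input DataFrame df with name, city, and date (dd/MM/yyyy strings) columns. A minimal, hypothetical setup for trying the snippets, with illustrative rows that are not part of the original question, could look like this:

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Hypothetical sample data; only the column names and the dd/MM/yyyy format matter
df = spark.createDataFrame(
    [("Alice", "Paris", "01/03/2022"),
     ("Alice", "Paris", "15/04/2022"),
     ("Alice", "Lyon",  "20/02/2022"),
     ("Bob",   "Nice",  "05/05/2022"),
     ("Bob",   "Nice",  "06/05/2022")],
    ["name", "city", "date"])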

First, convert the date to DateType:

import pyspark.sql.functions as F

df_with_date = df.withColumn(
    "date",
    F.to_date("date", "dd/MM/yyyy")
    # For Spark < 2.2
    # F.unix_timestamp("date", "dd/MM/yyyy").cast("timestamp").cast("date")
)
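
A quick sanity check that the column was parsed as a date (on the hypothetical sample above):

df_with_date.select("date").printSchema()
# root
#  |-- date: date (nullable = true)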

Next, groupBy user and city, but extend the aggregation like this:

df_agg = (df_with_date
    .groupBy("name", "city")
    .agg(F.count("city").alias("count"), F.max("date").alias("max_date")))

Define a window:

from pyspark.sql.window import Window

w = Window().partitionBy("name").orderBy(F.desc("count"), F.desc("max_date"))

Add a rank:

df_with_rank = (df_agg
    .withColumn("rank", F.dense_rank().over(w)))

And filter:

result = df_with_rank.where(F.col("rank") == 1)

You can detect remaining duplicates using code like this:

import sys

final_w = Window().partitionBy("name").rowsBetween(-sys.maxsize, sys.maxsize)
# "tie" is True when more than one row survived the rank filter for a given name
result.withColumn("tie", F.count("*").over(final_w) != 1)
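
If you need exactly one row per name even when count and max_date are tied, one option (not part of the original answer) is to rank with row_number instead of dense_rank, which arbitrarily picks a single row among the tied candidates:

result_single = (df_agg
    .withColumn("rn", F.row_number().over(w))
    .where(F.col("rn") == 1)
    .drop("rn"))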