First, convert the date to DateType:
import pyspark.sql.functions as F

df_with_date = df.withColumn(
    "date",
    F.to_date("date", "dd/MM/yyyy")
    # For Spark < 2.2:
    # F.unix_timestamp("date", "dd/MM/yyyy").cast("timestamp").cast("date")
)
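As a self-contained sanity check, here is a minimal sketch with invented sample data (the real df is assumed to have name, city, and date string columns in dd/MM/yyyy format); the later examples reuse this toy DataFrame:

from pyspark.sql import SparkSession
import pyspark.sql.functions as F

spark = SparkSession.builder.getOrCreate()

# Invented sample data: Alice visits Paris most often; Bob's two cities tie
df = spark.createDataFrame([
    ("Alice", "Paris",  "01/03/2021"),
    ("Alice", "Paris",  "15/07/2022"),
    ("Alice", "Lyon",   "10/01/2022"),
    ("Bob",   "Berlin", "05/05/2022"),
    ("Bob",   "Munich", "05/05/2022"),
], ["name", "city", "date"])

df_with_date = df.withColumn("date", F.to_date("date", "dd/MM/yyyy"))
df_with_date.printSchema()
# root
#  |-- name: string (nullable = true)
#  |-- city: string (nullable = true)
#  |-- date: date (nullable = true)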
Next, group by user and city, but extend the aggregation like this:
# Visits per (name, city) pair and the most recent visit date for each pair
df_agg = (df_with_date
    .groupBy("name", "city")
    .agg(F.count("city").alias("count"), F.max("date").alias("max_date")))
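On the toy data this yields one row per (name, city) pair, with the visit count and the most recent visit date (row order in show() may vary):

df_agg.show()
# +-----+------+-----+----------+
# | name|  city|count|  max_date|
# +-----+------+-----+----------+
# |Alice| Paris|    2|2022-07-15|
# |Alice|  Lyon|    1|2022-01-10|
# |  Bob|Berlin|    1|2022-05-05|
# |  Bob|Munich|    1|2022-05-05|
# +-----+------+-----+----------+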
Define a window:
from pyspark.sql.window import Window
# Rank each user's cities: most visits first, most recent visit breaks ties
w = Window.partitionBy("name").orderBy(F.desc("count"), F.desc("max_date"))
Add a rank:
df_with_rank = (df_agg
    .withColumn("rank", F.dense_rank().over(w)))
And filter:
result = df_with_rank.where(F.col("rank") == 1)
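On the toy data, this keeps Alice's most-visited city, and because Bob's two cities tie on both count and max_date, both of his rows survive (row order may vary):

result.show()
# +-----+------+-----+----------+----+
# | name|  city|count|  max_date|rank|
# +-----+------+-----+----------+----+
# |Alice| Paris|    2|2022-07-15|   1|
# |  Bob|Berlin|    1|2022-05-05|   1|
# |  Bob|Munich|    1|2022-05-05|   1|
# +-----+------+-----+----------+----+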
Note that dense_rank keeps every row that ties for first place, so the result can still contain more than one city per user. You can detect remaining duplicates using code like this:
import sys

# Unbounded frame over each name partition: if more than one row survived
# the filter, the (count, max_date) ordering ended in a tie
final_w = Window.partitionBy("name").rowsBetween(-sys.maxsize, sys.maxsize)
result.withColumn("tie", F.count("*").over(final_w) != 1)
# On the toy data, tie is True for both Bob rows and False for Alice's row
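On Spark 2.1+ the same unbounded frame can be written with Window's built-in bounds instead of sys.maxsize; an equivalent sketch:

final_w = Window.partitionBy("name").rowsBetween(
    Window.unboundedPreceding, Window.unboundedFollowing
)
result.withColumn("tie", F.count("*").over(final_w) != 1)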