
Insert missing date rows and insert the old values in the new rows (PySpark)

Updated: 2022-03-11 07:09:48

You’re looking to forward-fill a dataset. This is made a bit more complex because you need to do it per category (person).

One way to do it would be like this: create a new DataFrame that has all the dates you want to have a value for, per person (see below, this is just dates_by_person).

Then, left-join the original DataFrame to this one, so you start creating the missing rows.

Next, use a window function to find, within each group of person rows ordered by date, the last non-null weight. In case you can have multiple entries per date (so one person has multiple filled-in records on one specific date), you must also order by the timestamp column (a sketch of that variant follows the listing below).

Finally, coalesce the columns so that any null field gets replaced by the intended value.

from datetime import datetime, timedelta
from itertools import product

import pyspark.sql.functions as psf
from pyspark.sql import SparkSession, Window

spark = SparkSession.builder.getOrCreate()  # reuse the active session, or create one

data = (  # recreate the DataFrame
    (1, datetime(2019, 12, 2, 14, 54, 17), 49.94),
    (1, datetime(2019, 12, 3, 8, 58, 39), 50.49),
    (1, datetime(2019, 12, 6, 10, 44, 1), 50.24),
    (2, datetime(2019, 12, 2, 8, 58, 39), 62.32),
    (2, datetime(2019, 12, 4, 10, 44, 1), 65.64))
df = spark.createDataFrame(data, schema=("person", "timestamp", "weight"))

# determine the full date range covered by the data
min_max_timestamps = df.agg(psf.min(df.timestamp), psf.max(df.timestamp)).head()
first_date, last_date = [ts.date() for ts in min_max_timestamps]
all_days_in_range = [first_date + timedelta(days=d)
                     for d in range((last_date - first_date).days + 1)]
# one row per (person, date) combination
people = [row.person for row in df.select("person").distinct().collect()]
dates_by_person = spark.createDataFrame(product(people, all_days_in_range),
                                        schema=("person", "date"))

# left-join the original data onto the (person, date) grid; dates without a
# record end up as rows with null timestamp and weight
df2 = (dates_by_person.join(df,
                            (psf.to_date(df.timestamp) == dates_by_person.date)
                            & (dates_by_person.person == df.person),
                            how="left")
       .drop(df.person)
       )
# per person, a frame that covers all strictly earlier dates
wind = (Window
        .partitionBy("person")
        .orderBy(psf.unix_timestamp("date"))
        .rangeBetween(Window.unboundedPreceding, -1)
        )
# carry the last non-null weight forward
df3 = df2.withColumn("last_weight",
                     psf.last("weight", ignorenulls=True).over(wind))
# keep each row's own values where present, otherwise the carried-forward ones
df4 = df3.select(
    df3.person,
    psf.coalesce(df3.timestamp, psf.to_timestamp(df3.date)).alias("timestamp"),
    psf.coalesce(df3.weight, df3.last_weight).alias("weight"))
df4.show()
# +------+-------------------+------+
# |person|          timestamp|weight|
# +------+-------------------+------+
# |     1|2019-12-02 14:54:17| 49.94|
# |     1|2019-12-03 08:58:39| 50.49|
# |     1|2019-12-04 00:00:00| 50.49|
# |     1|2019-12-05 00:00:00| 50.49|
# |     1|2019-12-06 10:44:01| 50.24|
# |     2|2019-12-02 08:58:39| 62.32|
# |     2|2019-12-03 00:00:00| 62.32|
# |     2|2019-12-04 10:44:01| 65.64|
# |     2|2019-12-05 00:00:00| 65.64|
# |     2|2019-12-06 00:00:00| 65.64|
# +------+-------------------+------+
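
As mentioned above, if one person can have several records on the same date, the window also has to order by the timestamp column. A rough sketch of that variant of the wind/df3 step (the name wind_ties is only for illustration; ordering by two columns means the range frame has to become a row frame):

wind_ties = (Window  # illustrative variant, not part of the run above
             .partitionBy("person")
             .orderBy("date", "timestamp")  # break same-date ties by timestamp
             .rowsBetween(Window.unboundedPreceding, -1))
df3 = df2.withColumn("last_weight",
                     psf.last("weight", ignorenulls=True).over(wind_ties))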

As suggested by David in the comments, if you have a very large number of people, the construction of dates_by_person can be done in a way that doesn’t require bringing everything to the driver. In this example, we’re talking about a small number of integers, nothing big. But if it gets big, try:

# only the date range is built on the driver; the people stay on the cluster
dates = spark.createDataFrame(((d,) for d in all_days_in_range),
                              schema=("date",))
people = df.select("person").distinct()
dates_by_person = dates.crossJoin(people)
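
If you also want to avoid building the list of dates on the driver, the date range itself can be generated on the cluster with sequence() and explode(). A sketch of that idea (assumes Spark 2.4+; bounds and the aliases first_date/last_date are illustrative names, not from the original code):

# build every date between the earliest and latest timestamp on the cluster
bounds = df.select(psf.to_date(psf.min("timestamp")).alias("first_date"),
                   psf.to_date(psf.max("timestamp")).alias("last_date"))
dates = bounds.select(
    psf.explode(psf.sequence("first_date", "last_date",
                             psf.expr("interval 1 day"))).alias("date"))
people = df.select("person").distinct()
dates_by_person = dates.crossJoin(people)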