Updated: 2022-06-23 06:03:43
IIUC, you can try the following:
Step-1: create a new DataFrame with a single row having current_date() as date and nulls for col1 and col2, then union it back to TEST_df (Note: change all 2020-08-26 to current_date() in your final code):
df_new = TEST_df.union(spark.sql("select '2020-08-26', null, null"))
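To sanity-check the shape of that union without a Spark session, here is the same idea in plain Python, with lists of tuples standing in for rows (the toy data and the hard-coded date are illustrative only):

```python
from datetime import date

# rows of TEST_df as (date, col1, col2) tuples (toy data, not the real frame)
test_rows = [(date(2020, 8, 24), 1, 2), (date(2020, 8, 25), 3, 2)]

# the extra row: today's date with col1/col2 left as None (i.e. null)
today_row = (date(2020, 8, 26), None, None)

# union appends the row; Spark's union matches columns by position
all_rows = test_rows + [today_row]
print(all_rows[-1])  # (datetime.date(2020, 8, 26), None, None)
```

Note that Spark's union is positional, so the literal row must list its columns in the same order as TEST_df.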
In practice, if the data are partitioned and each partition should get one new row, you can do something like the following:
from pyspark.sql.functions import current_date, col, lit

# columns used for Window partitionBy
cols_part = ['pcol1', 'pcol2']

# keep the partition columns, set date to today and null out everything else,
# then deduplicate so each partition contributes exactly one new row
df_today = TEST_df.select([
    (current_date() if c == 'date' else col(c) if c in cols_part else lit(None)).alias(c)
    for c in TEST_df.columns
]).distinct()

df_new = TEST_df.union(df_today)
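The per-column conditional in that select can be hard to parse at a glance. A plain-Python sketch of the same row construction (hypothetical partition columns pcol1/pcol2, today's date hard-coded, toy rows for illustration):

```python
from datetime import date

cols = ['pcol1', 'pcol2', 'date', 'col1', 'col2']
cols_part = ['pcol1', 'pcol2']
today = date(2020, 8, 26)

rows = [
    {'pcol1': 'a', 'pcol2': 'x', 'date': date(2020, 8, 24), 'col1': 1, 'col2': 2},
    {'pcol1': 'a', 'pcol2': 'x', 'date': date(2020, 8, 25), 'col1': 3, 'col2': 2},
    {'pcol1': 'b', 'pcol2': 'y', 'date': date(2020, 8, 25), 'col1': 4, 'col2': 1},
]

# keep partition columns, set date to today, null out everything else,
# then deduplicate -- one new row per distinct partition
today_rows = {
    tuple(
        today if c == 'date' else r[c] if c in cols_part else None
        for c in cols
    )
    for r in rows
}
# two partitions ('a','x') and ('b','y') -> two new rows
print(len(today_rows))  # 2
```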
Step-2: do calculations to fill the above null values:
df_new.selectExpr(
    "date",
    "IF(date < '2020-08-26', col1, lag(IF(col1 > 0, col1 + col2, 0)) over (order by date)) as col1",
    "lag(col2, 1, 0) over (order by date) as col2"
).show()
+----------+----+----+
| date|col1|col2|
+----------+----+----+
|2020-08-17| 0| 0|
|2020-08-18| 2| 0|
|2020-08-19| 0| 1|
|2020-08-20| 3| 2|
|2020-08-21| 4| 0|
|2020-08-22| 1| 2|
|2020-08-23| 2| 3|
|2020-08-24| 1| 2|
|2020-08-25| 3| 2|
|2020-08-26| 4| 1|
+----------+----+----+
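Step-2 leans on lag(...) over (order by date): each row reads the previous row's value, with a default for the first row where there is no predecessor. A minimal plain-Python sketch of that lag-with-default behaviour on a toy col2 series (the values are illustrative, not the question's data):

```python
def lag(values, offset=1, default=0):
    """Shift a sequence right by `offset`, filling the front with `default`
    -- mirrors SQL's lag(col, offset, default) over an ordered window."""
    return [default] * offset + list(values[:-offset])

# toy col2 values ordered by date; the synthetic today-row sits at the end
col2 = [0, 1, 2, 0]
print(lag(col2))  # [0, 0, 1, 2]
```

This is why the new today-row's nulls get filled: the window expression ignores the null and pulls yesterday's value instead.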