且构网 - sharing the everyday work of programmers and developers

How to create a row in a given DataFrame in PySpark and increment it

Updated: 2022-06-23 06:03:43

IIUC, you can try the following:

Step-1: create a new dataframe with a single row having current_date() as date and nulls for col1 and col2, then union it back to TEST_df (Note: change all 2020-08-26 to current_date() in your final code):

df_new = TEST_df.union(spark.sql("select '2020-08-26', null, null")) 
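One thing to keep in mind is that DataFrame.union matches columns by position, not by name, so the literals in the select must follow TEST_df's column order. As a plain-Python illustration (not PySpark) of appending a single placeholder row with today's date and missing values:

```python
from datetime import date

# Toy stand-in for TEST_df rows: (date, col1, col2)
rows = [
    ("2020-08-17", 0, 0),
    ("2020-08-18", 2, 0),
]

# Equivalent of union-ing one row of (current_date(), null, null);
# the tuple order must match TEST_df's column order.
today = date(2020, 8, 26).isoformat()  # use date.today() in real code
rows.append((today, None, None))
```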

In practice, if the data are partitioned and each partition should get one new row, you can do something like the following:

from pyspark.sql.functions import current_date, col, lit

# columns used as Window/partitionBy keys
cols_part = ['pcol1', 'pcol2']

# keep the partition columns, set date to today, null out the rest,
# then deduplicate so each partition contributes exactly one new row
df_today = TEST_df.select([
    (current_date() if c == 'date' else col(c) if c in cols_part else lit(None)).alias(c)
        for c in TEST_df.columns
]).distinct()

df_new = TEST_df.union(df_today)
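To see what that select/distinct produces, here is a pure-Python sketch of the same projection (the pcol1/pcol2 names and the sample rows are made up for illustration): every row is reduced to its partition key plus a placeholder date and nulls, and deduplication leaves one new row per partition.

```python
columns = ["pcol1", "pcol2", "date", "col1", "col2"]
cols_part = ["pcol1", "pcol2"]

# Hypothetical partitioned input rows
rows = [
    {"pcol1": "a", "pcol2": 1, "date": "2020-08-17", "col1": 0, "col2": 0},
    {"pcol1": "a", "pcol2": 1, "date": "2020-08-18", "col1": 2, "col2": 0},
    {"pcol1": "b", "pcol2": 2, "date": "2020-08-17", "col1": 3, "col2": 1},
]

today = "2020-08-26"  # stand-in for current_date()

# Same projection as the PySpark select: keep partition columns,
# replace 'date' with today, null out everything else
projected = [
    tuple(today if c == "date" else r[c] if c in cols_part else None
          for c in columns)
    for r in rows
]
df_today = sorted(set(projected))  # distinct(): one row per (pcol1, pcol2)
```

After the distinct, the two rows of partition ('a', 1) collapse into one placeholder row, so the union adds exactly one row per partition.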

Step-2: do the calculations to fill in the null values created above:

df_new.selectExpr(
  "date", 
  "IF(date < '2020-08-26', col1, lag(IF(col1>0, col1+col2,0)) over(order by date)) as col1",
  "lag(col2,1,0) over(order by date) as col2"
).show()
+----------+----+----+
|      date|col1|col2|
+----------+----+----+
|2020-08-17|   0|   0|
|2020-08-18|   2|   0|
|2020-08-19|   0|   1|
|2020-08-20|   3|   2|
|2020-08-21|   4|   0|
|2020-08-22|   1|   2|
|2020-08-23|   2|   3|
|2020-08-24|   1|   2|
|2020-08-25|   3|   2|
|2020-08-26|   4|   1|
+----------+----+----+
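The expressions above lean on Spark's lag window function: lag(col, 1, 0) over (order by date) gives each row the previous row's value, with 0 as the default for the first row. A minimal pure-Python sketch of that behavior (the lag function here is a stand-in, not Spark's implementation):

```python
def lag(values, offset=1, default=0):
    # Mimic Spark's lag(col, offset, default) over an ordered window:
    # prepend `offset` defaults and drop the trailing `offset` values.
    return [default] * offset + list(values)[:-offset]

lag([5, 7, 9])  # -> [0, 5, 7]
```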