且构网

分享程序员开发的那些事...
且构网 - 分享程序员编程开发的那些事

PySpark - 将上一行和下一行附加到当前行

更新时间:2022-12-16 10:46:40

您可以使用 pyspark.sql.functions.lead()pyspark.sql.functions.lag() 但首先您需要一种对行进行排序的方法.如果您还没有确定顺序的列,则可以使用 pyspark.sql.functions.monotonically_increasing_id()

然后将其与 Window 函数结合使用.

Then use this in conjunction with a Window function.

例如,如果您有以下 DataFrame df:

For example, if you had the following DataFrame df:

df.show()
#+---+---+---+---+
#|  a|  b|  c|  d|
#+---+---+---+---+
#|  1|  0|  1|  0|
#|  0|  0|  1|  1|
#|  0|  1|  0|  1|
#+---+---+---+---+

你可以这样做:

from pyspark.sql import Window
import pyspark.sql.functions as f

cols = df.columns
df = df.withColumn("id", f.monotonically_increasing_id())
df.select(
    "*", 
    *([f.lag(f.col(c),default=0).over(Window.orderBy("id")).alias("prev_"+c) for c in cols] + 
      [f.lead(f.col(c),default=0).over(Window.orderBy("id")).alias("next_"+c) for c in cols])
).drop("id").show()
#+---+---+---+---+------+------+------+------+------+------+------+------+
#|  a|  b|  c|  d|prev_a|prev_b|prev_c|prev_d|next_a|next_b|next_c|next_d|
#+---+---+---+---+------+------+------+------+------+------+------+------+
#|  1|  0|  1|  0|     0|     0|     0|     0|     0|     0|     1|     1|
#|  0|  0|  1|  1|     1|     0|     1|     0|     0|     1|     0|     1|
#|  0|  1|  0|  1|     0|     0|     1|     1|     0|     0|     0|     0|
#+---+---+---+---+------+------+------+------+------+------+------+------+