Spark DataFrame - Window Functions - Lag & Lead for Insert & Update Output

We need to define two Windows to arrive at your expected output: one for checking the change in the DEPART column, and a second for checking the difference in the sum of COL1 through COL3.

import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window

// Window over rows that share the same KEY and column values, spanning the whole
// partition so that last("DEPART") picks the DEPART of the highest LAYER_NO in the group
val w_col = Window.partitionBy("KEY", "COL1", "COL2", "COL3").orderBy("LAYER_NO")
                  .rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)

// Window over each KEY, ordered by LAYER_NO: used for lag() and rank()
val w_key = Window.partitionBy("KEY").orderBy("LAYER_NO")
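
The post does not show how inputDF is built. As a point of reference, here is a minimal, hypothetical sketch of an input with the same schema; the local SparkSession setup and the row values are assumptions chosen to illustrate the key_1 case (layer 1 only corrects DEPART, layer 2 changes the column sum), not the asker's actual data.

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").appName("window-demo").getOrCreate()
import spark.implicits._

// Hypothetical input, for illustration only
val inputDF = Seq(
  ("key_1", 0, 200, 300, 400, "old"),  // original layer
  ("key_1", 1, 200, 300, 400, "abc"),  // same column values, only DEPART corrected
  ("key_1", 2, 200, 300, 600, "xyz")   // column sum changes, so this layer is kept
).toDF("KEY", "LAYER_NO", "COL1", "COL2", "COL3", "DEPART")

Importing spark.implicits._ here also brings the $ column syntax used in the snippet below into scope.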

Then we simply replace the values in the DEPART column with the correct values, and filter the data down to rows where the lagged sum differs from the current column sum (plus the rows where LAYER_NO === 0, since lag returns null for the first row of each partition and the comparison would otherwise drop them). Lastly, we replace LAYER_NO with the rank minus one, so the numbering stays zero-based.

inputDF
  .withColumn("DEPART", last("DEPART").over(w_col))             // propagate the latest DEPART within each group
  .withColumn("row_sum", $"COL1" + $"COL2" + $"COL3")
  .withColumn("lag_sum", lag($"row_sum", 1).over(w_key))        // column sum of the previous layer in the same KEY
  .filter($"LAYER_NO" === 0 || not($"row_sum" === $"lag_sum"))  // keep the first layer and layers whose sum changed
  .withColumn("LAYER_NO", rank().over(w_key) - 1)               // re-number the surviving layers from 0
  .drop("row_sum", "lag_sum")
  .show()
+-----+--------+----+----+----+------+
|  KEY|LAYER_NO|COL1|COL2|COL3|DEPART|
+-----+--------+----+----+----+------+
|key_1|       0| 200| 300| 400|   abc|
|key_1|       1| 200| 300| 600|   xyz|
|key_2|       0| 500| 700| 900|   prq|
|key_2|       1| 888| 555| 900|   tep|
|key_3|       0| 111| 222| 333|   lgh|
|key_3|       1| 084| 222| 333|   rrr|
+-----+--------+----+----+----+------+