Updated: 2023-11-18 23:31:22
We need to define two `Window`s to arrive at your expected output: one for checking the change in the `DEPART` column, and a second for checking the difference in the sum of `COL1` to `COL3`.
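```scala
import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window
import spark.implicits._ // enables $"col"; assumes a SparkSession named `spark`

// Rows of the same KEY with identical COL1..COL3; the frame spans the whole partition
val w_col = Window.partitionBy("KEY", "COL1", "COL2", "COL3").orderBy("LAYER_NO")
  .rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)
// All layers of a KEY in LAYER_NO order, used for lag() and rank()
val w_key = Window.partitionBy("KEY").orderBy("LAYER_NO")
```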
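The less obvious window is `w_col`: because its frame covers the whole partition, `last("DEPART")` returns the `DEPART` of the highest `LAYER_NO` among the rows of a `KEY` that share the same `COL1` to `COL3`, rather than the current row's own value. A rough plain-Scala model of that behavior (no Spark involved) is sketched below; the `Layer` case class, the integer column types and the sample rows are hypothetical, and it assumes `LAYER_NO` is unique within a `KEY`.

```scala
// Plain-Scala model (no Spark) of
//   last("DEPART").over(w_col)
// with w_col = partitionBy(KEY, COL1, COL2, COL3).orderBy(LAYER_NO)
//              .rowsBetween(unboundedPreceding, unboundedFollowing).
// Layer is a hypothetical case class; LAYER_NO is assumed unique within a KEY.
object DepartFill {
  final case class Layer(key: String, layerNo: Int, col1: Int, col2: Int, col3: Int, depart: String)

  def fillDepart(rows: Seq[Layer]): Seq[Layer] = {
    // One partition per (KEY, COL1, COL2, COL3). The full frame lets last()
    // see every row in it, so it returns DEPART of the highest LAYER_NO.
    val lastDepart: Map[(String, Int, Int, Int), String] =
      rows
        .groupBy(r => (r.key, r.col1, r.col2, r.col3))
        .map { case (k, group) => k -> group.maxBy(_.layerNo).depart }

    // Every row takes its partition's value; row order is left unchanged.
    rows.map(r => r.copy(depart = lastDepart((r.key, r.col1, r.col2, r.col3))))
  }

  def main(args: Array[String]): Unit = {
    // Hypothetical rows: layers 0 and 1 share COL1..COL3, layer 2 differs.
    val rows = Seq(
      Layer("k", 0, 1, 2, 3, "a"),
      Layer("k", 1, 1, 2, 3, "b"),
      Layer("k", 2, 1, 2, 4, "c")
    )
    fillDepart(rows).foreach(println) // DEPART becomes b, b, c
  }
}
```

The explicit `rowsBetween` matters here: when a window has an `orderBy` but no frame, Spark defaults to a frame running from the start of the partition to the current row, so `last` would just return each row's own `DEPART`.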
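Then we replace the values in the `DEPART` column with the correct ones, and keep only the rows where the lagged sum (the previous layer's `COL1 + COL2 + COL3`) differs from the current row's sum, plus the rows where `LAYER_NO === 0`. Lastly, we replace `LAYER_NO` with its rank within the `KEY`, minus 1, so the remaining layers are numbered from 0.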
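```scala
inputDF
  // Give each row the DEPART of the last layer with the same COL1..COL3
  .withColumn("DEPART", last("DEPART").over(w_col))
  .withColumn("row_sum", $"COL1" + $"COL2" + $"COL3")
  // Previous layer's sum within the KEY (null for the first layer)
  .withColumn("lag_sum", lag($"row_sum", 1).over(w_key))
  // Keep layer 0 and every layer whose sum changed
  .filter($"LAYER_NO" === 0 || $"row_sum" =!= $"lag_sum")
  // Renumber the remaining layers 0, 1, 2, ...
  .withColumn("LAYER_NO", rank().over(w_key) - 1)
  .drop("row_sum", "lag_sum")
  .show()
```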
```
+-----+--------+----+----+----+------+
|  KEY|LAYER_NO|COl1|COl2|COl3|DEPART|
+-----+--------+----+----+----+------+
|key_1|       0| 200| 300| 400|   abc|
|key_1|       1| 200| 300| 600|   xyz|
|key_2|       0| 500| 700| 900|   prq|
|key_2|       1| 888| 555| 900|   tep|
|key_3|       0| 111| 222| 333|   lgh|
|key_3|       1| 084| 222| 333|   rrr|
+-----+--------+----+----+----+------+
```
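The lag/filter/rank steps can be modeled the same way. This plain-Scala sketch (again with a hypothetical `Layer` class, integer columns and made-up rows) mirrors the per-`KEY` logic: compare each layer with its predecessor, keep layer 0 and every changed layer, then renumber the survivors from 0. The comparison is passed in as a function so the sum-based check used above can be contrasted with a direct column-by-column comparison; `bySum` and `byColumns` are illustrative names, and the column comparison is a swapped-in alternative, not part of the answer's code.

```scala
// Plain-Scala model (no Spark) of the per-KEY steps:
//   lag(...) over w_key        -> compare each layer with the previous one
//   filter(LAYER_NO === 0 || changed)
//   rank().over(w_key) - 1     -> renumber surviving layers 0, 1, 2, ...
// Layer, bySum and byColumns are hypothetical names for illustration.
object CollapseLayers {
  final case class Layer(key: String, layerNo: Int, col1: Int, col2: Int, col3: Int, depart: String)

  type Same = (Layer, Layer) => Boolean

  // The check used in the answer: equal COL1 + COL2 + COL3 totals.
  val bySum: Same = (a, b) => a.col1 + a.col2 + a.col3 == b.col1 + b.col2 + b.col3
  // Alternative: equal values in every column.
  val byColumns: Same = (a, b) => (a.col1, a.col2, a.col3) == (b.col1, b.col2, b.col3)

  def collapse(rows: Seq[Layer], same: Same): Seq[Layer] =
    rows.groupBy(_.key).toSeq.sortBy(_._1).flatMap { case (_, group) =>
      val ordered = group.sortBy(_.layerNo)
      // Predecessor of each row; None for the first row, like lag() returning null.
      val prevs: Seq[Option[Layer]] = None +: ordered.map(r => Some(r))
      val kept = ordered.zip(prevs).collect {
        // As in Spark, a null lag never counts as "changed"; only LAYER_NO 0 keeps it.
        case (r, prev) if r.layerNo == 0 || prev.exists(p => !same(p, r)) => r
      }
      // rank() - 1 over the survivors, assuming LAYER_NO is unique within a KEY.
      kept.zipWithIndex.map { case (r, i) => r.copy(layerNo = i) }
    }

  def main(args: Array[String]): Unit = {
    val rows = Seq(
      Layer("k", 0, 200, 300, 600, "a"),
      Layer("k", 1, 200, 300, 600, "a"),
      Layer("k", 2, 300, 200, 600, "b")
    )
    println(collapse(rows, bySum))     // the (300, 200, 600) layer is dropped
    println(collapse(rows, byColumns)) // the (300, 200, 600) layer is kept as layer 1
  }
}
```

With `bySum`, the layer `(300, 200, 600)` is treated as unchanged because it has the same total as `(200, 300, 600)`, so it is filtered out; `byColumns` keeps it. If different column combinations can add up to the same total in your data, comparing the lagged columns themselves (for example one `lag` per column) is the safer check.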