Not sure if it's the best way to solve this, but here's a workaround:
import pyspark.sql.functions as f
from pyspark.sql.window import Window

# Collect every id in the group onto each row, then explode to build all pairs
df = df.withColumn('match', f.collect_set('id').over(Window.partitionBy('group')))
df = df.select(f.col('id').alias('src'),
               f.explode('match').alias('dst'),
               f.col('group'))

# Sort each pair so (a, c) and (c, a) collapse to the same edge,
# then drop self-loops and duplicate edges
df = df.withColumn('duplicate_edges', f.array_sort(f.array('src', 'dst')))
df = (df
      .where(f.col('src') != f.col('dst'))
      .drop_duplicates(subset=['duplicate_edges'])
      .drop('duplicate_edges'))
df.sort('group', 'src', 'dst').show()
Output:
+---+---+-----+
|src|dst|group|
+---+---+-----+
| a| c| 1|
| a| f| 1|
| c| f| 1|
| b| d| 2|
| e| a| 3|
+---+---+-----+
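For reference, the answer doesn't show the input data, so here is a minimal sketch of a DataFrame, inferred from the output above, that the snippet assumes:

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Hypothetical input reconstructed from the expected output
df = spark.createDataFrame(
    [('a', 1), ('c', 1), ('f', 1),
     ('b', 2), ('d', 2),
     ('e', 3), ('a', 3)],
    ['id', 'group'])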
Alternatively, try this:
import pyspark.sql.functions as f

# Take the first and last id seen in each group as the edge endpoints
df = (df
      .groupby('group')
      .agg(f.first('id').alias('src'),
           f.last('id').alias('dst')))
df.show()
Output:
+-----+---+---+
|group|src|dst|
+-----+---+---+
| 1| a| c|
| 3| e| a|
| 2| b| d|
+-----+---+---+
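One caveat with this second approach: first and last depend on the row order within each group, which Spark does not guarantee after a shuffle, so the src/dst picked may vary between runs. If you need a deterministic result, a variant (my suggestion, not part of the original answer) is to use min and max instead:

import pyspark.sql.functions as f

# Deterministic variant: pick the smallest and largest id per group
df = (df
      .groupby('group')
      .agg(f.min('id').alias('src'),
           f.max('id').alias('dst')))
df.show()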