
How to create an edge list from a Spark DataFrame in PySpark?

Updated: 2023-11-18 22:25:58

Edit 1

Not sure if it's the best way to solve this, but I did a workaround:

import pyspark.sql.functions as f
from pyspark.sql import Window

# For each row, collect every id that shares the same group
df = df.withColumn('match', f.collect_set('id').over(Window.partitionBy('group')))

# Explode the collected ids so each (id, other id) pair becomes one row
df = df.select(f.col('id').alias('src'),
               f.explode('match').alias('dst'),
               f.col('group'))

# Drop self-loops and keep only one direction of each undirected edge
df = df.withColumn('duplicate_edges', f.array_sort(f.array('src', 'dst')))
df = (df
      .where(f.col('src') != f.col('dst'))
      .drop_duplicates(subset=['duplicate_edges'])
      .drop('duplicate_edges'))

df.sort('group', 'src', 'dst').show()

Output

+---+---+-----+
|src|dst|group|
+---+---+-----+
|  a|  c|    1|
|  a|  f|    1|
|  c|  f|    1|
|  b|  d|    2|
|  e|  a|    3|
+---+---+-----+
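
For reference, the workaround above can be reproduced end to end with an input DataFrame along the following lines. This is a hypothetical reconstruction inferred from the output tables, since the original post does not show the input data:

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Hypothetical sample input, inferred from the output tables shown here
df = spark.createDataFrame(
    [('a', 1), ('f', 1), ('c', 1),
     ('b', 2), ('d', 2),
     ('e', 3), ('a', 3)],
    ['id', 'group'])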

Original answer

Try this:

import pyspark.sql.functions as f

# One edge per group: pair the first and last id seen in each group
df = (df
      .groupby('group')
      .agg(f.first('id').alias('src'),
           f.last('id').alias('dst')))

df.show()

Output:

+-----+---+---+
|group|src|dst|
+-----+---+---+
|    1|  a|  c|
|    3|  e|  a|
|    2|  b|  d|
+-----+---+---+
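
If the goal is to feed the edge list into GraphFrames (a common reason for building src/dst pairs, though the original question does not say so), either result can be used directly as the edges DataFrame. A minimal sketch, assuming the graphframes package is installed:

# Hypothetical follow-up, not part of the original answer
from graphframes import GraphFrame

# Vertices: every id that appears as a source or destination
vertices = (df.selectExpr('src as id')
              .union(df.selectExpr('dst as id'))
              .distinct())

g = GraphFrame(vertices, df)   # edges DataFrame needs 'src' and 'dst' columns
g.degrees.show()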