PySpark: Duplicate Rows Based on a Column Value

Updated: 2022-12-09 10:39:27

Unfortunately you can't iterate over a Column like that. You can always use a udf, but I do have a non-udf hack solution that should work for you if you're using Spark version 2.1 or higher.
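For completeness, here is a minimal sketch of the udf route mentioned above (the names index_range and udfDF are illustrative, not from the original answer): build an array [1..Column B] per row with a udf, then explode it so each element becomes its own row.

from pyspark.sql.functions import explode, udf
from pyspark.sql.types import ArrayType, IntegerType

# Hypothetical udf alternative: return [1, 2, ..., n] for each row's Column B,
# then explode the array so the row is duplicated once per index value.
index_range = udf(lambda n: list(range(1, n + 1)), ArrayType(IntegerType()))
udfDF = df.withColumn("Index", explode(index_range("Column B")))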

The trick is to take advantage of pyspark.sql.functions.posexplode() to get the index value. We do this by creating a string that repeats a comma Column B times, splitting that string on the comma, and using posexplode to get the index. For example, when Column B is 3, repeat(",", 3) produces ",,,", which splits into four empty strings, so posexplode yields positions 0 through 3; the WHERE pos > 0 filter below keeps only indices 1 through 3.

df.createOrReplaceTempView("df")  # first register the DataFrame as a temp table

query = """
    SELECT
        `Column A`,
        `Column B`,
        pos AS Index
    FROM (
        SELECT DISTINCT
            `Column A`,
            `Column B`,
            posexplode(split(repeat(",", `Column B`), ","))
        FROM df
    ) AS a
    WHERE a.pos > 0  -- drop pos 0, the empty string before the first comma
"""
newDF = sqlCtx.sql(query).sort("Column A", "Column B", "Index")
newDF.show()
#+--------+--------+-----+
#|Column A|Column B|Index|
#+--------+--------+-----+
#|      T1|       3|    1|
#|      T1|       3|    2|
#|      T1|       3|    3|
#|      T2|       2|    1|
#|      T2|       2|    2|
#+--------+--------+-----+
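The same trick can also be written with the DataFrame API instead of a temp view. This is a sketch under the same assumptions (Spark 2.1+, a DataFrame df with the columns shown above); repeat is invoked through expr because the Python-side repeat function takes a literal int count rather than a column:

import pyspark.sql.functions as F

newDF = (
    df.select(
        "Column A",
        "Column B",
        # repeat a comma `Column B` times, split on it, posexplode for the index
        F.posexplode(F.split(F.expr("repeat(',', `Column B`)"), ",")),
    )
    .where(F.col("pos") > 0)  # drop the 0th (empty) split element
    .select("Column A", "Column B", F.col("pos").alias("Index"))
    .sort("Column A", "Column B", "Index")
)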

Note: You need to wrap the column names in backticks since they have spaces in them, as explained in this post: How to express a column which name contains spaces in Spark SQL
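For example (a hypothetical one-liner to illustrate the point):

# Without backticks, Spark SQL reads "Column A" as column `Column` aliased
# `A`, then fails to resolve `Column`:
# sqlCtx.sql("SELECT Column A FROM df")   # AnalysisException
sqlCtx.sql("SELECT `Column A` FROM df")   # backticks quote the full name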