
How do I add a numpy.array as a new column to a pyspark.SQL DataFrame?


Assuming the data frame is sorted to match the order of values in the array, you can zip the RDDs and rebuild the data frame as follows:

import numpy as np

n = sparkdf.rdd.getNumPartitions()

# Parallelize and cast to plain integer (np.int64 won't work)
new_col = sc.parallelize(np.array([20, 20, 20, 20]), n).map(int)

def process(pair):
    # Merge the original row's fields with the new column value
    # (dict unpacking; concatenating dict.items() only worked on Python 2)
    return {**pair[0].asDict(), "new_col": pair[1]}

rdd = (sparkdf
    .rdd           # Extract RDD
    .zip(new_col)  # Zip with new col
    .map(process)) # Add new column

sqlContext.createDataFrame(rdd)  # Rebuild data frame
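For context, here is a minimal sketch of the setup the snippet above assumes (the sample rows and the column names a, b, c are placeholders, chosen to match the join example below):

# Assumed setup; the sample data is illustrative only
import numpy as np
from pyspark import SparkContext
from pyspark.sql import SQLContext

sc = SparkContext("local[2]", "zip-example")
sqlContext = SQLContext(sc)

# Four rows to line up with the four-element array above
sparkdf = sqlContext.createDataFrame(
    [(1, "x", 1.0), (2, "y", 2.0), (3, "z", 3.0), (4, "w", 4.0)],
    ("a", "b", "c"))

Note that zip requires both RDDs to have the same number of partitions and the same number of elements in each partition; passing n to parallelize matches the partition count, but if the per-partition element counts differ the zip will fail.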

You can also use a join:

new_col = sqlContext.createDataFrame(
    list(zip(range(1, 5), [20] * 4)),  # (row number, value) pairs
    ("rn", "new_col"))

sparkdf.registerTempTable("df")

sparkdf_indexed = sqlContext.sql(
    # Make sure we have a specific order and add a row number
    "SELECT row_number() OVER (ORDER BY a, b, c) AS rn, * FROM df")

(sparkdf_indexed
    .join(new_col, new_col.rn == sparkdf_indexed.rn)
    .drop(new_col.rn))

but the window function component is not scalable and should be avoided with larger datasets (a row_number over a global ORDER BY pulls every row through a single partition).
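If you do need row numbers at scale, one common workaround (a sketch, not part of the original answer) is RDD.zipWithIndex, which numbers rows by their current partition order without a global sort:

from pyspark.sql import Row

def add_index(pair):
    row, idx = pair
    d = row.asDict()
    d["rn"] = idx + 1  # 1-based, to line up with the join above
    return Row(**d)

# Numbers rows in their existing order; it does not impose ORDER BY a, b, c
sparkdf_indexed = sqlContext.createDataFrame(
    sparkdf.rdd.zipWithIndex().map(add_index))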

Of course, if all you need is a column of a single value, you can simply use lit:

import pyspark.sql.functions as f
sparkdf.withColumn("new_col", f.lit(20))
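If the constant itself comes from a numpy array, the same np.int64 caveat as above applies (at least on older PySpark versions); a minimal sketch, casting to a plain Python int first:

import numpy as np
import pyspark.sql.functions as f

value = int(np.array([20, 20, 20, 20])[0])  # plain int, not np.int64
sparkdf.withColumn("new_col", f.lit(value))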

but I assume a single constant value is not what you need here.