How to zip two array columns in Spark SQL

Updated: 2023-01-29 15:56:28

A Spark SQL equivalent of Python's zip would be pyspark.sql.functions.arrays_zip:

pyspark.sql.functions.arrays_zip(*cols)

Collection function: Returns a merged array of structs in which the N-th struct contains all N-th values of input arrays.
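
As a minimal illustration of that behavior (the literal values and column names below are made up for the example):

from pyspark.sql.functions import array, arrays_zip, lit

demo = spark.range(1).select(
    array(lit("a"), lit("b"), lit("c")).alias("letters"),
    array(lit(1), lit(2), lit(3)).alias("numbers"),
)

# The N-th struct holds the N-th element of each input array,
# and its fields are named after the input columns.
demo.select(arrays_zip("letters", "numbers").alias("zipped")).show(truncate=False)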

So if you already have two arrays:

from pyspark.sql.functions import split

df = (spark
    .createDataFrame([('abc, def, ghi', '1.0, 2.0, 3.0')])
    .toDF("column_1", "column_2")
    # split on commas with optional surrounding whitespace
    .withColumn("column_1", split("column_1", r"\s*,\s*"))
    .withColumn("column_2", split("column_2", r"\s*,\s*")))

You can apply it to the result:

from pyspark.sql.functions import arrays_zip

df_zipped = df.withColumn(
  "zipped", arrays_zip("column_1", "column_2")
)

df_zipped.select("zipped").show(truncate=False)

+------------------------------------+
|zipped                              |
+------------------------------------+
|[[abc, 1.0], [def, 2.0], [ghi, 3.0]]|
+------------------------------------+
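
Each element of zipped is a struct whose fields keep the names of the source columns (column_1 and column_2); that is what the lambda in the next step relies on. A small sketch of addressing the fields directly:

# Struct fields inside the zipped array are named after the input columns,
# so individual pairs can be read with ordinary field access:
df_zipped.selectExpr(
    "zipped[0].column_1 AS first_left",   # -> abc
    "zipped[0].column_2 AS first_right"   # -> 1.0
).show()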

Now to combine the results you can transform (How to use transform higher-order function?, TypeError: Column is not iterable - How to iterate over ArrayType()?):

from pyspark.sql.functions import expr

df_zipped_concat = df_zipped.withColumn(
    "zipped_concat",
    expr("transform(zipped, x -> concat_ws('_', x.column_1, x.column_2))")
)

df_zipped_concat.select("zipped_concat").show(truncate=False)

+---------------------------+
|zipped_concat              |
+---------------------------+
|[abc_1.0, def_2.0, ghi_3.0]|
+---------------------------+
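
The same pipeline can also be written entirely in SQL against a temporary view; a minimal sketch (the view name t is made up for the example):

df.createOrReplaceTempView("t")

spark.sql("""
    SELECT transform(
             arrays_zip(column_1, column_2),
             x -> concat_ws('_', x.column_1, x.column_2)
           ) AS zipped_concat
    FROM t
""").show(truncate=False)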

Note:

The higher-order function transform and arrays_zip were introduced in Apache Spark 2.4.
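
On Spark 3.1+ the same result can be written without expr strings, using the native Python wrappers; zip_with even combines the zip and the concatenation in one step (the underlying SQL function exists since 2.4, the Python wrapper since 3.1). A hedged sketch:

from pyspark.sql.functions import concat_ws, zip_with

# zip_with pairs the N-th elements of both arrays and applies the lambda,
# so the explicit arrays_zip step is not needed here.
df_concat = df.withColumn(
    "zipped_concat",
    zip_with("column_1", "column_2", lambda x, y: concat_ws("_", x, y))
)

df_concat.select("zipped_concat").show(truncate=False)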