A Spark SQL equivalent of Python's zip would be pyspark.sql.functions.arrays_zip:
pyspark.sql.functions.arrays_zip(*cols)
Collection function: Returns a merged array of structs in which the N-th struct contains all N-th values of input arrays.
So if you already have two arrays:
from pyspark.sql.functions import split
df = (spark
    .createDataFrame([('abc, def, ghi', '1.0, 2.0, 3.0')])
    .toDF("column_1", "column_2")
    # split each comma-separated string into an array of strings
    .withColumn("column_1", split("column_1", r"\s*,\s*"))
    .withColumn("column_2", split("column_2", r"\s*,\s*")))
you can apply it to the result:
from pyspark.sql.functions import arrays_zip
df_zipped = df.withColumn(
    "zipped", arrays_zip("column_1", "column_2")
)
df_zipped.select("zipped").show(truncate=False)
+------------------------------------+
|zipped                              |
+------------------------------------+
|[[abc, 1.0], [def, 2.0], [ghi, 3.0]]|
+------------------------------------+
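The zipped column is an array of structs whose fields keep the source column names, which is why they can be addressed as x.column_1 and x.column_2 below (schema sketched roughly; exact nullability flags may vary by Spark version):
df_zipped.select("zipped").printSchema()
# root
#  |-- zipped: array
#  |    |-- element: struct
#  |    |    |-- column_1: string
#  |    |    |-- column_2: string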
Now to combine the results you can use transform (see How to use transform higher-order function? and TypeError: Column is not iterable - How to iterate over ArrayType()?):
from pyspark.sql.functions import expr
df_zipped_concat = df_zipped.withColumn(
    "zipped_concat",
    # for each struct in the zipped array, join its two fields with "_"
    expr("transform(zipped, x -> concat_ws('_', x.column_1, x.column_2))")
)
df_zipped_concat.select("zipped_concat").show(truncate=False)
+---------------------------+
|zipped_concat              |
+---------------------------+
|[abc_1.0, def_2.0, ghi_3.0]|
+---------------------------+
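If what you ultimately need is a single string per row rather than an array, one option (a sketch using array_join, which is also available since Spark 2.4) is to join the transformed array:
from pyspark.sql.functions import array_join

df_flat = df_zipped_concat.withColumn(
    # collapse the array of "a_b" strings into one comma-separated string
    "joined", array_join("zipped_concat", ", ")
)
df_flat.select("joined").show(truncate=False)
# +-------------------------+
# |joined                   |
# +-------------------------+
# |abc_1.0, def_2.0, ghi_3.0|
# +-------------------------+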
Note:
Higher order functions transform and arrays_zip have been introduced in Apache Spark 2.4.
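As a side note, since Spark 3.1 pyspark.sql.functions.transform also accepts a Python callable, so the same step can be written without the expr string (a sketch under that assumption):
from pyspark.sql.functions import transform, concat_ws

df_zipped_concat = df_zipped.withColumn(
    "zipped_concat",
    # x is a Column over each struct element; struct fields are accessed by name
    transform("zipped", lambda x: concat_ws("_", x["column_1"], x["column_2"]))
)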