
Add List elements as a column to an existing pyspark dataframe

Updated: 2022-12-12 09:28:11

I have a list lists=[0,1,2,3,5,6,7]. The order is not sequential. I have a pyspark dataframe with 9 columns:

+-------------------+--------+--------+--------+--------+--------+--------+---------------+-----+----+
|               date|ftt (°c)|rtt (°c)|fbt (°c)|rbt (°c)|fmt (°c)|rmt (°c)|fmhhumidityunit|index|Diff|
+-------------------+--------+--------+--------+--------+--------+--------+---------------+-----+----+
|2019-02-01 05:29:47|     NaN|     NaN|     NaN|     NaN|     NaN|     NaN|            NaN|    0| NaN|
|2019-02-01 05:29:17|     NaN|     NaN|     NaN|     NaN|     NaN|     NaN|            NaN|    1| NaN|

I need to add my list as a column to my existing dataframe. My list is not in order, so I am not able to use a udf. Is there a way to do it? Please help; I want it to be like this:

+-------------------+--------+--------+--------+--------+--------+--------+---------------+-----+----+-----+
|               date|ftt (°c)|rtt (°c)|fbt (°c)|rbt (°c)|fmt (°c)|rmt (°c)|fmhhumidityunit|index|Diff|lists|
+-------------------+--------+--------+--------+--------+--------+--------+---------------+-----+----+-----+
|2019-02-01 05:29:47|     NaN|     NaN|     NaN|     NaN|     NaN|     NaN|            NaN|    0| NaN|    0|
|2019-02-01 05:29:17|     NaN|     NaN|     NaN|     NaN|     NaN|     NaN|            NaN|    1| NaN|    1|

Not too sure if it has to be something like this, or whether you were expecting something else. If the number of list items and the number of dataframe rows have to be the same, then here's a simple approach.

For a given sample dataframe with three columns:

l = [(1, 'DEF', 33), (2, 'KLM', 22), (3, 'ABC', 32), (4, 'XYZ', 77)]
df = spark.createDataFrame(l, ['id', 'value', 'age'])

Let's say this is the list:

lists=[5,6,7,8]

You can create an RDD from this list, zip it with the dataframe's RDD, and apply a map function over the result.

listrdd = sc.parallelize(lists)

newdf = df.rdd.zip(listrdd).map(lambda xy: [c for c in xy[0]] + [xy[1]]).toDF(["id", "value", "age", "List_element"])

>>> ziprdd = df.rdd.zip(listrdd)
>>> ziprdd.take(50)
[(Row(id=1, value='DEF', age=33), 5), (Row(id=2, value='KLM', age=22), 6), (Row(id=3, value='ABC', age=32), 7), (Row(id=4, value='XYZ', age=77), 8)]

The zip function returns key-value pairs: the first element contains the data from the first rdd and the second element contains the data from the second rdd. I use a list comprehension over the first element and concatenate it with the second element.
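To make that map step concrete, here is a minimal plain-Python sketch of what the lambda does to a single zipped pair (Row is a tuple subclass, so iterating over it yields the column values):

from pyspark.sql import Row

pair = (Row(id=1, value='DEF', age=33), 5)    # one element of ziprdd
flattened = [c for c in pair[0]] + [pair[1]]  # what the lambda produces
print(flattened)                              # [1, 'DEF', 33, 5]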

It's dynamic and can work for any number of columns, but the number of list elements and dataframe rows has to be the same (a variant that derives the column names dynamically is sketched after the output below).

>>> newdf.show()
+---+-----+---+------------+
| id|value|age|List_element|
+---+-----+---+------------+
|  1|  DEF| 33|           5|
|  2|  KLM| 22|           6|
|  3|  ABC| 32|           7|
|  4|  XYZ| 77|           8|
+---+-----+---+------------+
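The column list above is hardcoded; as a sketch of the dynamic n-column version, the names can instead be taken from df.columns (List_element is just an illustrative name for the new column):

# Derive the output column names from the dataframe itself
newdf = df.rdd.zip(listrdd) \
              .map(lambda xy: [c for c in xy[0]] + [xy[1]]) \
              .toDF(df.columns + ["List_element"])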

Note: both RDDs must have the same partition count when using the zip method, or you will get an error:

ValueError: Can only zip with RDD which has the same number of partitions
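One way to line the counts up (a sketch; this only fixes the partition count): pass the dataframe RDD's partition count to parallelize when creating the list RDD. Note that zip also requires each corresponding pair of partitions to have the same number of elements, so matching the count alone is not guaranteed to be enough in every case.

# Create the list RDD with the same partition count as the dataframe's RDD
listrdd = sc.parallelize(lists, df.rdd.getNumPartitions())
print(df.rdd.getNumPartitions(), listrdd.getNumPartitions())  # now equal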