How to get the sum of products from columns in two dataframes using PySpark

Here's a simpler example derived from your sample dataframes. I think it should also be scalable to your real data.

df1.show()
+-------+------+----+-----+
|StoreId|ItemID|Date|Price|
+-------+------+----+-----+
| HH-101| item1| d_1|   €9|
| HH-101| item1| d_2|   €7|
+-------+------+----+-----+

df2.show()
+-------+------+---+---+
|StoreId|ItemID|d_1|d_2|
+-------+------+---+---+
| HH-101| item1|  2|  4|
| HH-101| item2|  1|  0|
+-------+------+---+---+
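
For reference, here is a minimal sketch of how the two sample dataframes above could be created, assuming an existing SparkSession named spark (the variable name is just an assumption for illustration):

df1 = spark.createDataFrame(
    [('HH-101', 'item1', 'd_1', '€9'),
     ('HH-101', 'item1', 'd_2', '€7')],
    ['StoreId', 'ItemID', 'Date', 'Price']
)

df2 = spark.createDataFrame(
    [('HH-101', 'item1', 2, 4),
     ('HH-101', 'item2', 1, 0)],
    ['StoreId', 'ItemID', 'd_1', 'd_2']
)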

You can unpivot df2 using stack, with the query string generated from a list comprehension over the date column names, then join the result to df1 on the first 3 columns (StoreId, ItemID, Date), group by the store id and item id, and take the sum of Price * Number. Since Price is stored as a string like "€9", substr(Price, 2) drops the currency symbol before the cast to float.

from pyspark.sql import functions as F

result = df2.selectExpr(
    'StoreId', 'ItemID',
    # 2 = number of date columns; the ', '.join(...) generates e.g.
    # "stack(2, 'd_1', d_1, 'd_2', d_2) as (Date, Number)"
    'stack(2, ' + ', '.join(["'%s', %s" % (c, c) for c in df2.columns[2:]]) + ') as (Date, Number)'
).join(
    df1, df1.columns[:3]    # join on StoreId, ItemID, Date
).groupBy(
    'StoreId', 'ItemID'
).agg(
    # strip the leading € from Price before multiplying by Number
    F.expr('sum(Number * float(substr(Price, 2))) as Total')
)

result.show()
+-------+------+-----+
|StoreId|ItemID|Total|
+-------+------+-----+
| HH-101| item1| 46.0|
+-------+------+-----+
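
Only item1 appears in the final result because df1 has no Price rows for item2, so the join drops those rows; the total for item1 is 2 * 9 + 4 * 7 = 46.0. If it helps to see what the unpivot step alone produces, running just the stack part of the query (a sketch using the same generated expression as above) would give:

df2.selectExpr(
    'StoreId', 'ItemID',
    "stack(2, 'd_1', d_1, 'd_2', d_2) as (Date, Number)"
).show()
+-------+------+----+------+
|StoreId|ItemID|Date|Number|
+-------+------+----+------+
| HH-101| item1| d_1|     2|
| HH-101| item1| d_2|     4|
| HH-101| item2| d_1|     1|
| HH-101| item2| d_2|     0|
+-------+------+----+------+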