Given data like this:
rdd = sc.parallelize([
    ['usr1', ('itm1', 2), ('itm3', 3)],
    ['usr2', ('itm2', 3), ('itm3', 5), ('itm22', 6)]
])
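This assumes an active SparkContext named sc, as the PySpark shell provides. In a standalone script, a minimal sketch to obtain one:
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
sc = spark.sparkContext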
Flatten the records:
def to_record(kvs):
    user, *vs = kvs  # For Python 2.x use standard indexing / slicing instead
    for item, value in vs:
        yield user, item, value

records = rdd.flatMap(to_record)
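As a quick sanity check, the flattened records look like this (the order may differ depending on partitioning):
records.collect()
## [('usr1', 'itm1', 2), ('usr1', 'itm3', 3), ('usr2', 'itm2', 3), ('usr2', 'itm3', 5), ('usr2', 'itm22', 6)]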
Convert to a DataFrame:
df = records.toDF(["user", "item", "value"])
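For comparison, the long-format DataFrame before pivoting (row order is not guaranteed):
df.show()
## +----+-----+-----+
## |user| item|value|
## +----+-----+-----+
## |usr1| itm1|    2|
## |usr1| itm3|    3|
## |usr2| itm2|    3|
## |usr2| itm3|    5|
## |usr2|itm22|    6|
## +----+-----+-----+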
Pivot:
result = df.groupBy("item").pivot("user").sum()
result.show()
## +-----+----+----+
## | item|usr1|usr2|
## +-----+----+----+
## | itm1| 2|null|
## | itm2|null| 3|
## | itm3| 3| 5|
## |itm22|null| 6|
## +-----+----+----+
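If the set of pivot values (here, the users) is known in advance, you can pass it to pivot explicitly, which spares Spark the extra job it otherwise runs to discover the distinct values. A sketch using the same df:
result = df.groupBy("item").pivot("user", ["usr1", "usr2"]).sum("value")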
Note: Spark DataFrames are designed to handle long and relatively thin data. If you want to generate a wide contingency table, DataFrames won't be useful, especially if the data is dense and you want to keep a separate column per feature.