Updated: 2023-11-18 22:52:28
Another way of handling column mapping in PySpark is via a dictionary. Dictionaries let you map the columns of the initial dataframe to the columns of the final dataframe using a key/value structure, as shown below:
from pyspark.sql.functions import col
df = spark.createDataFrame([
[1, "John", "2019-12-01 10:00:00"],
[2, "Michael", "2019-12-01 11:00:00"],
[2, "Michael", "2019-12-01 11:01:00"],
[3, "Tom", "2019-11-13 20:00:00"],
[3, "Tom", "2019-11-14 00:00:00"],
[4, "Sofy", "2019-10-01 01:00:00"]
], ["A", "B", "C"])
col_map = {"A":"Z", "B":"X", "C":"Y"}
df.select(*[col(k).alias(col_map[k]) for k in col_map]).show()
# +---+-------+-------------------+
# | Z| X| Y|
# +---+-------+-------------------+
# | 1| John|2019-12-01 10:00:00|
# | 2|Michael|2019-12-01 11:00:00|
# | 2|Michael|2019-12-01 11:01:00|
# | 3| Tom|2019-11-13 20:00:00|
# | 3| Tom|2019-11-14 00:00:00|
# | 4| Sofy|2019-10-01 01:00:00|
# +---+-------+-------------------+
Here we map A, B, C into Z, X, Y respectively.
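Note that the select above keeps only the mapped columns. If you also want to preserve columns that are not in the mapping, a small helper can compute the full renamed column list and feed it to `df.toDF(...)`. The `rename_columns` helper below is a hypothetical sketch, not part of the original snippet:

```python
def rename_columns(mappings, columns):
    """Return the column names in their original order, renaming any
    column that appears as a key in `mappings` and keeping the rest."""
    return [mappings.get(c, c) for c in columns]

# e.g. df.toDF(*rename_columns({"A": "Z"}, df.columns)) would rename
# only column A and leave B and C untouched
```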
And if you want a modular solution, you can also put everything inside a function:
def transform_cols(mappings, df):
return df.select(*[col(k).alias(mappings[k]) for k in mappings])
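One detail worth knowing: because the select list is built by iterating over the dict, the output column order follows the dictionary's insertion order (guaranteed since Python 3.7), not the dataframe's original column order. A plain-Python sketch of that expansion:

```python
# the comprehension inside transform_cols turns the mapping into alias
# targets in dict insertion order, which becomes the output column order
mappings = {"C": "Y", "A": "Z", "B": "X"}
aliased = [mappings[k] for k in mappings]
# aliased is ["Y", "Z", "X"], so column C would come first in the result
```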
Or go even more modular by using monkey patching to extend the existing functionality of the DataFrame class. Place the next code at the top of your PySpark code (you can also create a mini library and include it in your code when needed). Be aware that PySpark 3.0+ already ships a built-in DataFrame.transform(func) method, so this patch shadows it; pick a different attribute name if you rely on the built-in:
from pyspark.sql import DataFrame
def transform_cols(self, mappings):
return self.select(*[col(k).alias(mappings[k]) for k in mappings])
DataFrame.transform = transform_cols
Then call it:
df.transform(col_map).show()
PS: This can be a handy way to extend DataFrame functionality: create your own library and expose its functions through the DataFrame class via monkey patching (akin to extension methods, for those familiar with C#).