How do I create a copy of a DataFrame in PySpark?



I have a dataframe from which I need to create a new dataframe with a small change in the schema by doing the following operation.

>>> X = spark.createDataFrame([[1,2], [3,4]], ['a', 'b'])
>>> schema_new = X.schema.add('id_col', LongType(), False)
>>> _X = X.rdd.zipWithIndex().map(lambda l: list(l[0]) + [l[1]]).toDF(schema_new)

The problem is that in the above operation, the schema of X gets changed in place. So when I print X.columns I get

>>> X.columns
['a', 'b', 'id_col']

but the values in X are still the same

>>> X.show()
+---+---+
|  a|  b|
+---+---+
|  1|  2|
|  3|  4|
+---+---+

To avoid changing the schema of X, I tried creating a copy of X in three ways:

  • using the copy and deepcopy methods from the copy module

  • simply using _X = X

The copy methods failed and returned a

RecursionError: maximum recursion depth exceeded
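
(What follows is a reconstruction of that attempt, not the exact code from the question: deep-copying the DataFrame object itself is the step that produced the error shown above; copy.copy reportedly fails the same way.)

>>> import copy
>>> _X = copy.deepcopy(X)
RecursionError: maximum recursion depth exceeded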

The assignment method also doesn't work

>>> _X = X
>>> id(_X) == id(X)
True

Since their ids are the same, creating a duplicate dataframe doesn't really help here, and the operations done on _X are reflected in X.

So my question is really twofold:

  • how to change the schema out of place (that is, without making any changes to X)?

  • and more importantly, how to create a duplicate of a pyspark dataframe?

Note:

This question is a follow-up to this post

As explained in the answer to the other question, you can make a deep copy of your initial schema, then modify that copy and use it to initialize the new DataFrame _X:

import copy

from pyspark.sql.types import LongType

X = spark.createDataFrame([[1, 2], [3, 4]], ['a', 'b'])

# Deep-copy the schema so that X.schema itself is never touched
_schema = copy.deepcopy(X.schema)
_schema.add('id_col', LongType(), False)  # add() modifies the copy in place

# Pair each row with its index and build the new DataFrame with the extended schema
_X = X.rdd.zipWithIndex().map(lambda l: list(l[0]) + [l[1]]).toDF(_schema)

Now let's check:

print('Schema of X: ' + str(X.schema))
print('Schema of _X: ' + str(_X.schema))

Output:

Schema of X: StructType(List(StructField(a,LongType,true),StructField(b,LongType,true)))
Schema of _X: StructType(List(StructField(a,LongType,true),StructField(b,LongType,true),StructField(id_col,LongType,false)))
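
As a further sanity check (a small sketch assuming the same spark session as above), the column lists confirm that only _X picked up the new column:

print('Columns of X: ' + str(X.columns))
print('Columns of _X: ' + str(_X.columns))

Output:

Columns of X: ['a', 'b']
Columns of _X: ['a', 'b', 'id_col']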

Note that to copy a DataFrame you can just use _X = X. Whenever you add a new column with e.g. withColumn, the object is not altered in place, but a new copy is returned. Hope this helps!
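
A minimal sketch of that point (assuming an active spark session; F.monotonically_increasing_id is used here only as a convenient way to populate the new column and is not part of the original example):

import pyspark.sql.functions as F

X = spark.createDataFrame([[1, 2], [3, 4]], ['a', 'b'])
_X = X  # both names refer to the same DataFrame object

# withColumn does not mutate the existing DataFrame; it returns a new one
_X = _X.withColumn('id_col', F.monotonically_increasing_id())

print(X.columns)   # ['a', 'b']            <- original is unchanged
print(_X.columns)  # ['a', 'b', 'id_col']  <- only the new object has the extra column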