Scala: How to split a dataframe by row number?

Updated: 2023-11-18 23:18:16

You could use zipWithIndex from the RDD API (unfortunately there is no equivalent in Spark SQL), which maps each row to an index ranging from 0 to rdd.count - 1.
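
For illustration, here is a minimal sketch of zipWithIndex on its own; the toy RDD and the running SparkSession named spark are assumptions for the example, not part of the original answer:

// pairing each element with its position in the RDD
val letters = spark.sparkContext.parallelize(Seq("a", "b", "c"))
val indexed = letters.zipWithIndex() // RDD[(String, Long)]
// indexed.collect() => Array(("a", 0), ("b", 1), ("c", 2))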

So if you have a dataframe that I assume is already sorted accordingly, you would need to go back and forth between the two APIs as follows:

import org.apache.spark.sql.types._
import org.apache.spark.sql.Row
import spark.implicits._ // for the 'id column syntax (already in scope in spark-shell)

// creating mock data
val df = spark.range(100).withColumn("test", 'id % 10)

// zipping the data: integer division of the row index assigns each row to a chunk
val partitionSize = 5 // I use 5 but you can use 100000 in your case
val zipped_rdd = df.rdd
    .zipWithIndex.map { case (row, id) =>
        Row.fromSeq(row.toSeq :+ id / partitionSize)
    }

// back to a dataframe, with the chunk number as a new "partition" column
val newField = StructField("partition", LongType, false)
val zipped_df = spark
    .createDataFrame(zipped_rdd, df.schema.add(newField))

Let's have a look at the data: we have a new column called partition, which corresponds to the way you want to split your data. Each row index is integer-divided by partitionSize, so rows 0-4 land in partition 0, rows 5-9 in partition 1, and so on.

zipped_df.show(15) // 5 rows per partition
+---+----+---------+
| id|test|partition|
+---+----+---------+
|  0|   0|        0|
|  1|   1|        0|
|  2|   2|        0|
|  3|   3|        0|
|  4|   4|        0|
|  5|   5|        1|
|  6|   6|        1|
|  7|   7|        1|
|  8|   8|        1|
|  9|   9|        1|
| 10|   0|        2|
| 11|   1|        2|
| 12|   2|        2|
| 13|   3|        2|
| 14|   4|        2|
+---+----+---------+

// using partitionBy to write the data
zipped_df.write
    .partitionBy("partition")
    .csv(".../testPart.csv")