Updated: 2023-02-07 13:49:40
I would like to dynamically generate a dataframe containing a header record for a report, so creating a dataframe from the value of the string below:
val headerDescs : String = "Name,Age,Location"
val headerSchema = StructType(headerDescs.split(",").map(fieldName => StructField(fieldName, StringType, true)))
However, now I want to do the same for the data (which is, in effect, the same data, i.e. the metadata).
I create an RDD :
val headerRDD = sc.parallelize(headerDescs.split(","))
I then intended to use createDataFrame to create it:
val headerDf = sqlContext.createDataFrame(headerRDD, headerSchema)
however that fails because createDataFrame expects an RDD[Row], whereas my RDD is an RDD of strings. I can't find a way of converting my RDD to an RDD of Rows and then mapping the fields dynamically. The examples I've seen assume you know the number of columns beforehand, but I eventually want the ability to change the columns without changing the code - for example, by keeping the columns in a file.
Code excerpt based on first answer:
val headerDescs : String = "Name,Age,Location"
// create the schema from a string, splitting by delimiter
val headerSchema = StructType(headerDescs.split(",").map(fieldName => StructField(fieldName, StringType, true)))
// create a row from a string, splitting by delimiter
val headerRDDRows = sc.parallelize(headerDescs.split(",")).map( a => Row(a))
val headerDf = sqlContext.createDataFrame(headerRDDRows, headerSchema)
headerDf.show()
Executing this results in:
+--------+---+--------+
| Name|Age|Location|
+--------+---+--------+
| Name|
| Age|
|Location|
+--------+---+-------
To convert an RDD[Array[String]] to an RDD[Row], you need the following steps:
import org.apache.spark.sql.Row
val headerRDD = sc.parallelize(Seq(headerDescs.split(","))).map(x=>Row(x(0),x(1),x(2)))
scala> val headerSchema = StructType(headerDescs.split(",").map(fieldName => StructField(fieldName, StringType, true)))
headerSchema: org.apache.spark.sql.types.StructType = StructType(StructField(Name,StringType,true), StructField(Age,StringType,true), StructField(Location,StringType,true))
scala> val headerRDD = sc.parallelize(Seq(headerDescs.split(","))).map(x=>Row(x(0),x(1),x(2)))
headerRDD: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row] = MapPartitionsRDD[6] at map at <console>:34
scala> val headerDf = sqlContext.createDataFrame(headerRDD, headerSchema)
headerDf: org.apache.spark.sql.DataFrame = [Name: string, Age: string, Location: string]
scala> headerDf.printSchema
root
|-- Name: string (nullable = true)
|-- Age: string (nullable = true)
|-- Location: string (nullable = true)
scala> headerDf.show
+----+---+--------+
|Name|Age|Location|
+----+---+--------+
|Name|Age|Location|
+----+---+--------+
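Since the question asks for a dynamic number of columns, the hard-coded indices `x(0),x(1),x(2)` can be replaced with `Row.fromSeq`, which builds a Row from any sequence. A sketch of the fully dynamic version, assuming the same `headerDescs` string from the question:

```scala
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StructType, StructField, StringType}

val headerDescs: String = "Name,Age,Location"
val fields = headerDescs.split(",")

// Schema and row are both derived from the same split, so adding or
// removing a column in headerDescs requires no code change.
val headerSchema = StructType(fields.map(f => StructField(f, StringType, true)))
val headerRDD = sc.parallelize(Seq(fields)).map(x => Row.fromSeq(x))
val headerDf = sqlContext.createDataFrame(headerRDD, headerSchema)
```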
This gives you an RDD[Row].
For reading from a file:
val vRDD = sc.textFile("..**filepath**.").map(_.split(",")).map(a => Row.fromSeq(a))
val headerDf = sqlContext.createDataFrame(vRDD , headerSchema)
Using the Spark-CSV package:
val df = sqlContext.read
.format("com.databricks.spark.csv")
.option("header", "true") // Use first line of all files as header
.schema(headerSchema) // defining based on the custom schema
.load("cars.csv")
OR
val df = sqlContext.read
.format("com.databricks.spark.csv")
.option("header", "true") // Use first line of all files as header
.option("inferSchema", "true") // Automatically infer data types
.load("cars.csv")
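Note that in Spark 2.x and later, CSV support is built into the DataFrameReader, so the external spark-csv package is no longer needed. A sketch assuming a `SparkSession` named `spark`:

```scala
// Spark 2.x+: csv() is a built-in method on spark.read,
// with the same header/inferSchema options as spark-csv.
val df = spark.read
  .option("header", "true")       // use first line of the file as header
  .option("inferSchema", "true")  // automatically infer data types
  .csv("cars.csv")
```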
There are also various other options which you can explore in its documentation.