且构网

分享程序员开发的那些事...
且构网 - 分享程序员编程开发的那些事

DataFrame到RDD [(String,String)]转换

更新时间:2022-12-12 16:21:54

如果要将行映射到其他RDD元素,则可以使用df.map(row => ...)将数据帧转换为RDD .

You can use df.map(row => ...) to convert the dataframe to a RDD if you want to map a row to a different RDD element.

例如:

val df = Seq(("table1",432),
      ("table2",567),
      ("table3",987),
      ("table1",789)).
      toDF("tablename", "Code").toDF()

    df.show()

    +---------+----+
|tablename|Code|
+---------+----+
|   table1| 432|
|   table2| 567|
|   table3| 987|
|   table1| 789|
+---------+----+

    val rddDf = df.map(r => (r(0), r(1))).rdd // Type:RDD[(Any,Any)]

    OR

    val rdd = df.map(r => (r(0).toString, r(1).toString)).rdd  //Type: RDD[(String,String)]

请参考 https://community.hortonworks.com/questions/106500/error-in-spark-streaming-kafka-integration-structu.html 关于 AnalysisException:具有流源的查询必须使用writeStream.start()

您需要等待使用查询终止查询. awaitTermination() 为了防止在查询处于活动状态时退出该过程.

You need to wait for the termination of the query using query.awaitTermination() To prevent the process from exiting while the query is active.