且构网

How to join 3 RDDs using Spark Scala

Updated: 2023-11-18 22:43:40

Joins like this on RDDs are painful; that is another reason why DataFrames (DFs) are nicer.
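As a sketch of why DataFrames make this easier, the same three-way join can be written with the DataFrame API. Each join names its key column explicitly, so no re-keying is needed. This assumes Spark 2.x or later with a local SparkSession on the classpath. The object name and the column names (actor_id, film_id, role, title, year and so on) are hypothetical labels chosen for illustration.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

object ThreeWayDataFrameJoin {
  // Builds three small DataFrames mirroring the RDD data and joins them on named columns
  // (column names are hypothetical, chosen for this sketch)
  def threeWayJoin(spark: SparkSession): DataFrame = {
    import spark.implicits._

    val actors = Seq(
      (101, "James", "Stewart", "M"),
      (102, "Deborah", "Kerr", "F"),
      (103, "Peter", "OToole", "M"),
      (104, "Robert", "De Niro", "M")
    ).toDF("actor_id", "first", "last", "gender")

    val roles = Seq(
      (101, 901, "John Scottie Ferguson"),
      (102, 902, "Miss Giddens"),
      (103, 903, "T.E. Lawrence"),
      (104, 904, "Michael")
    ).toDF("actor_id", "film_id", "role")

    val films = Seq(
      (901, "Vertigo", 1958),
      (902, "The Innocents", 1961)
    ).toDF("film_id", "title", "year")

    // join(other, "col") is an inner equi-join that keeps a single copy of the key column,
    // so the second join can use film_id directly without any re-keying
    actors
      .join(roles, "actor_id")
      .join(films, "film_id")
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("three-way-df-join").master("local[*]").getOrCreate()
    threeWayJoin(spark).orderBy("actor_id").show(false)
    spark.stop()
  }
}
```

Because the columns are named, the result is already flat, with one column per field, so no tuple-stripping map is needed afterwards.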

You get no data because a pair RDD (K, V) joins only on K, and the K part of the intermediate result has nothing in common with the K part of the last RDD. The keys 101 and 102 will join, but they have nothing in common with 901 and 902. You need to shift things around, as in this more limited example of mine:

```scala
// Actors, keyed by actor id
val rdd1 = sc.parallelize(Seq(
           (101,("James","Stewart","M")),
           (102,("Deborah","Kerr","F")),
           (103,("Peter","OToole","M")),
           (104,("Robert","De Niro","M"))
           ))

// Roles, keyed by actor id; the value carries the film id (901, 902, ...)
val rdd2 = sc.parallelize(Seq(
           (101,(901,"John Scottie Ferguson")),
           (102,(902,"Miss Giddens")),
           (103,(903,"T.E. Lawrence")),
           (104,(904,"Michael"))
           ))

// Films, keyed by film id
val rdd3 = sc.parallelize(Seq(
           (901,("Vertigo",1958)),
           (902,("The Innocents",1961))
           ))

// First join on actor id: (actorId, ((first, last, gender), (filmId, role)))
val rdd4 = rdd1.join(rdd2)

val new_rdd4 = rdd4.keyBy(x => x._2._2._1)  // Redefine key (the film id) for join with rdd3
val rdd5 = rdd3.join(new_rdd4)
rdd5.collect
```

Returns:

```
res14: Array[(Int, ((String, Int), (Int, ((String, String, String), (Int, String)))))] = Array((901,((Vertigo,1958),(101,((James,Stewart,M),(901,John Scottie Ferguson))))), (902,((The Innocents,1961),(102,((Deborah,Kerr,F),(902,Miss Giddens))))))
```
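To see why the re-keying step is needed, here is a minimal sketch that joins rdd4 directly with rdd3 and compares that with the re-keyed join. It assumes Spark 2.x or later on the classpath. WhyRekeyIsNeeded and counts are hypothetical names, and the data is cut down to two actors. With this data the direct join should return 0 rows and the re-keyed join 2.

```scala
import org.apache.spark.SparkContext
import org.apache.spark.sql.SparkSession

object WhyRekeyIsNeeded {
  // Returns (rows from the direct join, rows from the re-keyed join)
  def counts(sc: SparkContext): (Long, Long) = {
    val rdd1 = sc.parallelize(Seq((101, ("James", "Stewart", "M")), (102, ("Deborah", "Kerr", "F"))))
    val rdd2 = sc.parallelize(Seq((101, (901, "John Scottie Ferguson")), (102, (902, "Miss Giddens"))))
    val rdd3 = sc.parallelize(Seq((901, ("Vertigo", 1958)), (902, ("The Innocents", 1961))))

    // After the first join the keys are still actor ids (101, 102)
    val rdd4 = rdd1.join(rdd2)

    // rdd3 is keyed by film id (901, 902), so no keys match and the inner join is empty
    val direct = rdd4.join(rdd3)

    // Pulling the film id out of the value (same as x._2._2._1) and using it as the key lines the sides up
    val rekeyed = rdd4.keyBy { case (_, (_, (filmId, _))) => filmId }.join(rdd3)

    (direct.count(), rekeyed.count())
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("why-rekey").master("local[*]").getOrCreate()
    val (direct, rekeyed) = counts(spark.sparkContext)
    println(s"direct join: $direct rows, re-keyed join: $rekeyed rows")
    spark.stop()
  }
}
```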

You will need to strip out the fields you want with a map; I leave that to you. Note that join performs an INNER join by default.
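One possible way to do the stripping is sketched below, assuming Spark 2.x or later. A pattern-matching map destructures the nested tuples into a flat record. leftOuterJoin covers the case where you also want actors whose film has no match in rdd3. The Credit case class, its field names, and the FlattenJoinResult object are hypothetical, not part of the original answer.

```scala
import org.apache.spark.SparkContext
import org.apache.spark.sql.SparkSession

// Hypothetical flat record; the field names are illustrative only
case class Credit(actorId: Int, first: String, last: String, role: String, title: String, year: Int)

object FlattenJoinResult {
  // Rebuilds the answer's RDDs and returns (inner-join records, left-outer-join rows)
  def run(sc: SparkContext): (Array[Credit], Array[(Int, String, Option[String])]) = {
    val rdd1 = sc.parallelize(Seq(
      (101, ("James", "Stewart", "M")),
      (102, ("Deborah", "Kerr", "F")),
      (103, ("Peter", "OToole", "M")),
      (104, ("Robert", "De Niro", "M"))))
    val rdd2 = sc.parallelize(Seq(
      (101, (901, "John Scottie Ferguson")),
      (102, (902, "Miss Giddens")),
      (103, (903, "T.E. Lawrence")),
      (104, (904, "Michael"))))
    val rdd3 = sc.parallelize(Seq(
      (901, ("Vertigo", 1958)),
      (902, ("The Innocents", 1961))))

    // Re-key by film id, as in the answer
    val newRdd4 = rdd1.join(rdd2).keyBy(x => x._2._2._1)

    // Inner join (the default), then a map that destructures the nested tuples
    val flat = rdd3.join(newRdd4).map {
      case (_, ((title, year), (actorId, ((first, last, _), (_, role))))) =>
        Credit(actorId, first, last, role, title, year)
    }

    // leftOuterJoin keeps actors whose film id has no match in rdd3 (the title becomes None)
    val keepAll = newRdd4.leftOuterJoin(rdd3).map {
      case (_, ((actorId, ((first, _, _), _)), film)) => (actorId, first, film.map(_._1))
    }

    (flat.collect().sortBy(_.actorId), keepAll.collect().sortBy(_._1))
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("flatten-join").master("local[*]").getOrCreate()
    val (inner, outer) = run(spark.sparkContext)
    inner.foreach(println)
    outer.foreach(println)
    spark.stop()
  }
}
```

With this data the inner join should yield two Credit records (actors 101 and 102). The left outer join also keeps 103 and 104, with None in place of the film title. A named case class keeps downstream code readable compared with accessor chains such as x._2._2._1.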