
How to join 3 RDDs with Spark Scala

Updated: 2023-11-18 22:13:04

Joins like this with RDDs are painful; that's another reason why DataFrames are nicer (see the DataFrame sketch at the end).

You get no data because, for the pair RDD = (K, V), the K part has nothing in common with the keys of the last RDD. The keys 101 and 102 will join with each other, but they have nothing in common with 901 and 902. You need to shift things around, like this, in my more limited example:

val rdd1 = sc.parallelize(Seq(           // (personId, (firstName, lastName, gender))
           (101,("James","Stewart","M")),
           (102,("Deborah","Kerr","F")),
           (103,("Peter","OToole","M")),
           (104,("Robert","De Niro","M"))
           ))

val rdd2 = sc.parallelize(Seq(           // (personId, (movieId, role))
           (101,(901,"John Scottie Ferguson")),
           (102,(902,"Miss Giddens")),
           (103,(903,"T.E. Lawrence")),
           (104,(904,"Michael"))
           ))

val rdd3 = sc.parallelize(Seq(          // (movieId, (title, year)); only 901 and 902 are present
          (901,("Vertigo",1958)),
          (902,("The Innocents",1961))
          ))

val rdd4 = rdd1.join(rdd2)                  // join rdd1 and rdd2 on personId

val new_rdd4 = rdd4.keyBy(x => x._2._2._1)  // Redefine Key (movieId) for join with rdd3
val rdd5 = rdd3.join(new_rdd4)              // join with rdd3 on movieId
rdd5.collect

Returns:

res14: Array[(Int, ((String, Int), (Int, ((String, String, String), (Int, String)))))] = Array((901,((Vertigo,1958),(101,((James,Stewart,M),(901,John Scottie Ferguson))))), (902,((The Innocents,1961),(102,((Deborah,Kerr,F),(902,Miss Giddens))))))

You will need to strip out the data via a map; I leave that to you. The join is an INNER join by default.
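
For completeness, a minimal sketch of such a map over rdd5, flattening the nested tuples into one row per match; the names in the pattern are just my own labels for the tuple positions:

val flattened = rdd5.map { case (movieId, ((title, year), (personId, ((first, last, gender), (_, role))))) =>
  (personId, first, last, gender, role, movieId, title, year)
}
flattened.collect

And since the point above was that DataFrames make this kind of join nicer, here is a rough sketch of the same three-way join with DataFrames, assuming a SparkSession named spark is in scope; the column names are my own choice:

import spark.implicits._

// Convert the pair RDDs to DataFrames with named columns
val people = rdd1.map { case (id, (first, last, gender)) => (id, first, last, gender) }
                 .toDF("personId", "firstName", "lastName", "gender")
val roles  = rdd2.map { case (personId, (movieId, role)) => (personId, movieId, role) }
                 .toDF("personId", "movieId", "role")
val movies = rdd3.map { case (movieId, (title, year)) => (movieId, title, year) }
                 .toDF("movieId", "title", "year")

// Inner join by default; rows with movieId 903 and 904 drop out as before
people.join(roles, "personId")
      .join(movies, "movieId")
      .show(false)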