Updated: 2023-11-18 22:25:52
The problem with your implementation is that, for each word in words, you trigger a complete traversal of your RDD and then collect the elements. One way to solve your problem is to join the sequence of words with your RDD:
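import org.apache.spark.{SparkConf, SparkContext}
// On Spark versions before 1.3, also import org.apache.spark.SparkContext._ to get join on pair RDDs.

case class E(word: String, value: Int)

object App {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("Test").setMaster("local[4]")
    val sc = new SparkContext(sparkConf)

    val entries = sc.parallelize(List(E("a", 1), E("b", 2), E("c", 3), E("c", 3)))
    val words = Seq("a", "a", "c")

    // Key each word by itself so it can be joined against the entries.
    val wordsRDD = sc.parallelize(words).map(x => (x, x))

    // Key the entries by word, join once, and keep only the entry of each matching pair.
    val matchingEntries = entries
      .map(x => (x.word, x))
      .join(wordsRDD)
      .map {
        case (_, (entry, _)) => entry
      }
      .collect()

    println(matchingEntries.mkString("\n"))
    sc.stop()
  }
}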
The output is (the order of the rows may vary, since join does not guarantee ordering):
E(a,1)
E(a,1)
E(c,3)
E(c,3)
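
Each entry can appear more than once in that output because an inner join emits one row per matching (entry, word) pair: "a" occurs twice in words, and E("c", 3) occurs twice in the entries. The following is only a minimal plain-Scala sketch of that behaviour on ordinary collections, without Spark. JoinSketch and innerJoin are hypothetical names introduced here for illustration and are not part of the Spark API.

```scala
case class E(word: String, value: Int)

object JoinSketch {
  // Hypothetical helper: pairs every left value with every right value that has the same key,
  // which is the multiplicity an inner join produces.
  def innerJoin[K, A, B](left: Seq[(K, A)], right: Seq[(K, B)]): Seq[(K, (A, B))] =
    left.flatMap { case (k, a) =>
      right.collect { case (k2, b) if k2 == k => (k, (a, b)) }
    }

  // Keeps only the entry from each matching (entry, word) pair.
  def matching(entries: Seq[E], words: Seq[String]): Seq[E] =
    innerJoin(entries.map(e => (e.word, e)), words.map(w => (w, w)))
      .map { case (_, (entry, _)) => entry }

  def main(args: Array[String]): Unit = {
    val entries = List(E("a", 1), E("b", 2), E("c", 3), E("c", 3))
    val words = Seq("a", "a", "c")

    // Duplicate words multiply the matching entries.
    println(matching(entries, words).mkString("\n"))

    // Deduplicating the words first yields each matching entry once per occurrence in entries.
    println(matching(entries, words.distinct).mkString("\n"))
  }
}
```

If the repeated rows are not wanted in the Spark version, one option under the same reasoning is to call distinct on the words before the join, so that each entry is matched by at most one word.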