How can I make RDD/DataFrame operations lazy in Apache Spark?


It is not exactly clear to me what you are trying to achieve, but Scala itself provides at least a few tools which you may find useful:

  • lazy vals:

    val rdd = sc.range(0, 10000)

    lazy val count = rdd.count  // Nothing is executed here
    // count: Long = <lazy>

    count  // count is evaluated only when it is actually used
    // Long = 10000
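
    The question mentions DataFrames as well; the same pattern applies to
    DataFrame actions. A minimal sketch, assuming a Spark 2.x+ shell where a
    SparkSession named spark is in scope:

    // lazy val works the same way for DataFrame actions
    val df = spark.range(0, 10000).toDF("id")

    lazy val dfCount = df.count()  // no Spark job is triggered yet

    dfCount  // the count job runs only on first access
    // Long = 10000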

  • call-by-name parameters (written as => in the function definition):

    def foo(first: => Long, second: => Long, takeFirst: Boolean): Long =
      if (takeFirst) first else second
    
    val rdd1 = sc.range(0, 10000)
    val rdd2 = sc.range(0, 10000)
    
    foo(
      { println("first"); rdd1.count },
      { println("second"); rdd2.count },
      true  // Only first will be evaluated
    )
    // first
    // Long = 10000
    

    Note: In practice you should create a local lazy binding to make sure that the arguments are not evaluated on every access, as sketched below.
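
    A minimal sketch of that pattern (fooOnce is just an illustrative name):

    def fooOnce(first: => Long, second: => Long, takeFirst: Boolean): Long = {
      // bind each by-name argument to a local lazy val so it is evaluated
      // at most once, no matter how often the body refers to it
      lazy val f = first
      lazy val s = second
      if (takeFirst) f else s
    }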

  • infinite lazy collections like Stream:

    import org.apache.spark.mllib.random.RandomRDDs._
    
    val initial = normalRDD(sc, 1000000L, 10)
    
    // Infinite stream of RDDs and actions and nothing blows :)
    // Each element filters the previous RDD above its mean; the self-reference
    // to stream is safe here because append takes its argument by name, so the
    // tail is only built on demand.
    val stream: Stream[RDD[Double]] = Stream(initial).append(
      stream.map {
        case rdd if !rdd.isEmpty => 
          val mu = rdd.mean
          rdd.filter(_ > mu)
        case _ => sc.emptyRDD[Double]
      }
    )
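
    Forcing only a prefix of the stream runs only the corresponding Spark jobs.
    A small usage sketch building on the stream value above:

    // materialize the first three elements; only the mean/filter jobs
    // needed for those RDDs are actually executed
    stream.take(3).zipWithIndex.foreach { case (rdd, i) =>
      println(s"step $i: ${rdd.count} elements")
    }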
    

Some subset of these should be more than enough to implement complex lazy computations.