且构网

分享程序员开发的那些事...
且构网 - 分享程序员编程开发的那些事

在Apache Spark中,如何使RDD/DataFrame操作变得懒惰?

更新时间:2023-11-18 22:17:28

对我来说,要实现的目标还不是很清楚,但是Scala本身至少提供了一些可能有用的工具:

It is not exactly clear to me what you try to achieve but Scala itself provides at least few tools which you may find useful:

  • 惰性值:

  • lazy vals:

val rdd = sc.range(0, 10000)

lazy val count = rdd.count  // Nothing is executed here
// count: Long = <lazy>

count  // count is evaluated only when it is actually used 
// Long = 10000   

  • 呼叫名称(在函数定义中由=>表示):

  • call-by-name (denoted by => in the function definition):

    def  foo(first: => Long, second: => Long, takeFirst: Boolean): Long =
      if (takeFirst) first else second
    
    val rdd1 = sc.range(0, 10000)
    val rdd2 = sc.range(0, 10000)
    
    foo(
      { println("first"); rdd1.count },
      { println("second"); rdd2.count },
      true  // Only first will be evaluated
    )
    // first
    // Long = 10000
    

    注意:在实践中,您应该创建本地延迟绑定,以确保不会在每次访问时都对参数进行评估.

    Note: In practice you should create local lazy binding to make sure that arguments are not evaluated on every access.

    无限懒惰集合,例如

    import org.apache.spark.mllib.random.RandomRDDs._
    
    val initial = normalRDD(sc, 1000000L, 10)
    
    // Infinite stream of RDDs and actions and nothing blows :)
    val stream: Stream[RDD[Double]] = Stream(initial).append(
      stream.map {
        case rdd if !rdd.isEmpty => 
          val mu = rdd.mean
          rdd.filter(_ > mu)
        case _ => sc.emptyRDD[Double]
      }
    )
    

  • 其中一些子集应该足以实现复杂的惰性计算.

    Some subset of these should be more than enough to implement complex lazy computations.