Saving a Spark RDD using mapPartitions and an iterator

Updated: 2023-11-18 22:47:52


I have some intermediate data that I need to store both in HDFS and locally. I'm using Spark 1.6. In HDFS, the intermediate data is written to /output/testDummy/part-00000 and /output/testDummy/part-00001. I want to save these partitions locally using Java/Scala, either merged into a single file, /users/home/indexes/index.nt, or separately as /users/home/indexes/index-0000.nt and /home/indexes/index-0001.nt.

Here is my code. Note: testDummy is the same as test, and the output has two partitions. I want to store them locally, either separately or combined into an index.nt file. I would prefer to store them separately on two data nodes. I'm running on a cluster and submit the Spark job on YARN. I also added comments showing how many times each line prints and what data I'm getting. How can I do this? Any help is appreciated.

 val testDummy = outputFlatMapTuples.coalesce(Constants.INITIAL_PARTITIONS).saveAsTextFile(outputFilePathForHDFS+"/testDummy")
 println("testDummy done")   //1 time print

def savesData(iterator: Iterator[(String)]): Iterator[(String)] = {
    println("Inside savesData")                                 //  now 4 times when coalesce(Constants.INITIAL_PARTITIONS)=2
    println("iter size"+iterator.size)                           //  2 735 2 735 values
    val filenamesWithExtension = outputPath + "/index.nt"
    println("filenamesWithExtension "+filenamesWithExtension.length)   //4 times
    var list = List[(String)]()

    val fileWritter = new FileWriter(filenamesWithExtension,true)
    val bufferWritter = new BufferedWriter(fileWritter)

     while (iterator.hasNext){                       //iterator.hasNext is false
       println("inside iterator")                    //0 times 
       val dat = iterator.next()
       println("datadata "+iterator.next())

       bufferWritter.write(dat + "\n")
       bufferWritter.flush()
       println("index files written")

       val dataElements = dat.split(" ")
       println("dataElements")                                    //0
       list = list.::(dataElements(0))
       list = list.::(dataElements(1))
       list = list.::(dataElements(2))
     }
    bufferWritter.close() //closing
    println("savesData method end")                         //4 times when coal=2
    list.iterator
}

println("before saving data into local")                              //1
val test = outputFlatMapTuples.coalesce(Constants.INITIAL_PARTITIONS).mapPartitions(savesData)
println("testRDD partitions "+test.getNumPartitions)                               //2
println("testRDD size "+test.collect().length)                                //0
println("after saving data into local")   //1

PS: I followed this and this, but they are not exactly what I'm looking for. I got something working, but nothing is written to index.nt.

A couple of things:

  • Never call Iterator.size if you plan to use the data later. Iterators are TraversableOnce. The only way to compute the size of an Iterator is to traverse all of its elements, and after that there is no more data to read. This is why iterator.hasNext is false in the while loop above.
  • Don't use transformations like mapPartitions for side effects. If you want to perform some type of IO, use actions like foreach / foreachPartition. Using a transformation for side effects is bad practice, and it doesn't guarantee that a given piece of code will be executed only once.
  • A local path inside an action or transformation is a local path on the particular worker that runs it. If you want to write directly on the client machine, you should fetch the data first with collect or toLocalIterator. It may be better, though, to write to distributed storage and fetch the data later.
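
The first point can be checked without Spark. The following is a minimal sketch in plain Scala, not a definitive implementation: the object name LocalIndexWriter and the helpers sizeThenHasNext and writeLines are hypothetical names introduced here for illustration. It shows that calling size exhausts an iterator, and how a single-pass writer can count lines while writing instead of calling size first.

```scala
import java.io.{BufferedWriter, FileWriter}

// Hypothetical helper object, introduced only for illustration.
object LocalIndexWriter {

  // Shows that an Iterator is single-pass: computing its size traverses
  // every element, so hasNext is false afterwards.
  def sizeThenHasNext(it: Iterator[String]): (Int, Boolean) = {
    val n = it.size
    (n, it.hasNext)
  }

  // Appends every line to `path` in one pass and returns the number of
  // lines written. The count is kept while writing, so size is never
  // called, and next() is invoked only once per element (via foreach).
  def writeLines(lines: Iterator[String], path: String): Int = {
    val writer = new BufferedWriter(new FileWriter(path, true))
    var count = 0
    try {
      lines.foreach { line =>
        writer.write(line)
        writer.newLine()
        count += 1
      }
    } finally {
      writer.close() // close() also flushes the buffer
    }
    count
  }
}
```

Assuming an RDD[String] named rdd and a target path named path (both placeholders), such a helper could be called on the driver as LocalIndexWriter.writeLines(rdd.toLocalIterator, path) to write on the client machine, or inside rdd.foreachPartition(it => LocalIndexWriter.writeLines(it, path)), in which case path refers to each worker's own local filesystem. Note also that the loop in the question calls iterator.next() twice per iteration, which would consume two elements each time even if size had not been called.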