Updated: 2023-11-18 22:47:52
I have some intermediate data that I need to store in HDFS and locally as well. I'm using Spark 1.6. As intermediate output in HDFS I'm getting data in /output/testDummy/part-00000
and /output/testDummy/part-00001
. I want to save these partitions locally using Java/Scala, so that I can save them either as /users/home/indexes/index.nt
(merging both locally) or as /users/home/indexes/index-0000.nt
and /home/indexes/index-0001.nt
separately.
Here is my code:
Note: testDummy is the same as test, and the output has two partitions. I want to store them separately, or combined but locally, in an index.nt
file. I'd prefer to store them separately on the two data nodes. I'm using a cluster and submit the Spark job on YARN. I also added some comments about how many times each line runs and what data I'm getting. How can I do this? Any help is appreciated.
val testDummy = outputFlatMapTuples.coalesce(Constants.INITIAL_PARTITIONS).saveAsTextFile(outputFilePathForHDFS + "/testDummy")
println("testDummy done") // 1 time print

def savesData(iterator: Iterator[(String)]): Iterator[(String)] = {
  println("Inside savesData") // now 4 times when coalesce(Constants.INITIAL_PARTITIONS)=2
  println("iter size " + iterator.size) // 2 735 2 735 values
  val filenamesWithExtension = outputPath + "/index.nt"
  println("filenamesWithExtension " + filenamesWithExtension.length) // 4 times
  var list = List[(String)]()
  val fileWritter = new FileWriter(filenamesWithExtension, true)
  val bufferWritter = new BufferedWriter(fileWritter)
  while (iterator.hasNext) { // iterator.hasNext is false
    println("inside iterator") // 0 times
    val dat = iterator.next()
    println("datadata " + iterator.next())
    bufferWritter.write(dat + "\n")
    bufferWritter.flush()
    println("index files written")
    val dataElements = dat.split(" ")
    println("dataElements") // 0
    list = list.::(dataElements(0))
    list = list.::(dataElements(1))
    list = list.::(dataElements(2))
  }
  bufferWritter.close() // closing
  println("savesData method end") // 4 times when coal=2
  list.iterator
}

println("before saving data into local") // 1
val test = outputFlatMapTuples.coalesce(Constants.INITIAL_PARTITIONS).mapPartitions(savesData)
println("testRDD partitions " + test.getNumPartitions) // 2
println("testRDD size " + test.collect().length) // 0
println("after saving data into local") // 1
PS: I followed this and this, but they're not exactly what I'm looking for; I got it partly working, but I'm not getting anything in index.nt
A couple of things:
- Don't use Iterator.size
if you plan to use the data later. Iterators
are TraversableOnce
: the only way to compute an Iterator
's size is to traverse all of its elements, and after that there is no more data left to read. That is exactly why the while (iterator.hasNext) loop above never executes.
- Don't use transformations like mapPartitions
for side effects. If you want to perform some type of IO, use actions like foreach
/ foreachPartition
. Side effects in transformations are bad practice, and there is no guarantee that a given piece of code will be executed only once.
- If you want the data back on the driver, use collect
or toLocalIterator
. It could be better, though, to write to distributed storage and fetch the data later.
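The Iterator.size pitfall can be reproduced in plain Scala, independent of Spark; the sample values here are made up for illustration:

```scala
// Calling .size consumes the iterator, so nothing is left to read afterwards.
val it = Iterator("a", "b", "c")
println("size = " + it.size)       // size = 3 (traverses every element)
println("hasNext = " + it.hasNext) // hasNext = false: the iterator is exhausted

// If both the size and the elements are needed, materialize the data first:
val buffered = Iterator("a", "b", "c").toList
println("size = " + buffered.size)              // 3
println("elements = " + buffered.mkString(",")) // a,b,c
```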
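As a minimal sketch of the foreachPartition approach: assuming a hypothetical rdd: RDD[String] and a per-worker directory localDir (both names are mine, not from the question), each executor appends its own partition to a file on its own local disk:

// Sketch only: `rdd` and `localDir` are assumed to exist.
// This runs on the executors, so each file lands on that worker's
// local filesystem, not on the driver machine.
import java.io.{BufferedWriter, FileWriter}
import org.apache.spark.TaskContext

rdd.foreachPartition { iter =>
  val partId = TaskContext.getPartitionId()
  val writer = new BufferedWriter(
    new FileWriter(s"$localDir/index-$partId.nt", true))
  try iter.foreach(line => writer.write(line + "\n"))
  finally writer.close()
}

Because foreachPartition is an action, this doesn't depend on a downstream collect to trigger execution, unlike the mapPartitions version in the question.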
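If instead the merged file should live on the driver (gateway) machine, one way, again only a sketch over the same hypothetical rdd and using the target path from the question, is to stream partitions to the driver with toLocalIterator:

// Sketch only: `rdd` is assumed to be the RDD[String] to save.
// toLocalIterator pulls one partition at a time to the driver,
// so only a single partition needs to fit in driver memory.
import java.io.{BufferedWriter, FileWriter}

val writer = new BufferedWriter(new FileWriter("/users/home/indexes/index.nt"))
try rdd.toLocalIterator.foreach(line => writer.write(line + "\n"))
finally writer.close()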