更新时间:2023-11-18 22:16:40
看着 RDD.checkpoint
文件,它说:
任何工作就这一RDD执行之前此功能必须在名为
块引用>事实上,如果你改变了code咯,有检查点做的前的收集
A
- 它的工作原理没有的***Error
:为(I< - 1〜1000){
一个= a.map(X => X + 1).persist 如果(I%100 == 0){
a.checkpoint()
} 变种B = a.collect() 打印(。)
}I am a beginner of Apache Spark. I am currently working on a Machine Learning program, which requires to iteratively update a RDD and then collect nearly 10KB data to driver from executors. Unfortunately, I get a *** error when it runs over 600 iterations! The following is my code. The *** error happened at collectAsMap function when iteration number is over 400! where indexedDevF and indexedData are indexedRDD (developed by AMPLab as an library provided https://github.com/amplab/spark-indexedrdd)
breakable{ while(bLow > bHigh + 2*tolerance){ indexedDevF = indexedDevF.innerJoin(indexedData){(id, a, b) => (b, a)}.mapValues( x => ( x._2 + alphaHighDiff * broad_y.value(iHigh) * kernel(x._1, dataiHigh) + alphaLowDiff * broad_y.value(iLow) * kernel(x._1, dataiLow) ) ) if (iteration % 50 == 0 ) { indexedDevF.checkpoint() } indexedDevF.persist() // essential to get correct answer val devFMap = indexedDevF.collectAsMap() //0.5s every time according to local:4040! here will *** var min_value = Double.PositiveInfinity var max_value = -min_value var min_i = -1 var max_i = -1 i = 0 while( i < m ){ if(((y(i) > 0) && (alpha(i) < cEpsilon)) || ((y(i) < 0) && (alpha(i) > epsilon))){ if( devFMap(i) <= min_value){ min_value = devFMap(i) min_i = i } } if(((y(i) > 0) && (alpha(i) > epsilon)) || ((y(i) < 0) && (alpha(i) < cEpsilon))){ if( devFMap(i) >= max_value ){ max_value = devFMap(i) max_i = i } } i = i+1 } iHigh = min_i iLow = max_i bHigh = devFMap(iHigh) bLow = devFMap(iLow) dataiHigh = indexedData.get(iHigh.toLong).get dataiLow = indexedData.get(iLow.toLong).get eta = 2 - 2 * kernel(dataiHigh, dataiLow) alphaHighOld = alpha(iHigh) alphaLowOld = alpha(iLow) var alphaDiff = alphaLowOld - alphaHighOld var lowLabel = y(iLow) var sign = y(iHigh) * lowLabel var alphaLowLowerBound = 0D var alphaLowUpperBound = 0D if (sign < 0){ if (alphaDiff < 0){ alphaLowLowerBound = 0; alphaLowUpperBound = cost + alphaDiff; } else{ alphaLowLowerBound = alphaDiff; alphaLowUpperBound = cost; } } else{ var alphaSum = alphaLowOld + alphaHighOld; if (alphaSum < cost){ alphaLowUpperBound = alphaSum; alphaLowLowerBound = 0; } else{ alphaLowLowerBound = alphaSum - cost; alphaLowUpperBound = cost; } } if (eta > 0){ alphaLowNew = alphaLowOld + lowLabel*(bHigh - bLow)/eta; if (alphaLowNew < alphaLowLowerBound) alphaLowNew = alphaLowLowerBound; else if (alphaLowNew > alphaLowUpperBound) alphaLowNew = alphaLowUpperBound; } else{ var slope = lowLabel * (bHigh - bLow); var delta = slope * (alphaLowUpperBound - alphaLowLowerBound); if (delta > 0){ if (slope > 0) alphaLowNew = alphaLowUpperBound; else alphaLowNew = alphaLowLowerBound; } else alphaLowNew = alphaLowOld; } alphaLowDiff = alphaLowNew - alphaLowOld; alphaHighDiff = -sign*(alphaLowDiff); alpha(iLow) = alphaLowNew; alpha(iHigh) = (alphaHighOld + alphaHighDiff); if(iteration % 50 == 0) print(".") iteration = iteration + 1; }
===================
The original question is as following, I find the checkpoint is useless and the program will conclude with *** errer!! I write a test simple code to describe my problem. Fortunately, a nice guy help me solve the problem, you can find the answer below! However, even the checkpoint really works, I still get *** error with my program :(
for(i <- 1 to 1000){ a = a.map(x => x+1).persist var b = a.collect() if(i%100 == 0){ a.checkpoint() } print(".") }
Looking at
RDD.checkpoint
documentation, it says:This function must be called before any job has been executed on this RDD
And indeed, if you change your code slightly, to have the checkpoint done before collecting
a
- it works with no***Error
:for(i <- 1 to 1000){ a = a.map(x => x+1).persist if(i%100 == 0){ a.checkpoint() } var b = a.collect() print(".") }