且构网

Why does this Pig UDF result in "Error: Java heap space" even though I am spilling the DataBag to disk?

Updated: 2023-11-06 08:11:46

You should increment spillCount every time you append to outputBag, not every time you get a tuple from the iterator. As written, you only spill when spillCount is a multiple of 1000 AND your if condition is not met, which may not happen often (depending on your logic). That would explain why you see little difference between spill thresholds.
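A minimal sketch of the counter fix, written in plain Java so it runs without Pig. `SketchBag` is a hypothetical stand-in for Pig's DataBag (it only counts spill() calls and does no disk I/O), and the run-length aggregation is a hypothetical substitute for the question's if condition, which is not shown here. The key point is that spillCount is incremented inside `append`, the single place that writes to outputBag, so a spill happens after every 1000 appends no matter which branch the loop took.

```java
import java.util.ArrayList;
import java.util.List;

public class SpillCountSketch {
    /** Hypothetical stand-in for Pig's DataBag: it only tracks spills, no real disk I/O. */
    static class SketchBag {
        private final List<long[]> inMemory = new ArrayList<>();
        private long spilledTuples = 0;
        int spillCalls = 0;

        void add(long[] tuple) { inMemory.add(tuple); }

        void spill() {
            spilledTuples += inMemory.size(); // pretend these tuples were written to disk
            inMemory.clear();
            spillCalls++;
        }

        int inMemorySize() { return inMemory.size(); }

        long size() { return spilledTuples + inMemory.size(); }
    }

    static final int SPILL_THRESHOLD = 1000; // threshold assumed from the question

    private final SketchBag outputBag = new SketchBag();
    private long spillCount = 0; // counts appends to outputBag, not tuples read

    /** Single write path: every append bumps the counter and may trigger a spill. */
    private void append(long key, long count) {
        outputBag.add(new long[] {key, count});
        spillCount++;
        if (spillCount % SPILL_THRESHOLD == 0) {
            outputBag.spill();
        }
    }

    /** Hypothetical aggregation: collapses runs of equal keys into {key, count} pairs. */
    SketchBag aggregate(long[] keys) {
        boolean open = false;
        long currentKey = 0;
        long currentCount = 0;
        for (long key : keys) {
            if (open && key == currentKey) {
                currentCount++; // "if" branch: only the current aggregate changes
            } else {
                if (open) {
                    append(currentKey, currentCount); // "else" branch: emit the finished run
                }
                currentKey = key;
                currentCount = 1;
                open = true;
            }
        }
        if (open) {
            append(currentKey, currentCount); // emit the last run
        }
        return outputBag;
    }

    public static void main(String[] args) {
        long[] keys = new long[5000];
        for (int i = 0; i < keys.length; i++) {
            keys[i] = i; // all keys distinct, so every key produces one append
        }
        SketchBag bag = new SpillCountSketch().aggregate(keys);
        System.out.println(bag.size() + " tuples, " + bag.spillCalls + " spills, "
                + bag.inMemorySize() + " in memory");
    }
}
```

With 5000 distinct keys this prints `5000 tuples, 5 spills, 0 in memory`. Keeping the increment and the threshold check next to the add() call means no branch of the loop can skip a spill.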

If that doesn't solve your problem, I would try extending AccumulatorEvalFunc<DataBag>. In your case you don't actually need access to the whole bag: your implementation fits an accumulator-style implementation because you only need access to the current tuple, which may reduce memory usage. Essentially you would have one instance variable of type DataBag that accumulates the final output, and another instance variable, aggregatedOutput, that holds the current aggregate. Each call to accumulate() would either 1) update the current aggregate, or 2) add the current aggregate to the output bag and begin a new aggregate. This essentially follows the body of your for loop.
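The accumulator shape can also be sketched in plain Java. `SketchAccumulator` is a hypothetical interface that only mirrors the accumulate()/getValue()/cleanup() lifecycle; Pig's real `Accumulator` passes each accumulate() call a Tuple wrapping a bag with a subset of the input, which this sketch simplifies to a `List<Long>` chunk. The run-length logic is again a hypothetical stand-in for the question's loop body, and `outputBag` stands in for the output DataBag.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class AccumulatorSketch {
    /** Hypothetical interface mirroring the accumulate()/getValue()/cleanup() lifecycle. */
    interface SketchAccumulator<T> {
        void accumulate(List<Long> chunk); // Pig passes a Tuple wrapping a partial bag instead
        T getValue();
        void cleanup();
    }

    /** Hypothetical run-length aggregation written accumulator-style. */
    static class RunLengthAccumulator implements SketchAccumulator<List<long[]>> {
        private List<long[]> outputBag = new ArrayList<>(); // stands in for the output DataBag
        private long[] aggregatedOutput = null;             // current aggregate: {key, count}

        @Override
        public void accumulate(List<Long> chunk) {
            for (long key : chunk) {
                if (aggregatedOutput != null && aggregatedOutput[0] == key) {
                    aggregatedOutput[1]++; // 1) update the current aggregate
                } else {
                    if (aggregatedOutput != null) {
                        outputBag.add(aggregatedOutput); // 2) store the finished aggregate...
                    }
                    aggregatedOutput = new long[] {key, 1}; // ...and begin a new one
                }
            }
        }

        @Override
        public List<long[]> getValue() {
            // Assumes getValue() runs after the last accumulate() call for a group.
            if (aggregatedOutput != null) {
                outputBag.add(aggregatedOutput);
                aggregatedOutput = null;
            }
            return outputBag;
        }

        @Override
        public void cleanup() {
            // Reset state before the next group.
            outputBag = new ArrayList<>();
            aggregatedOutput = null;
        }
    }

    public static void main(String[] args) {
        RunLengthAccumulator acc = new RunLengthAccumulator();
        acc.accumulate(Arrays.asList(1L, 1L, 2L));
        acc.accumulate(Arrays.asList(2L, 2L, 3L));
        acc.accumulate(Arrays.asList(3L));
        for (long[] run : acc.getValue()) {
            System.out.println(run[0] + " x " + run[1]);
        }
        acc.cleanup();
    }
}
```

Feeding the chunks [1, 1, 2], [2, 2, 3] and [3] prints `1 x 2`, `2 x 3` and `3 x 2`; note that a run spanning two chunks is still merged correctly because the current aggregate lives in an instance variable between calls. Only that aggregate and the output are held between calls, which is where the memory saving would come from.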