Updated: 2023-11-18 22:12:46
It's an interesting problem. I also spent some time figuring out an approach. This is what I came up with:
Given case classes of the form Action(id, time, x) and Historic(id, time, y), in Spark:
// key both RDDs by id so they can be joined
val actionById = actions.keyBy(_.id)
val historyById = historic.keyBy(_.id)
val actionByHistory = actionById.join(historyById)
// keep only the historic records that precede each action, re-keyed by the action's (id, time)
val filteredActionByidTime = actionByHistory.collect{ case (k,(action,historic)) if (action.time > historic.time) => ((action.id, action.time),(action,historic))}
// for each action, keep the most recent of the remaining historic records
val topHistoricByAction = filteredActionByidTime.reduceByKey{ case ((a1:Action,h1:Historic),(a2:Action, h2:Historic)) => (a1, if (h1.time > h2.time) h1 else h2)}
// we are done, let's produce a report now
val report = topHistoricByAction.map{case ((id,time),(action,historic)) => (id, time, action.x - historic.y)}
Using the data provided above, the report looks like:
report.collect
Array[(Int, Long, Int)] = Array((1,43500,100), (1,45000,50), (2,45000,50))
(I converted the times to seconds to keep the timestamps simple)
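If you want to sanity-check the logic without a cluster, here is a minimal sketch of the same idea on plain Scala collections. The field types (Int, Long, Int), the sample records, and the names `LatestHistoricSketch` and `report` are my own assumptions for illustration; they are not taken from the question's data.

```scala
// Hypothetical field types: the answer only names the fields.
case class Action(id: Int, time: Long, x: Int)
case class Historic(id: Int, time: Long, y: Int)

object LatestHistoricSketch {
  // For each action, pick the most recent historic record with the same id
  // that is strictly earlier than the action, and emit (id, time, x - y).
  // Actions with no earlier historic record are dropped, as in the join + filter.
  def report(actions: Seq[Action], historic: Seq[Historic]): Seq[(Int, Long, Int)] = {
    val historyById: Map[Int, Seq[Historic]] = historic.groupBy(_.id)
    actions.flatMap { a =>
      val earlier = historyById.getOrElse(a.id, Seq.empty[Historic]).filter(_.time < a.time)
      if (earlier.isEmpty) None
      else Some((a.id, a.time, a.x - earlier.maxBy(_.time).y))
    }
  }

  def main(args: Array[String]): Unit = {
    // Made-up sample data, only to exercise the function.
    val actions  = Seq(Action(1, 100L, 10), Action(1, 200L, 20), Action(2, 150L, 30))
    val historic = Seq(Historic(1, 50L, 1), Historic(1, 150L, 5), Historic(2, 200L, 7))
    report(actions, historic).foreach(println)
  }
}
```

This version scans each id's history once per action, which is fine for small local checks. The Spark version above does the same selection with a join followed by reduceByKey so that it can be distributed.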