且构网

分享程序员开发的那些事...
且构网 - 分享程序员编程开发的那些事

Spark:如何按时间范围加入RDD

更新时间:2023-11-18 22:12:46

这是一个有趣的问题.我还花了一些时间弄清楚一种方法.这是我想出的:

It's an interesting problem. I also spent some time figuring out an approach. This is what I came up with:

给出Action(id, time, x)Historic(id, time, y)的案例类

  • 将动作与历史记录结合在一起(这可能很繁琐)
  • 过滤与给定操作无关的所有历史数据
  • 通过(id,time)键入结果-在不同的时间区分相同的密钥
  • 通过动作将历史减少到最大值,从而为我们提供给定动作的相关历史记录

在Spark中:

val actionById = actions.keyBy(_.id)
val historyById = historic.keyBy(_.id)
val actionByHistory = actionById.join(historyById)
val filteredActionByidTime = actionByHistory.collect{ case (k,(action,historic)) if (action.time>historic.t) => ((action.id, action.time),(action,historic))}
val topHistoricByAction = filteredActionByidTime.reduceByKey{ case ((a1:Action,h1:Historic),(a2:Action, h2:Historic)) =>  (a1, if (h1.t>h2.t) h1 else h2)}

// we are done, let's produce a report now
val report = topHistoricByAction.map{case ((id,time),(action,historic)) => (id,time,action.X -historic.y)}

使用上面提供的数据,报告如下:

Using the data provided above, the report looks like:

report.collect
Array[(Int, Long, Int)] = Array((1,43500,100), (1,45000,50), (2,45000,50))

(我将时间转换为秒,以简化时间戳记)

(I transformed the time to seconds to have a simplistic timestamp)