且构网

Sharing all things programmer development...

Adding a new calculated column from 2 values in an RDD

Updated: 2023-01-08 18:02:02

I have 2 paired RDDs that I joined on the same key, and now I want to add a new calculated column using 2 columns from the values part. The type of the joined RDD is:

RDD[((String, Int), Iterable[((String, DateTime, Int,Int), (String, DateTime, String, String))])]

I want to add another field to the new RDD that shows the delta between the 2 DateTime fields.

How can I do this?

You should be able to do this using map to extend the 2-tuples into 3-tuples, roughly as follows:

joined.map{ case (key, values) =>
  val delta = computeDelta(values)
  (key, values, delta)
}

Or, more concisely:

joined.map{ case (k, vs) => (k, vs, computeDelta(vs)) }

Then your computeDelta function can just take a joined pair from the values, whose elements have types (String, DateTime, Int, Int) and (String, DateTime, String, String), get the second item (the DateTime) from each, and compute the delta using whatever DateTime functions are convenient.
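
For illustration, here is a minimal sketch of what such a computeDelta might look like. It rests on several assumptions that are not in the question: java.time.LocalDateTime stands in for DateTime (the question does not say which DateTime class is used), the delta is taken from the first joined pair only, it is expressed in milliseconds, and an empty Iterable yields 0. The names DeltaSketch, LeftRow and RightRow are hypothetical.

```scala
import java.time.{Duration, LocalDateTime}

object DeltaSketch {
  // Hypothetical aliases mirroring the two value tuples from the question,
  // with java.time.LocalDateTime standing in for DateTime (an assumption).
  type LeftRow  = (String, LocalDateTime, Int, Int)
  type RightRow = (String, LocalDateTime, String, String)

  // Assumption: only the first joined pair is used, the delta is returned
  // in milliseconds, and an empty Iterable gives 0L.
  def computeDelta(values: Iterable[(LeftRow, RightRow)]): Long =
    values.headOption match {
      // Pull the second item (the timestamp) out of each tuple in the pair.
      case Some(((_, t1, _, _), (_, t2, _, _))) =>
        Duration.between(t1, t2).toMillis
      case None => 0L
    }
}
```

Because the function only depends on a plain Iterable, it can be tried on an ordinary Seq before being passed to map or mapValues on the RDD. If you need one delta per joined pair rather than one per key, mapping over the Iterable inside the function would be the natural variation.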

If you want your output RDD to still be a paired RDD, then you will need to wrap the new delta field into a tuple, roughly as follows:

joined.mapValues{ values =>
  val delta = computeDelta(values)
  (values, delta)
}

which will preserve the original PairedRDD keys and give you values of type (Iterable[((String, DateTime, Int, Int), (String, DateTime, String, String))], Long), assuming you are calculating deltas of type Long.