ReduceByKey with byte arrays as keys

Updated: 2022-11-19 17:56:38

Custom comparators are insufficient, because Spark uses the objects' hashCode to organize keys into partitions. (At least the default HashPartitioner does; you could provide a custom partitioner that can deal with arrays, as sketched below.)
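
For illustration only, such a partitioner could hash Array[Byte] keys by content. This is a rough sketch under my own naming (ByteArrayPartitioner is not from the original), and on its own it merely routes equal arrays to the same partition; the keys still need content-based equals and hashCode to actually be grouped, which is what the wrapper below provides.

import java.util.Arrays
import org.apache.spark.Partitioner

// Hypothetical partitioner: hash byte-array keys by their content, not their reference
class ByteArrayPartitioner(override val numPartitions: Int) extends Partitioner {
    override def getPartition(key: Any): Int = {
        val h = key match {
            case bytes: Array[Byte] => Arrays.hashCode(bytes) // content-based hash
            case other              => other.hashCode
        }
        java.lang.Math.floorMod(h, numPartitions) // non-negative partition index
    }
}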

Wrapping the array to provide proper equals and hashCode should address the issue. A lightweight wrapper should do the trick:

class SerByteArr(val bytes: Array[Byte]) extends Serializable {
    // hash and compare by the array's contents, so equal byte arrays become equal keys
    // (bytes.deep is available up to Scala 2.12; on 2.13+ java.util.Arrays.hashCode/equals can be used instead)
    override val hashCode = bytes.deep.hashCode
    override def equals(obj: Any) = obj.isInstanceOf[SerByteArr] && obj.asInstanceOf[SerByteArr].bytes.deep == this.bytes.deep
}

A quick test:

import scala.util.Random
// 100,000 byte-array values drawn from 100 distinct strings ("0".."99")
val data = (1 to 100000).map(_ => Random.nextInt(100).toString.getBytes("UTF-8"))
val rdd = sparkContext.parallelize(data)  // sparkContext: an existing SparkContext (sc in spark-shell)
val byKey = rdd.keyBy(identity)
// this won't work b/c the partitioner does not support arrays as keys
val grouped = byKey.groupByKey
// org.apache.spark.SparkException: Default partitioner cannot partition array keys.

// let's use the wrapper instead

val keyable = rdd.map(elem => new SerByteArr(elem))
val bySerKey = keyable.keyBy(identity)
val grouped = bySerKey.groupByKey
grouped.count
// res14: Long = 100
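
Since the question asks about reduceByKey, the same wrapper works there as well. A minimal continuation of the test above (the per-key count and the unwrapping via .bytes are illustrative additions, not from the original):

// reduceByKey now works, since SerByteArr keys hash and compare by content
val counts = keyable.map(k => (k, 1)).reduceByKey(_ + _)
// unwrap the keys back to strings for inspection
counts.map { case (k, n) => (new String(k.bytes, "UTF-8"), n) }.take(5)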