Spark DataFrame: select distinct rows


The problem you face is explicitly stated in the exception message: because MapType columns are neither hashable nor orderable, they cannot be used as part of a grouping or partitioning expression.

Your take on the SQL solution is not logically equivalent to distinct on a Dataset. If you want to deduplicate data based on a set of compatible columns, you should use dropDuplicates:

df.dropDuplicates("timestamp")

which is equivalent to

SELECT timestamp, first(c1) AS c1, first(c2) AS c2,  ..., first(cn) AS cn,
       first(canvasHashes) AS canvasHashes
FROM df GROUP BY timestamp
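
For comparison, here is a minimal sketch of that behavior (the data and column names are illustrative, and a SparkSession named spark is assumed to be in scope): only timestamp is considered, so rows that differ elsewhere still collapse to a single arbitrary row, just like first() above.

import spark.implicits._

// Illustrative rows: same timestamp, different payloads.
val df = Seq(
  (1L, "a", Map("k" -> "v")),
  (1L, "b", Map("k" -> "w"))
).toDF("timestamp", "c1", "canvasHashes")

// Keeps one arbitrary row per timestamp -- the analogue of first() above.
df.dropDuplicates("timestamp").show(truncate = false)  // prints 1 row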

Unfortunately, if your goal is an actual DISTINCT, it won't be so easy. One possible solution is to leverage Scala* Map hashing. You could define a Scala UDF like this:

spark.udf.register("scalaHash", (x: Map[String, String]) => x.##)
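
This works because Scala's structural hash ## is order-independent for immutable Maps, so maps with equal contents always hash the same. A quick sketch you can run in spark-shell to convince yourself (the values are illustrative):

val m1 = Map("a" -> "1", "b" -> "2")
val m2 = Map("b" -> "2", "a" -> "1")
assert(m1.## == m2.##)  // equal contents => equal hash, regardless of insertion order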

and then use it in your Java code to derive a column that can be used with dropDuplicates:

df
  .selectExpr("*", "scalaHash(canvasHashes) AS hash_of_canvas_hashes")
  .dropDuplicates(
    // All columns excluding canvasHashes / hash_of_canvas_hashes
    "timestamp", "c1", "c2", ..., "cn",
    // Hash used as a surrogate for canvasHashes
    "hash_of_canvas_hashes"
  )

with the SQL equivalent

SELECT 
  timestamp, c1, c2, ..., cn,   -- All columns excluding canvasHashes
  first(canvasHashes) AS canvasHashes
FROM df GROUP BY
  timestamp, c1, c2, ..., cn    -- All columns excluding canvasHashes
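
Putting the pieces together, here is a self-contained sketch of the whole approach (the app name, sample data, and concrete column list are assumptions for illustration, not part of the original answer):

import org.apache.spark.sql.SparkSession

object DistinctWithMapColumn {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("distinct-with-map-column")  // illustrative app name
      .master("local[*]")
      .getOrCreate()
    import spark.implicits._

    // Illustrative data: rows 1 and 2 are full duplicates,
    // row 3 differs only in the map column.
    val df = Seq(
      (1L, "x", Map("k" -> "v")),
      (1L, "x", Map("k" -> "v")),
      (1L, "x", Map("k" -> "w"))
    ).toDF("timestamp", "c1", "canvasHashes")

    // Hash the Scala Map; ## is consistent for equal immutable Maps.
    spark.udf.register("scalaHash", (m: Map[String, String]) => m.##)

    val deduped = df
      .selectExpr("*", "scalaHash(canvasHashes) AS hash_of_canvas_hashes")
      .dropDuplicates("timestamp", "c1", "hash_of_canvas_hashes")
      .drop("hash_of_canvas_hashes")  // drop the surrogate once deduplication is done

    deduped.show(truncate = false)  // expect 2 rows: the exact duplicate collapses
    spark.stop()
  }
}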


*Note that java.util.Map with its hashCode won't work here, because its hashCode is not consistent.