How to define and use a user-defined aggregate function in Spark SQL?


I know how to write a UDF in Spark SQL:

def belowThreshold(power: Int): Boolean = {
  return power < -40
}

sqlContext.udf.register("belowThreshold", belowThreshold _)

Can I do something similar to define an aggregate function? How is this done?

For context, I want to run the following SQL query:

val aggDF = sqlContext.sql("""SELECT span, belowThreshold(opticalReceivePower), timestamp
                                    FROM ifDF
                                    WHERE opticalReceivePower IS NOT null
                                    GROUP BY span, timestamp
                                    ORDER BY span""")

It should return something like

Row(span1, false, T0)

I want the aggregate function to tell me if there are any values for opticalReceivePower in the groups defined by span and timestamp which are below the threshold. Do I need to write my UDAF differently from the UDF I pasted above?

Supported methods

Spark >= 3.0

Scala UserDefinedAggregateFunction is deprecated (SPARK-30423 Deprecate UserDefinedAggregateFunction) in favor of a registered Aggregator.
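In Spark 3.0+, an Aggregator can be wrapped with functions.udaf and then used like a regular UDF. A minimal sketch, assuming the BelowThreshold Aggregator defined in the Spark >= 2.0 section below and a SparkSession named spark:

import org.apache.spark.sql.functions.udaf

// Specialize the generic Aggregator to Int input so it can be applied to a single column
val belowThresholdUdf = udaf(new BelowThreshold[Int](_ < -40))

// Register it for SQL use, or apply it directly on a DataFrame
spark.udf.register("belowThreshold", belowThresholdUdf)
df.groupBy($"group").agg(belowThresholdUdf($"power").alias("belowThreshold"))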

Spark >= 2.3

Vectorized udf (Python only):

from pyspark.sql.functions import pandas_udf
from pyspark.sql.functions import PandasUDFType

from pyspark.sql.types import *
import pandas as pd

df = sc.parallelize([
    ("a", 0), ("a", 1), ("b", 30), ("b", -50)
]).toDF(["group", "power"])

def below_threshold(threshold, group="group", power="power"):
    @pandas_udf("struct<group: string, below_threshold: boolean>", PandasUDFType.GROUPED_MAP)
    def below_threshold_(df):
        df = pd.DataFrame(
           df.groupby(group).apply(lambda x: (x[power] < threshold).any()))
        df.reset_index(inplace=True, drop=False)
        return df

    return below_threshold_

Example usage:

df.groupBy("group").apply(below_threshold(-40)).show()

## +-----+---------------+
## |group|below_threshold|
## +-----+---------------+
## |    b|           true|
## |    a|          false|
## +-----+---------------+

See also Applying UDFs on GroupedData in PySpark (with a functioning Python example)

Spark >= 2.0 (optionally 1.6 but with slightly different API):

It is possible to use Aggregators on typed Datasets:

import org.apache.spark.sql.expressions.Aggregator
import org.apache.spark.sql.{Encoder, Encoders}

class BelowThreshold[I](f: I => Boolean)  extends Aggregator[I, Boolean, Boolean]
    with Serializable {
  def zero = false
  def reduce(acc: Boolean, x: I) = acc | f(x)
  def merge(acc1: Boolean, acc2: Boolean) = acc1 | acc2
  def finish(acc: Boolean) = acc

  def bufferEncoder: Encoder[Boolean] = Encoders.scalaBoolean
  def outputEncoder: Encoder[Boolean] = Encoders.scalaBoolean
}

val belowThreshold = new BelowThreshold[(String, Int)](_._2 < -40).toColumn
df.as[(String, Int)].groupByKey(_._1).agg(belowThreshold)
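
If you want a readable output column name, the typed column can be named before aggregating. A small sketch, assuming the df defined in the Spark 1.4 section below:

df.as[(String, Int)]
  .groupByKey(_._1)
  .agg(belowThreshold.name("belowThreshold"))
  .show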

Spark >= 1.5:

In Spark 1.5 you can create a UDAF like this, although it is most likely overkill:

import org.apache.spark.sql.expressions._
import org.apache.spark.sql.types._
import org.apache.spark.sql.Row

object belowThreshold extends UserDefinedAggregateFunction {
    // Schema you get as an input
    def inputSchema = new StructType().add("power", IntegerType)
    // Schema of the row which is used for aggregation
    def bufferSchema = new StructType().add("ind", BooleanType)
    // Returned type
    def dataType = BooleanType
    // Self-explaining 
    def deterministic = true
    // zero value
    def initialize(buffer: MutableAggregationBuffer) = buffer.update(0, false)
    // Similar to seqOp in aggregate
    def update(buffer: MutableAggregationBuffer, input: Row) = {
        if (!input.isNullAt(0))
          buffer.update(0, buffer.getBoolean(0) | input.getInt(0) < -40)
    }
    // Similar to combOp in aggregate
    def merge(buffer1: MutableAggregationBuffer, buffer2: Row) = {
      buffer1.update(0, buffer1.getBoolean(0) | buffer2.getBoolean(0))    
    }
    // Called on exit to get return value
    def evaluate(buffer: Row) = buffer.getBoolean(0)
}

Example usage:

df
  .groupBy($"group")
  .agg(belowThreshold($"power").alias("belowThreshold"))
  .show

// +-----+--------------+
// |group|belowThreshold|
// +-----+--------------+
// |    a|         false|
// |    b|          true|
// +-----+--------------+
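
To call it from SQL, as in the query from the question, the UDAF can also be registered by name. A sketch; ifDF and the column names come from the question and are assumed to be available as a registered temporary table:

sqlContext.udf.register("belowThreshold", belowThreshold)

val aggDF = sqlContext.sql("""SELECT span, belowThreshold(opticalReceivePower), timestamp
                              FROM ifDF
                              WHERE opticalReceivePower IS NOT null
                              GROUP BY span, timestamp
                              ORDER BY span""")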

Spark 1.4 workaround:

I am not sure if I correctly understand your requirements, but as far as I can tell plain old aggregation should be enough here:

val df = sc.parallelize(Seq(
    ("a", 0), ("a", 1), ("b", 30), ("b", -50))).toDF("group", "power")

df
  .withColumn("belowThreshold", ($"power".lt(-40)).cast(IntegerType))
  .groupBy($"group")
  .agg(sum($"belowThreshold").notEqual(0).alias("belowThreshold"))
  .show

// +-----+--------------+
// |group|belowThreshold|
// +-----+--------------+
// |    a|         false|
// |    b|          true|
// +-----+--------------+

Spark <= 1.4:

As far as I know, at this moment (Spark 1.4.1), there is no support for UDAFs other than the Hive ones. It should be possible with Spark 1.5 (see SPARK-3947).

Unsupported / internal methods

Internally Spark uses a number of classes including ImperativeAggregates and DeclarativeAggregates.

These are intended for internal usage and may change without further notice, so it is probably not something you want to use in your production code, but just for completeness, BelowThreshold with DeclarativeAggregate could be implemented like this (tested with Spark 2.2-SNAPSHOT):

import org.apache.spark.sql.catalyst.expressions.aggregate.DeclarativeAggregate
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.types._

case class BelowThreshold(child: Expression, threshold: Expression)
    extends DeclarativeAggregate {
  override def children: Seq[Expression] = Seq(child, threshold)

  override def nullable: Boolean = false
  override def dataType: DataType = BooleanType

  private lazy val belowThreshold = AttributeReference(
    "belowThreshold", BooleanType, nullable = false
  )()

  // Used to derive schema
  override lazy val aggBufferAttributes = belowThreshold :: Nil

  override lazy val initialValues = Seq(
    Literal(false)
  )

  override lazy val updateExpressions = Seq(Or(
    belowThreshold,
    If(IsNull(child), Literal(false), LessThan(child, threshold))
  ))

  override lazy val mergeExpressions = Seq(
    Or(belowThreshold.left, belowThreshold.right)
  )

  override lazy val evaluateExpression = belowThreshold
  override def defaultResult: Option[Literal] = Option(Literal(false))
} 

It should be further wrapped with an equivalent of withAggregateFunction.
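
A rough sketch of such a wrapper, for illustration only. It mirrors what the private withAggregateFunction helper in org.apache.spark.sql.functions does for built-in aggregates, and it relies on Catalyst internals that can change between releases:

import org.apache.spark.sql.Column

// Wrap the DeclarativeAggregate in an AggregateExpression and expose it as a Column
def belowThreshold(child: Column, threshold: Column): Column =
  new Column(BelowThreshold(child.expr, threshold.expr).toAggregateExpression())

// Usage, e.g.: df.groupBy($"group").agg(belowThreshold($"power", lit(-40)))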