
Spark SQL:无法在窗口函数中使用聚合

更新时间:2023-11-18 14:44:22

To use concat-style aggregation with a Spark window function, you need a user-defined aggregate function (UDAF); the built-in concat function cannot be used directly with a window function.

// Extend UserDefinedAggregateFunction to write a custom aggregate function.
// You can also specify constructor arguments, for instance
// CustomConcat(arg1: Int, arg2: String).
class CustomConcat extends org.apache.spark.sql.expressions.UserDefinedAggregateFunction {
  import org.apache.spark.sql.types._
  import org.apache.spark.sql.expressions.MutableAggregationBuffer
  import org.apache.spark.sql.Row

  // Input data type schema
  def inputSchema: StructType = StructType(Array(StructField("description", StringType)))
  // Intermediate (buffer) schema
  def bufferSchema: StructType = StructType(Array(StructField("groupConcat", StringType)))
  // Returned data type
  def dataType: DataType = StringType
  // The function always produces the same output for the same input
  def deterministic: Boolean = true
  // Called to reset the buffer whenever the grouping key changes
  def initialize(buffer: MutableAggregationBuffer): Unit = { buffer(0) = "" }
  // Called for each input row of a group
  def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
    buffer(0) = buffer.getString(0) + input.getString(0)
  }
  // Merges two partial aggregates
  def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
    buffer1(0) = buffer1.getString(0) + buffer2.getString(0)
  }
  // Called after all entries are exhausted to produce the final result
  def evaluate(buffer: Row): String = buffer.getString(0)
}
val newdescription = new CustomConcat
// windowspec is assumed to be a previously defined WindowSpec
val newdesc1 = newdescription($"description").over(windowspec)

You can use newdesc1 as an aggregate function for concatenation in window functions. For more information, have a look at the Databricks UDAF documentation. I hope this answers your question.
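To show how the pieces above fit together, here is a minimal end-to-end sketch. The SparkSession setup, the sample data, and the column names `category` and `description` are illustrative assumptions not taken from the original answer; only CustomConcat and the `.over(windowspec)` call come from it.

```scala
// Hypothetical usage sketch: data, column names, and the window spec
// are assumptions for illustration.
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.Window

object CustomConcatExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("udaf-window-demo")
      .getOrCreate()
    import spark.implicits._

    // Toy data: two groups of descriptions
    val df = Seq(
      ("a", "foo"), ("a", "bar"), ("b", "baz")
    ).toDF("category", "description")

    // Partition the window by category, then apply the UDAF over it
    val windowspec = Window.partitionBy("category")
    val customConcat = new CustomConcat
    df.withColumn("all_descriptions", customConcat($"description").over(windowspec))
      .show(truncate = false)

    spark.stop()
  }
}
```

Note that in Spark 3.x, UserDefinedAggregateFunction is deprecated in favor of the typed Aggregator API registered via functions.udaf, so this pattern is best suited to Spark 2.x codebases.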