且构网

分享程序员开发的那些事...
且构网 - 分享程序员编程开发的那些事

OVER窗口

更新时间:2022-09-13 21:40:01

OVER窗口(OVER Window)是传统数据库的标准开窗,OVER Window不同于Group By Window,OVER Window中每一个元素都对应一个窗口。窗口元素是与当前元素相邻的元素集合, 流上元素会在多个窗口中。在Flink SQL Window的实现中,每个触发计算的元素所确定的行,都是该元素所在窗口的最后一行。

生命周期

在应用OVER Window的Stream中,每一个元素都对应一个OVER Window。每一个元素都触发一次数据计算。在Flink的底层实现中,OVER Window的数据进行全局统一管理(数据只存储一份),逻辑上为每一个元素维护一个OVER Window,为每一个元素进行窗口计算,完成计算后会清除过期的数据。

SQL


  1. SELECT
  2. agg1(col1) OVER (definition1) AS colName,
  3. ...
  4. aggN(colN) OVER (definition1) AS colNameN
  5. FROM Tab1

注意

  • agg1到aggN所对应的OVER definition1必须相同。
  • AS 的别名可供外层SQL进行查询。

类型

Flink SQL中对OVER Window的定义遵循标准SQL的定义语法,传统OVER Window没有对其进行更细粒度的窗口类型命名划分。本节为了让您更清楚细致了解OVER Window的语义,将OVER Window按照计算行的定义方式划分为如下两类。

  • ROWS OVER Window - 每一行元素都视为新的计算行,即每一行都是一个新的窗口。
  • RANGE OVER Window - 具有相同时间值的所有元素行视为同一计算行,即具有相同时间值的所有行都是同一个窗口。

属性

正交属性 proctime eventtime
rows
range
  • rows - 按照实际元素的行进行确定窗口。
  • range - 按照实际的元素值(时间戳值)进行确定窗口。

语义

Rows OVER Window

窗口数据

ROWS OVER Window的每个元素都确定一个窗口。ROWS OVER Window也有Unbounded和Bounded的两种情况。

Unbounded ROWS OVER Window 数据如下图所示。

OVER窗口

注意: 上图所示窗口user1的w7和w8,user2的窗口w3和w4,虽然元素都是同一时刻到达,但是它们仍然是在不同的窗口,这一点有别于RANGE OVER Window。

Bounded ROWS OVER Window 数据以3个元素(2 PRECEDING)的窗口为例,如下图所示。

OVER窗口

注意: 上图所示窗口user1的w5和w6,user2的窗口w2和w3,虽然有元素都是同一时刻到达,但是它们仍然是在不同的窗口,这一点有别于RANGE OVER Window。

窗口语法


  1. SELECT
  2. agg1(col1) OVER(
  3. [PARTITION BY (value_expression1,..., value_expressionN)]
  4. ORDER BY timeCol
  5. ROWS
  6. BETWEEN (UNBOUNDED | rowCount) PRECEDING AND CURRENT ROW) AS colName, ...
  7. FROM Tab1
  • value_expression - 分区值表达式。
  • timeCol - 用于元素排序的时间字段。
  • rowCount - 是定义根据当前行开始向前追溯几行元素。

案例

以Bounded ROWS OVER Window场景示例,假设有一张商品上架表,包含有商品ID、商品类型、商品上架时间、商品价格数据。假设求在当前商品上架之前同类的3个商品中的最高价格。

测试数据

商品ID 商品类型 上架时间 销售价格
ITEM001 Electronic 2017-11-11 10:01:00 20
ITEM002 Electronic 2017-11-11 10:02:00 50
ITEM003 Electronic 2017-11-11 10:03:00 30
ITEM004 Electronic 2017-11-11 10:03:00 60
ITEM005 Electronic 2017-11-11 10:05:00 40
ITEM006 Electronic 2017-11-11 10:06:00 20
ITEM007 Electronic 2017-11-11 10:07:00 70
ITEM008 Clothes 2017-11-11 10:08:00 20

测试代码


  1. CREATE TABLE tmall_item(
  2. itemID VARCHAR,
  3. itemType VARCHAR,
  4. onSellTime TIMESTAMP,
  5. price DOUBLE
  6. WATERMARK onSellTime FOR onSellTime as withOffset(onSellTime, 0)
  7. )
  8. WITH (
  9. type = 'sls',
  10. ...
  11. ) ;
  12. SELECT
  13. itemID,
  14. itemType,
  15. onSellTime,
  16. price,
  17. MAX(price) OVER (
  18. PARTITION BY itemType
  19. ORDER BY onSellTime
  20. ROWS BETWEEN 2 preceding AND CURRENT ROW) AS maxPrice
  21. FROM tmall_item

测试结果

itemID itemType onSellTime price maxPrice
ITEM001 Electronic 2017-11-11 10:01:00 20 20
ITEM002 Electronic 2017-11-11 10:02:00 50 50
ITEM003 Electronic 2017-11-11 10:03:00 30 50
ITEM004 Electronic 2017-11-11 10:03:00 60 60
ITEM005 Electronic 2017-11-11 10:05:00 40 60
ITEM006 Electronic 2017-11-11 10:06:00 20 60
ITEM007 Electronic 2017-11-11 10:07:00 70 70
ITEM008 Clothes 2017-11-11 10:08:00 20 20

RANGE OVER Window

窗口数据

RANGE OVER Window所有具有共同元素值(元素时间戳)的元素行确定一个窗口,RANGE OVER Window也有Unbounded和Bounded的两种情况。

Unbounded RANGE OVER Window 数据如下图所示。OVER窗口

注意: 上图所示窗口user1的w7, user2的窗口w3 ,两个元素同一时刻到达,他们属于相同的window,这一点有别于ROWS OVER Window。

Bounded RANGE OVER Window 数据,以3秒中数据(INTERVAL ‘2’ SECOND)的窗口为例,如下图所示。

OVER窗口

注意: 上图所示窗口user1的w6, user2的窗口w3,元素都是同一时刻到达,他们属于相同的window,这一点有别于ROWS OVER Window。

窗口语法


  1. SELECT
  2. agg1(col1) OVER(
  3. [PARTITION BY (value_expression1,..., value_expressionN)]
  4. ORDER BY timeCol
  5. RANGE
  6. BETWEEN (UNBOUNDED | timeInterval) PRECEDING AND CURRENT ROW) AS colName,
  7. ...
  8. FROM Tab1
  • value_expression - 进行分区的字表达式。
  • timeCol - 用于元素排序的时间字段。
  • timeInterval - 是定义根据当前行开始向前追溯指定时间的元素行。

案例

以Bounded RANGE OVER Window场景示例,假设有一张商品上架表,包含有商品ID、商品类型、商品上架时间、商品价格数据。假设求比当前商品上架时间早2分钟的同类商品中的最高价格。

测试数据

商品ID 商品类型 上架时间 销售价格
ITEM001 Electronic 2017-11-11 10:01:00 20
ITEM002 Electronic 2017-11-11 10:02:00 50
ITEM003 Electronic 2017-11-11 10:03:00 30
ITEM004 Electronic 2017-11-11 10:03:00 60
ITEM005 Electronic 2017-11-11 10:05:00 40
ITEM006 Electronic 2017-11-11 10:06:00 20
ITEM007 Electronic 2017-11-11 10:07:00 70
ITEM008 Clothes 2017-11-11 10:08:00 20

测试代码


  1. CREATE TABLE tmall_item(
  2. itemID VARCHAR,
  3. itemType VARCHAR,
  4. onSellTime TIMESTAMP,
  5. price DOUBLE
  6. WATERMARK onSellTime FOR onSellTime as withOffset(onSellTime, 0)
  7. )
  8. WITH (
  9. type = 'sls',
  10. ...
  11. ) ;
  12. SELECT
  13. itemID,
  14. itemType,
  15. onSellTime,
  16. price,
  17. MAX(price) OVER (
  18. PARTITION BY itemType
  19. ORDER BY onSellTime
  20. RANGE BETWEEN INTERVAL '2' MINUTE preceding AND CURRENT ROW) AS maxPrice
  21. FROM tmall_item

测试结果

itemID itemType onSellTime price maxPrice
ITEM001 Electronic 2017-11-11 10:01:00 20 20
ITEM002 Electronic 2017-11-11 10:02:00 50 50
ITEM003 Electronic 2017-11-11 10:03:00 30 50
ITEM004 Electronic 2017-11-11 10:03:00 60 60
ITEM005 Electronic 2017-11-11 10:05:00 40 60
ITEM006 Electronic 2017-11-11 10:06:00 20 40
ITEM007 Electronic 2017-11-11 10:07:00 70 70
ITEM008 Clothes 2017-11-11 10:08:00 20 20

本文转自实时计算——OVER窗口