且构网

分享程序员开发的那些事...
且构网 - 分享程序员编程开发的那些事

复杂事件处理(CEP)语句

更新时间:2021-12-08 05:50:16

MATCH_RECOGNIZE用于从输入流中识别符合指定规则的事件,并按照指定的方式输出。

语法


  1. SELECT [ ALL | DISTINCT ]
  2. { * | projectItem [, projectItem ]* }
  3. FROM tableExpression
  4. [MATCH_RECOGNIZE (
  5. [PARTITION BY {partitionItem [, partitionItem]*}]
  6. [ORDER BY {orderItem [, orderItem]*}]
  7. [MEASURES {measureItem AS col [, measureItem AS col]*}]
  8. [ONE ROW PER MATCH|ALL ROWS PER MATCH|ONE ROW PER MATCH WITH TIMEOUT ROWS|ALL ROWS PER MATCH WITH TIMEOUT ROWS]
  9. [AFTER MATCH SKIP]
  10. PATTERN (patternVariable[quantifier] [ patternVariable[quantifier]]*) WITHIN intervalExpression
  11. DEFINE {patternVariable AS patternDefinationExpression [, patternVariable AS patternDefinationExpression]*}
  12. )];

参数说明

  • PARTITION BY 指定分区的列,可选项。
  • ORDER BY 可以指定多列,但是必须以event time列或者process time列作为排序的首列,可选项。
  • MEASURES 定义如何根据匹配成功的输入事件构造输出事件。
  • ONE ROW PER MATCH 对于每一次成功的匹配,只会产生一个输出事件。
  • ONE ROW PER MATCH WITH TIMEOUT ROWS 除了匹配成功的时候产生输出外,超时的时候也会产生输出。超时时间由PATTERN语句中的WITHIN语句定义。
  • ALL ROW PER MATCH 对于每一次成功的匹配,对应于每一个输入事件,都会产生一个输出事件。
  • ALL ROW PER MATCH WITH TIMEOUT ROWS 除了匹配成功的时候产生输出外,超时的时候也会产生输出。超时时间由PATTERN语句中的WITHIN语句定义。
  • [ONE ROW PER MATCH|ALL ROWS PER MATCH|ONE ROW PER MATCH WITH TIMEOUT ROWS|ALL ROWS PER MATCH WITH TIMEOUT ROWS]为可选项,默认为ONE ROW PER MATCH
  • AFTER MATCH SKIP TO NEXT ROW 匹配成功之后,从匹配成功的事件序列中的第一个事件的下一个事件开始进行下一次匹配。
  • AFTER MATCH SKIP PAST LAST ROW 匹配成功之后,从匹配成功的事件序列中的最后一个事件的下一个事件开始进行下一次匹配。
  • AFTER MATCH SKIP TO FIRST patternItem 匹配成功之后,从匹配成功的事件序列中第一个对应于patternItem的事件开始下一次匹配。
  • AFTER MATCH SKIP TO LAST patternItem 匹配成功之后,从匹配成功的事件序列中最后一个对应于patternItem的事件开始下一次匹配。
  • PATTERN 定义待识别的事件序列需要满足的规则,需要定义在()中,由一系列自定义的patternVariable构成。

    说明:

    • patternVariable之间若以空格间隔,表示符合这两种patternVariable的事件中间不存在其他事件。
    • patternVariable之间若以->间隔,表示符合这两种patternVariable的事件之间可以存在其它事件。

Quantifier

quantifier用于指定符合patternVariable定义的事件的出现次数。

参数 参数意义
* 0次或多次
+ 1次或多次
? 0次或1次
{n} n次
{n,} 大于等于n次
{n, m} 大于等于n次,小于等于m次
{,m} 小于等于m次

默认为贪婪匹配。比如对于pattern: A -> B+,输入:a b1, b2, b3,输出为:a b1, a b1 b2, a b1 b2 b3。可以在quantifier符号后面加来表示非贪婪匹配。

  • *?
  • +?
  • {n}?
  • {n,}?
  • {n, m}?
  • {,m}?

此时对于上面例子中的pattern及输入,产生的输出为:a b1, a b2, a b1 b2, a b3, a b2 b3, a b1 b2 b3

注意:

  • WITHIN 定义符合规则的事件序列的最大时间跨度。
  • 静态窗口
    格式:INTERVAL ‘string’ timeUnit [ TO timeUnit ]
    示例:INTERVAL ‘10’ SECOND, INTERVAL ‘45’ DAY, INTERVAL ‘10:20’ MINUTE TO SECOND, INTERVAL ‘10:20.10’ MINUTE TO SECOND, INTERVAL ‘10:20’ HOUR TO MINUTE, INTERVAL ‘1-5’ YEAR TO MONTH
  • 动态窗口
    格式: INTERVAL intervalExpression
    示例: INTERVAL A.windowTime + 10,其中A为pattern定义中第一个patternVariable。
    在intervalExpression的定义中,可以使用pattern定义中出现过的patternVariable。当前只能使用第一个patternVariable。intervalExpression中可以使用UDF,intervalExpression的结果必须为long,单位为millisecond, 表示窗口的大小。
  • DEFINE 定义在PATTERN中出现的patternVariable的具体含义,若某个patternVariable在DEFINE中没有定义,则认为对于每一个事件,该patternVariable都成立。

在MEASURES和DEFINE语句中,可以使用如下函数。

函数 函数意义
Row Pattern Column References 形式为: patternVariable.col。表示访问patternVariable所对应的事件的指定的列。
PREV 只能用在DEFINE语句中,一般与Row Pattern Column References合用。用于访问指定的pattern所对应的事件之前偏移指定的offset所对应的事件的指定的列。
示例:对于DOWN AS DOWN.price < PREV(DOWN.price),PREV(A.price)表示当前事件的前一个事件的price列的值。注意,DOWN.price等价于PREV(DOWN.price, 0)。 PREV(DOWN.price)等价于PREV(DOWN.price, 1)。
FIRST、LAST 一般与Row Pattern Column References合用,用于访问指定的pattern所对应的事件序列中的指定偏移位置的事件。
示例:FIRST(A.price, 3)表示pattern A所对应的事件序列中的第3个事件。LAST(A.price, 3)表示pattern A所对应的事件序列中的倒数第3个事件。

输出列:

函数 输出列
ONE ROW PER MATCH 包括 partition by中指定的列及measures中定义的列。 对于partition by中已经指定的列,在measures中无需重复定义。
ONE ROW PER MATCH WITH TIMEOUT ROWS 除匹配成功的时候产生输出外,超时的时候也会产生输出,超时时间由PATTERN语句中的WITHIN语句定义。

注意:

  1. 定义pattern的时候,***也定义WITHIN,否则可能会造成state越来越大。
  2. order by中定义的首列必须为event time列或者process time列。

示例


  1. SELECT *
  2. FROM Ticker MATCH_RECOGNIZE (
  3. PARTITION BY symbol
  4. ORDER BY tstamp
  5. MEASURES STRT.tstamp AS start_tstamp,
  6. LAST(DOWN.tstamp) AS bottom_tstamp,
  7. LAST(UP.tstamp) AS end_tstamp
  8. ONE ROW PER MATCH
  9. AFTER MATCH SKIP TO NEXT ROW
  10. PATTERN (STRT DOWN+ UP+) WITHIN INTERVAL '10' SECOND
  11. DEFINE
  12. DOWN AS DOWN.price < PREV(DOWN.price),
  13. UP AS UP.price > PREV(UP.price)
  14. ) MR
  15. ORDER BY MR.symbol, MR.start_tstamp;

测试数据

timestamp(TIMESTAMP) card_id(VARCHAR) location(VARCHAR) action(VARCHAR)
2018-04-13 12:00:00 1 WW Tom
2018-04-13 12:05:00 1 WW1 Tom
2018-04-13 12:10:00 1 WW2 Tom
2018-04-13 12:20:00 1 WW Tom

测试案例


  1. CREATE TABLE datahub_stream (
  2. `timestamp` TIMESTAMP,
  3. card_id VARCHAR,
  4. location VARCHAR,
  5. `action` VARCHAR,
  6. WATERMARK wf FOR `timestamp` AS withOffset(`timestamp`, 1000)
  7. ) WITH (
  8. type = 'datahub'
  9. ...
  10. );
  11. CREATE TABLE rds_out (
  12. start_timestamp TIMESTAMP,
  13. end_timestamp TIMESTAMP,
  14. card_id VARCHAR,
  15. event VARCHAR
  16. ) WITH (
  17. type= 'rds'
  18. ...
  19. );
  20. --案例描述
  21. -- 当相同的card_id在十分钟内,从两个不同的location发生刷卡现象,就会触发报警机制,以便于监测信用卡盗刷等现象
  22. -- 定义计算逻辑
  23. insert into rds_out
  24. select
  25. `start_timestamp`,
  26. `end_timestamp`,
  27. card_id, `event`
  28. from datahub_stream
  29. MATCH_RECOGNIZE (
  30. PARTITION BY card_id -- card_id分区,将相同卡号的数据分到同一个计算节点上。
  31. ORDER BY `timestamp` -- 在窗口内,对事件时间进行排序。
  32. MEASURES --定义如何根据匹配成功的输入事件构造输出事件。
  33. e2.`action` as `event`,
  34. e1.`timestamp` as `start_timestamp`, --第一次的事件时间为start_timestamp
  35. LAST(e2.`timestamp`) as `end_timestamp`--最新的事件时间为end_timestamp
  36. ONE ROW PER MATCH --匹配成功输出一条。
  37. AFTER MATCH SKIP TO NEXT ROW--匹配后跳转到下一行。
  38. PATTERN (e1 e2+) WITHIN INTERVAL '10' MINUTE -- 定义两个事件,e1e2
  39. DEFINE --定义在PATTERN中出现的patternVariable的具体含义。
  40. e1 as e1.action = 'Tom', --事件一的action标记为Tom
  41. e2 as e2.action = 'Tom' and e2.location <> e1.location --事件二的action标记为Tom,且事件一和事件二的location不一致。
  42. );

测试结果

start_timestamp(TIMESTAMP) end_timestamp(TIMESTAMP) card_id(VARCHAR) event(VARCHAR)
2018-04-13 20:00:00.0 2018-04-13 20:05:00.0 1 Tom
2018-04-13 20:05:00.0 2018-04-13 20:10:00.0 1 Tom
本文转自实时计算——复杂事件处理(CEP)语句