且构网

分享程序员开发的那些事...
且构网 - 分享程序员编程开发的那些事

TopN语句

更新时间:2022-09-13 21:39:55

TopN语句常用于计算实时数据中对某个指标的最大或者最小的前N个数据的筛选。Flink SQL可以基于 OVER窗口操作灵活地完成TopN的工作。

语法


  1. SELECT *
  2. FROM (
  3. SELECT *,
  4. ROW_NUMBER() OVER ([PARTITION BY col1[, col2..]]
  5. ORDER BY col1 [asc|desc][, col2 [asc|desc]...]) AS rownum
  6. FROM table_name)
  7. WHERE rownum <= N [AND conditions]

参数说明:

  • ROW_NUMBER():行号计算函数OVER的窗口,行号计算从1开始。
  • PARTITION BY col1[, col2..] : 指定分区的列,可以不指定。
  • ORDER BY col1 [asc|desc][, col2 [asc|desc]...]:指定排序的列和每列的排序方向。

如上语法所示,TopN需要两层query。

  • 查询中使用 ROW_NUMBER() 窗口函数来对数据根据排序列进行排序并标上排名。
  • 外层查询中对排名进行过滤,只取前N条。例如N=10即为取前10条的数据。

在执行过程中,Flink SQL会对输入的数据流根据排序键进行排序。如果某个分区的前N条记录发生了改变,则会将改变的那几条数据以更新流的形式发给下游。

注意:如果需要将TopN的数据输出到外部存储,后接的结果表必须是一个带主键的表。

WHERE条件的限制
为了使Flink SQL能识别出这是一个TopN的query,外层循环中必须要指定 rownum <= N的格式来指定前N条记录,不能使用rownum - 5 <= N 这种将rownum至于某个表达式中。当然,WHERE条件中,可以额外带上其他条件,但是必须是以AND连接。

示例一

如下示例,先统计查询流中每小时、每个城市和关键字被查询的次数。然后输出每小时、每个城市被查询最多的前100个关键字。在输出表中,小时、城市、排名三者可以唯一确定一条记录,所以需要将这三列声明成联合主键(需要在外部存储中也有同样的主键设置)。


  1. CREATE TABLE rds_output (
  2. rownum int,
  3. start_time bigint,
  4. city varchar,
  5. keyword varchar,
  6. pv bigint,
  7. PRIMARY KEY (rownum, start_time, city)
  8. ) with (
  9. type = 'rds',
  10. ...
  11. )
  12. INSERT INTO rds_output
  13. SELECT rownum, start_time, city, keyword, pv
  14. FROM (
  15. SELECT *,
  16. ROW_NUMBER() OVER (PARTITION BY start_time, city ORDER BY pv desc) AS rownum
  17. FROM (
  18. select substr(time_str,1,12) as start_time,
  19. keyword,
  20. count(1) as pv,
  21. city
  22. from tmp_search
  23. group by substr(time_str,1,12), keyword, city
  24. ) a
  25. )
  26. WHERE rownum <= 100

示例二

测试数据

ip(varchar) time(varchar)
192.168.1.1 100000000
192.168.1.2 100000000
192.168.1.2 100000000
192.168.1.3 100030000
192.168.1.3 100000000
192.168.1.3 100000000

测试语句


  1. create table source_table (
  2. IP VARCHAR,
  3. `TIME` VARCHAR
  4. )with(
  5. type='datahub',
  6. endPoint='http://dh-cn-hangzhou.aliyuncs.com',
  7. project='blink_test',
  8. topic='ip_count01',
  9. accessId='LTXXXx',
  10. accessKey='gUqXXXxxx'
  11. );
  12. create table result_table (
  13. rownum int,
  14. start_time VARCHAR,
  15. IP VARCHAR,
  16. cc BIGINT,
  17. PRIMARY KEY (start_time, IP)
  18. ) with (
  19. type = 'rds',
  20. url='jdbc:mysql://rm-bp1gz4k202t8XXXXXXs.com:3306/blink_test',
  21. tableName='blink_rds_test',
  22. userName='xxx',
  23. password='xxx'
  24. );
  25. INSERT INTO result_table
  26. SELECT rownum,start_time,IP,cc
  27. FROM (
  28. SELECT *,
  29. ROW_NUMBER() OVER (PARTITION BY start_time ORDER BY cc desc) AS rownum
  30. FROM (
  31. SELECT SUBSTRING(`TIME`,1,2) AS start_time,--可以根据真实时间取相应的数值,这里取得是测试数据
  32. COUNT(IP) AS cc,
  33. IP
  34. FROM source_table
  35. GROUP BY SUBSTRING(`TIME`,1,2), IP
  36. )a
  37. )
  38. WHERE rownum <= 3 --可以根据真实top值取相应的数值,这里取得是测试数据

测试结果

rownum(int) start_time(varchar) ip(varchar) cc(bigint)
1 10 192.168.1.3 6
2 10 192.168.1.2 4
3 10 192.168.1.1 2

无排名优化

  • 根据TopN的语法,rownum字段会作为结果表的主键字段之一写入结果表。但是这可能导致数据膨胀的问题。例如,收到一条原排名9的更新数据,更新后排名上升到1,那么从1到9的数据排名都发生变化了。需要将这些数据作为更新都写入结果表。这样就产生了数据膨胀,可能导致结果表因为收到了太多的数据而降低更新速度。
  • 优化方法
    结果表中不保存 rownum,最终的 rownum 由前端计算。因为TopN的数据量通常不会很大,前端排序100个数据很快。当收到一条原排名9,更新后排名上升到1的数据,也只需要发送这一条数据,而不用把排名1到9的数据全发送下去。这种优化能够提升结果表的更新速度。

无排名优化语法


  1. SELECT col1, col2, col3
  2. FROM (
  3. SELECT col1, col2, col3
  4. ROW_NUMBER() OVER ([PARTITION BY col1[, col2..]]
  5. ORDER BY col1 [asc|desc][, col2 [asc|desc]...]) AS rownum
  6. FROM table_name)
  7. WHERE rownum <= N [AND conditions]

语法与上文类似,只是在外层查询中将 rownum 字段裁剪掉即可。

注意:在无 rownum 的场景,结果表主键的定义一定要特别注意。如果定义有误,会直接导致TopN结果的不正确。无 rownum 场景的主键应为 TopN 上游 group by 节点的 keys 列表。

无排名优化示例

本示例来自某视频行业用户的真实业务(精简后)。用户每个视频在分发时会产生大量流量,依据视频产生的流量可以分析出最热门的视频。如下示例用于统计出每分钟流量最大的 top5 的视频。

测试语句


  1. --从SLS读取数据原始存储表
  2. CREATE TABLE sls_cdnlog_stream (
  3. vid VARCHAR, -- video id
  4. rowtime Timestamp, -- 观看视频发生的时间
  5. response_size BIGINT, -- 观看产生的流量
  6. WATERMARK FOR rowtime as withOffset(rowtime, 0)
  7. ) WITH (
  8. type='sls',
  9. ...
  10. );
  11. --1分钟窗口统计vid带宽数
  12. CREATE VIEW cdnvid_group_view AS
  13. SELECT vid,
  14. TUMBLE_START(rowtime, INTERVAL '1' MINUTE) AS start_time,
  15. SUM(response_size) AS rss
  16. FROM sls_cdnlog_stream
  17. GROUP BY vid, TUMBLE(rowtime, INTERVAL '1' MINUTE);
  18. --存储表
  19. CREATE TABLE hbase_out_cdnvidtoplog (
  20. vid VARCHAR,
  21. rss BIGINT,
  22. start_time VARCHAR,
  23. -- 注意结果表中不存储 rownum 字段
  24. -- 特别注意该主键的定义,为 TopN 上游 group by keys
  25. PRIMARY KEY(start_time, vid)
  26. ) WITH (
  27. type='RDS',
  28. ...
  29. );
  30. -- 统计每分钟 top5 消耗流量的 vid,并输出
  31. INSERT INTO hbase_out_cdnvidtoplog
  32. -- 注意次外层查询,不选出 rownum 字段
  33. SELECT vid, rss, start_time FROM
  34. (
  35. SELECT
  36. vid, start_time, rss,
  37. ROW_NUMBER() OVER (PARTITION BY start_time ORDER BY rss DESC) as rownum,
  38. FROM
  39. cdnvid_group_view
  40. )
  41. WHERE rownum <= 5;

测试数据

vid(VARCHAR) rowtime(Timestamp) response_size(BIGINT)
10000 2017-12-18 15:00:10 2000
10000 2017-12-18 15:00:15 4000
10000 2017-12-18 15:00:20 3000
10001 2017-12-18 15:00:20 3000
10002 2017-12-18 15:00:20 4000
10003 2017-12-18 15:00:20 1000
10004 2017-12-18 15:00:30 1000
10005 2017-12-18 15:00:30 5000
10006 2017-12-18 15:00:40 6000
10007 2017-12-18 15:00:50 8000

测试结果

start_time(VARCHAR) vid(VARCHAR) rss(BIGINT)
2017-12-18 15:00:00 10000 9000
2017-12-18 15:00:00 10007 8000
2017-12-18 15:00:00 10006 6000
2017-12-18 15:00:00 10005 5000
2017-12-18 15:00:00 10002 4000
本文转自实时计算——TopN语句