更新时间:2022-02-17 06:53:10
摘要: Flink入坑指南系列文章,从实际例子入手,一步步引导用户零基础入门实时计算/Flink,并成长为使用Flink的高阶用户。本文属个人原创,仅做技术交流之用,笔者才疏学浅,如有错误,欢迎指正。转载请注明出处,侵权必究。
Flink支持三种接口,也就是三种写作业的方式:
不会介绍实时计算/Flink SQL的语法细节,关于SQL语法或各内置函数的用法,请参考文档:帮助手册
有志于了解FlinkSQL实现原理或研究Flink代码的同学,可以参考《Apache Flink 漫谈系列 - SQL概览》
接上一章内容,本章计算一个指标:
ctime | category_id | shop_id | item_id | price |
---|---|---|---|---|
2018-12-04 15:44:54 | cat_01 | shop_01 | item_01 | 10 |
2018-12-04 15:45:46 | cat_02 | shop_02 | item_02 | 11.1 |
2018-12-04 15:46:11 | cat_01 | shop_03 | item_03 | 12.4 |
源表:
-- 源表DDL
create table src(
ctime timestamp, -- 交易时间戳
category_id varchar, -- 类目id
shop_id varchar, -- 店铺id
item_id varchar, -- 商品id
price double -- 价格
)
-- 结果表DDL
create table sink(
cdate date, -- 日期
gmv_daily double -- 从零点开始,每天的全网成交金额
)
一般批的写法:
SELECT
date_format(ctime, '%Y%m%d') as cdate, -- 将数据从时间戳格式(2018-12-04 15:44:54),转换为date格式(20181204)
SUM(price) AS gmv_daily
FROM src
GROUP BY date_format(ctime, '%Y%m%d') ; --按照天做聚合
结果:
cdate | gmv_daily |
---|---|
20181204 | 33.5 |
特点:
SELECT
date_format(ctime, '%Y%m%d') as cdate, -- 将数据从时间戳格式(2018-12-04 15:44:54),转换为date格式(20181204)
SUM(price) AS gmv_daily
FROM src
GROUP BY date_format(ctime, '%Y%m%d') ; --按照天做聚合
特点:
cdate | gmv_daily |
---|---|
20181204 | 10.0 |
20181204 | 21.1 |
20181204 | 33.5 |
如果把MySQL结果表中的cdate字段作为主键,那么每来一条数据,这个Flink作业都会输出一个值,三条数据的主键相同,因此会覆盖之前的结果,等三条数据都经过Flink计算后,得到的结果如下:
cdate | gmv_daily |
---|---|
20181204 | 33.5 |
这个例子中,批和流的SQL相同,从最终结果看也相同。但是批引擎(比如MySQL/Hive等)的执行模式,和流引擎(如Flink)是完全不同的。这就导致同一个SQL在处理数据的行为上,会有很多区别。如果要深入使用Flink SQL,并且保证结果的正确性,成为Flink SQL调优专家,就需要对Flink底层实现有一定的了解。接下来每一章的例子之后,都会介绍一下本章所用的基础原理。但不会讲实现细节,需要了解实现细节的同学,可以follow flink源代码。
从直观上看,批和流SQL行为非常不同:
该SQL中有两个关键操作:
SUM:求和操作,在批SQL和流SQL中,SUM求和操作的语义上相同的,都是对每个分组的某个字段,做求和操作。但是批SQL和流SQL的实现方式是不同的:
Flink SQL持续计算过程中,数据源源不断流入,以本文中例子来看,三条数据先后进入Flink,Flink中需要按cdate做一个全局group by,然后对每个cdate中所有数据的price做一个聚合运算(SUM),过程如下:
这就产生了个问题:每条数据的SUM计算,都要依赖上一条数据的计算结果。Flink在计算的时候,会保留这些中间结果么?答案是:会保存。而这些中间结果,就是一个作业的状态(state)的一部分。
关于state的几个关键问题:
__state过期规则是什么?__state过期规则
ctime | category_id | shop_id | item_id | price | ptime |
---|---|---|---|---|---|
2018-12-04 15:44:54 | cat_01 | shop_01 | item_01 | 10 | 2018-12-04 15:45:00 |
2018-12-04 15:45:46 | cat_02 | shop_02 | item_02 | 11.1 | 2018-12-04 15:45:10 |
2018-12-04 15:46:11 | cat_01 | shop_03 | item_03 | 12.4 | 2018-12-04 15:52:00 |
此时:
该例子中:
《Streaming System 第一章:Streaming 101》
《Streaming System 第二章:The What- Where- When- and How of Data Processing》
上述SQL中,每来一条数据就要就要计算一次,在输入数量大的情况下,容易产生性能瓶颈。每来一条数据,后端都会read和write一次state,发生序列化和反序列化操作,甚至是磁盘的 I/O 操作。对复杂场景,比如JOIN/TopN等,因此状态的相关操作通常都会成为整个任务的性能瓶颈。
如何避免这个问题呢?使用microbatch策略。microbatch顾名思义,就是攒批。不是来一条处理一条,而是攒一批再处理。相关配置如下:
#攒批的间隔时间,使用 microbatch 策略时需要加上该配置,且建议和 blink.miniBatch.allowLatencyMs 保持一致
blink.microBatch.allowLatencyMs=5000
# 使用 microbatch 时需要保留以下两个 minibatch 配置
blink.miniBatch.allowLatencyMs=5000
# 防止OOM,每个批次最多缓存多少条数据
blink.miniBatch.size=20000。
持续查询
状态(state):
流计算中的时间域概念 - Process Time vs. Event Time