且构网

分享程序员开发的那些事...
且构网 - 分享程序员编程开发的那些事

数据源表概述

更新时间:2022-09-14 18:27:21

实时计算的源表是指流式数据存储。流式数据存储驱动实时计算的运行,因此每个实时计算子作业必须提供至少一个流式数据存储。

语法


  1. CREATE TABLE tableName
  2. (columnName dataType [, columnName dataType ]*)
  3. [ WITH (propertyName=propertyValue [, propertyName=propertyValue ]*) ];

示例


  1. create table datahub_stream(
  2. name varchar,
  3. age BIGINT,
  4. birthday BIGINT
  5. ) with (
  6. type='datahub',
  7. endPoint='http://dh-et2.aliyun-inc.com',
  8. project='blink_xxx',
  9. topic='test_xxx',
  10. accessId='0i70Rxxxxx',
  11. accessKey='yF60EwUxxxx',
  12. startTime='2017-07-21 00:00:00'
  13. );

Watermark定义

Watermark是一种衡量Event Time进展的机制,它是数据本身的一个隐藏属性。Watermark的定义是数据原表DDL定义的一部分。Flink提供如下语法定义:


  1. WATERMARK [watermarkName] FOR <rowtime_field> AS withOffset(<rowtime_field>, offset)
  • watermarkName 标识了这个 watermark 的名字,可选。
  • <rowtime_field> 必须是表中已定义的一列(当前仅支持为Timestamp类型),含义是基于该列生成 watermark,并且标识该列为 Event Time 列,可以在后续 query 中用来定义窗口。
  • withOffset 是目前提供的watermark的生成策略,是根据<rowtime_field> - offset生成watermark的值。withOffset的第一个参数必须是<rowtime_field>
  • offset 单位为毫秒,含义为watermark值与event time值的偏移量。

通常一条记录中的某个字段就代表了该记录的发生时间。例如,表中有个rowtime字段,类型为Timestamp,其中某个值为1501750584000(2017-08-03 08:56:24.000),如果您需要定义一个基于该rowtime列的watermark,且watermark策略为偏移4秒,需要如下定义。


  1. WATERMARK FOR rowtime AS withOffset(rowtime, 4000)

这条数据的watermark时间为 1501750584000 - 4000 = 1501750580000(2017-08-03 08:56:20.000)。这条数据中timestamp小于1501750580000(2017-08-03 08:56:20.000)的数据,都已经到达了。

计算列

概念

计算列是虚拟列,并非实际存储在表中。计算列的表达式可以使用其他列中的数据来计算其所属列的值,可以使用表达式、内置函数、或是自定义函数。灵活度与SELECT中的表达式一样。计算列在Flink中可以像普通字段一样被使用。

用途

目前watermark的rowtime列只支持Timestamp类型(未来会支持Long类型),watermark只能定义在源表DDL中,如果您的源表中没有 Timestamp类型的列,需要从其他类型的字段转换而来,可以使用计算列来转换。

语法


  1. <computed_column_definition> ::= column_name AS computed_column_expression

示例


  1. #如果datahub的TIME字段是微秒级别的(16位Unix时间戳),可以用计算列来转换。
  2. CREATE TABLE sls_stream(
  3. a INT,
  4. b BIGINT,
  5. TIME BIGINT,
  6. ts AS TO_TIMESTAMP(TIME/1000),
  7. WATERMARK FOR ts AS withOffset(ts, 1000)
  8. ) with (
  9. type = 'DATAHUB',
  10. ...
  11. );

如上示例中所示,源表数据中的字段TIME包含时间信息,为BIGINT类型。用计算列的功能将字段TIME转换成了Timestamp类型的ts字段,并将ts字段作为watermark的rowtime字段。

本文转自实时计算——数据源表概述