且构网

分享程序员开发的那些事...
且构网 - 分享程序员编程开发的那些事

创建数据总线(DataHub)源表

更新时间:2022-09-14 18:26:39

什么是数据总线(DataHub)

DataHub作为一个流式数据总线,为阿里云数加平台提供了大数据的入口服务,共同构建一站式的数据处理平台。实时计算 Flink通常使用DataHub作为流式数据存储头和输出目的端。同时,上游众多流式数据,包括DTS、IOT等均选择DataHub作为大数据平台的数据入口。DataHub本身是流数据存储,实时计算只能将其作为流式数据输入。示例如下:


  1. create table datahub_stream(
  2. name varchar,
  3. age BIGINT,
  4. birthday BIGINT
  5. ) with (
  6. type='datahub',
  7. endPoint='http://dXXXXXXXX.com',
  8. project='blink_datahub_test',
  9. topic='test_topic_1',
  10. accessId='0i70XXXXXXXXs',
  11. accessKey='yF60EwXXXXXXXXXnvQPJ2zhCfHU',
  12. startTime='2017-07-21 00:00:00'
  13. );

属性字段

Flink SQL支持获取DataHub的属性字段。能够记录每条信息写入DataHub的系统时间。

如图所示:创建数据总线(DataHub)源表

字段名 注释说明
timestamp 每条记录入datahub的systemtime

示例

通过 HEADER 关键字获取属性字段。

例如,属性字段并不存在于DATAHUB的字段声明里。想获取每条记录入datahub的systemtime,可以将timestamp作为字段名,在后面加上HEADER就可以取出想要的属性值。

测试数据

name(VARCHAT) MsgID(VARCHAT)
ens_altar_flow ems0a

测试案例


  1. CREATE TABLE datahub_log (
  2. `timestamp` varchar HEADER,
  3. result varchar
  4. MsgID varchar
  5. )
  6. WITH
  7. (
  8. type ='datahub'
  9. );
  10. CREATE TABLE RDS_out (
  11. `timestamp` varchar,
  12. MsgID varchar,
  13. Version varchar
  14. )
  15. WITH
  16. (
  17. type ='RDS'
  18. );
  19. INSERT INTO RDS_out
  20. SELECT
  21. `timestamp`,
  22. result,
  23. MsgID
  24. FROM
  25. datahub_log;

测试结果

TIME(VARCHAT) MsgID(VARCHAT) Version(VARCHAT)
1522652455625 ems0a 0.0.1

WITH参数

目前只支持tuple模式的topic

参数 注释说明 备注
endPoint 消费端点信息 DATAHUB的Endpoint地址
accessId 读取的accessId
accessKey 读取的密钥
project 读取的项目
topic project下的具体的topic
startTime 启动位点的时间 格式为”yyyy-MM-dd hh:mm:ss”
maxRetryTimes 读取最大尝试次数 可选,默认为20。
retryIntervalMs 重试间隔 可选,默认为1000。
batchReadSize 单次读取条数 可选,默认为10,可设置的最大值为1000。
lengthCheck 单行字段条数检查策略 可选,默认为SKIP,其它可选值为EXCEPTION、PAD。SKIP:字段数目不符合时跳过 。EXCEPTION:字段数目不符合时抛出异常。PAD:按顺序填充,不存在的置为null。
columnErrorDebug 是否打开调试开关,如果打开,会把解析异常的log打印出来 可选,默认为false。

类型映射

DataHub和实时计算字段类型对应关系如下,建议使用该对应关系时进行DDL声明:

DataHub字段类型 实时计算字段类型
BIGINT BIGINT
STRING VARCHAR
DOUBLE DOUBLE
TIMESTAMP BIGINT
BOOLEAN BOOLEAN
DECIMAL DECIMAL

注意:DataHub的TIMESTAMP是精确到微妙级别的,在Unix时间戳里是16位的。而实时计算定义的TIMESTAMP是精确到毫秒级别的,在Unix时间戳里是13位的所以建议大家使用BIGINT来映射。如果一定是要用TIMESTAMP建议使用计算列来做转换。


本文转自实时计算——创建数据总线(DataHub)源表