且构网

分享程序员开发的那些事...
且构网 - 分享程序员编程开发的那些事

创建日志服务(Log Service)源表

更新时间:2022-09-14 18:27:15

什么是日志服务

日志服务(Log Service),简称LOG,原SLS。是针对实时数据的一站式服务,在阿里巴巴集团经历大量大数据场景锤炼而成。无需开发就能快捷完成数据采集、消费、投递以及查询分析等功能,帮助提升运维、运营效率,建立DT时代海量日志处理能力。
日志服务本身是流数据存储,实时计算 Flink能将其作为流式数据输入。对于日志服务而言,数据格式类似JSON,示例如下。


  1. {
  2. "a": 1000,
  3. "b": 1234,
  4. "c": "li"
  5. }

对于实时计算而言,我们需要定义的DDL如下(sls即Log Service)。


  1. create table sls_stream(
  2. a int,
  3. b int,
  4. c varchar
  5. ) with (
  6. type ='sls',
  7. endPoint ='http://cXXXXXXXXyuncs.com',
  8. accessId ='0iXXXXXXXAs',
  9. accessKey ='yF60EXXXXXXXPJ2zhCfHU',
  10. startTime = '2017-07-05 00:00:00',
  11. project ='ali-XXXXX-streamtest',
  12. logStore ='strXXXtest',
  13. consumerGroup ='consXXXXroupTest1'
  14. );

属性字段

目前默认支持三个属性字段的获取,也支持其他自定义写入的字段。

字段名 注释说明
__source__ 消息源
__topic__ 消息主题
__timestamp__ 日志时间

举例

通过 HEADER 关键字获取属性字段。

测试数据


  1. __topic__: ens_altar_flow
  2. result: {"MsgID":"ems0a","Version":"0.0.1"}

测试代码


  1. CREATE TABLE sls_log (
  2. __topic__ varchar HEADER,
  3. result varchar
  4. )
  5. WITH
  6. (
  7. type ='sls'
  8. );
  9. CREATE TABLE sls_out (
  10. name varchar,
  11. MsgID varchar,
  12. Version varchar
  13. )
  14. WITH
  15. (
  16. type ='RDS'
  17. );
  18. INSERT INTO sls_out
  19. SELECT
  20. __topic__,
  21. JSON_VALUE(result,'$.MsgID'),
  22. JSON_VALUE(result,'$.Version')
  23. FROM
  24. sls_log

测试结果

name(VARCHAT) MsgID(VARCHAT) Version(VARCHAT)
ens_altar_flow ems0a 0.0.1

WITH参数

参数 注释说明 备注
endPoint 消费端点信息 日志服务的ENDPOINT地址
accessId sls读取的accessKey N/A
accessKey sls读取的密钥 N/A
project 读取的sls项目 N/A
logStore project下的具体的logStore N/A
consumerGroup 消费组名 用户可以自定义消费组名(没有固定格式)
startTime 消费日志开始的时间点 N/A
heartBeatIntervalMills 可选,消费客户端心跳间隔时间 默认为10s
maxRetryTimes 读取最大尝试次数 可选,默认为5
batchGetSize 单次读取logGroup条数 可选,默认为10
lengthCheck 单行字段条数检查策略 可选,默认为SKIP,其它可选值为EXCEPTION、PAD。SKIP:字段数目不符合时跳过 。EXCEPTION:字段数目不符合时抛出异常。PAD:按顺序填充,不存在的置为null。
columnErrorDebug 是否打开调试开关,如果打开,会把解析异常的log打印出来 可选,默认为false

注意:

  • SLS暂不支持MAP类型的数据。
  • 字段顺序支持无序(建议字段顺序和表中定义一致)。
  • 输入数据源为Json形式时,注意定义分隔符,并且需要采用内置函数分析JSON_VALUE,否则就会解析失败。报错如下:
    
    
    1. 2017-12-25 15:24:43,467 WARN [Topology-0 (1/1)] com.alibaba.blink.streaming.connectors.common.source.parse.DefaultSourceCollector - Field missing error, table column number: 3, data column number: 3, data filed number: 1, data: [{"lg_order_code":"LP00000005","activity_code":"TEST_CODE1","occur_time":"2017-12-10 00:00:01"}]
  • batchGetSize设置不能超过1000,否则会报错
  • batchGetSize指明的是logGroup获取的数量,如果单条logItem的大小和 batchGetSize都很大,很有可能会导致频繁的GC,这种情况下该参数应注意调小。

类型映射

日志服务和实时计算字段类型对应关系,建议您使用该对应关系进行DDL声明:

日志服务字段类型 流计算字段类型
STRING VARCHAR
本文转自实时计算——创建日志服务(Log Service)源表