什么是日志服务
日志服务(Log Service),简称LOG,原SLS。是针对实时数据的一站式服务,在阿里巴巴集团经历大量大数据场景锤炼而成。无需开发就能快捷完成数据采集、消费、投递以及查询分析等功能,帮助提升运维、运营效率,建立DT时代海量日志处理能力。
日志服务本身是流数据存储,实时计算 Flink能将其作为流式数据输入。对于日志服务而言,数据格式类似JSON,示例如下。
{
"a": 1000,
"b": 1234,
"c": "li"
}
对于实时计算而言,我们需要定义的DDL如下(sls即Log Service)。
create table sls_stream(
a int,
b int,
c varchar
) with (
type ='sls',
endPoint ='http://cXXXXXXXXyuncs.com',
accessId ='0iXXXXXXXAs',
accessKey ='yF60EXXXXXXXPJ2zhCfHU',
startTime = '2017-07-05 00:00:00',
project ='ali-XXXXX-streamtest',
logStore ='strXXXtest',
consumerGroup ='consXXXXroupTest1'
);
属性字段
目前默认支持三个属性字段的获取,也支持其他自定义写入的字段。
字段名 |
注释说明 |
__source__ |
消息源 |
__topic__ |
消息主题 |
__timestamp__ |
日志时间 |
举例
通过 HEADER
关键字获取属性字段。
测试数据
__topic__: ens_altar_flow
result: {"MsgID":"ems0a","Version":"0.0.1"}
测试代码
CREATE TABLE sls_log (
__topic__ varchar HEADER,
result varchar
)
WITH
(
type ='sls'
);
CREATE TABLE sls_out (
name varchar,
MsgID varchar,
Version varchar
)
WITH
(
type ='RDS'
);
INSERT INTO sls_out
SELECT
__topic__,
JSON_VALUE(result,'$.MsgID'),
JSON_VALUE(result,'$.Version')
FROM
sls_log
测试结果
name(VARCHAT) |
MsgID(VARCHAT) |
Version(VARCHAT) |
ens_altar_flow |
ems0a |
0.0.1 |
WITH参数
参数 |
注释说明 |
备注 |
endPoint |
消费端点信息 |
日志服务的ENDPOINT地址 |
accessId |
sls读取的accessKey |
N/A |
accessKey |
sls读取的密钥 |
N/A |
project |
读取的sls项目 |
N/A |
logStore |
project下的具体的logStore |
N/A |
consumerGroup |
消费组名 |
用户可以自定义消费组名(没有固定格式) |
startTime |
消费日志开始的时间点 |
N/A |
heartBeatIntervalMills |
可选,消费客户端心跳间隔时间 |
默认为10s |
maxRetryTimes |
读取最大尝试次数 |
可选,默认为5 |
batchGetSize |
单次读取logGroup条数 |
可选,默认为10 |
lengthCheck |
单行字段条数检查策略 |
可选,默认为SKIP,其它可选值为EXCEPTION、PAD。SKIP:字段数目不符合时跳过 。EXCEPTION:字段数目不符合时抛出异常。PAD:按顺序填充,不存在的置为null。 |
columnErrorDebug |
是否打开调试开关,如果打开,会把解析异常的log打印出来 |
可选,默认为false |
注意:
- SLS暂不支持MAP类型的数据。
- 字段顺序支持无序(建议字段顺序和表中定义一致)。
- 输入数据源为Json形式时,注意定义分隔符,并且需要采用内置函数分析JSON_VALUE,否则就会解析失败。报错如下:
2017-12-25 15:24:43,467 WARN [Topology-0 (1/1)] com.alibaba.blink.streaming.connectors.common.source.parse.DefaultSourceCollector - Field missing error, table column number: 3, data column number: 3, data filed number: 1, data: [{"lg_order_code":"LP00000005","activity_code":"TEST_CODE1","occur_time":"2017-12-10 00:00:01"}]
- batchGetSize设置不能超过1000,否则会报错
- batchGetSize指明的是logGroup获取的数量,如果单条logItem的大小和 batchGetSize都很大,很有可能会导致频繁的GC,这种情况下该参数应注意调小。
类型映射
日志服务和实时计算字段类型对应关系,建议您使用该对应关系进行DDL声明:
日志服务字段类型 |
流计算字段类型 |
STRING |
VARCHAR |