更新时间:2022-09-30 23:14:02
在实际业务使用中,需要经常实时做一些数据分析,包括实时PV和UV展示,实时销售数据,实时店铺UV以及实时推荐系统等,基于此类需求,Confluent+实时计算Flink版是一个高效的方案。
Confluent是基于Apache Kafka提供的企业级全托管流数据服务,由 Apache Kafka 的原始创建者构建,通过企业级功能扩展了 Kafka 的优势,同时消除了 Kafka管理或监控的负担。
实时计算Flink版是阿里云基于 Apache Flink 构建的企业级实时大数据计算商业产品。实时计算 Flink 由 Apache Flink 创始团队官方出品,拥有全球统一商业化品牌,提供全系列产品矩阵,完全兼容开源 Flink API,并充分基于强大的阿里云平台提供云原生的 Flink 商业增值能力。
Add a topic
按钮,创建一个名为confluent-vvp-test
的topic,将partition设置为3
-- Hologres result table: cumulative consumption amount per user per day.
-- NOTE: amount was FLOAT in the original; the Flink sink table that writes
-- into this table declares amount DOUBLE, so FLOAT would silently lose
-- precision on a monetary value. DOUBLE PRECISION matches the sink exactly.
CREATE TABLE consume (
    appkey     VARCHAR,
    serverid   VARCHAR,
    servertime VARCHAR,
    roleid     VARCHAR,
    amount     DOUBLE PRECISION,  -- cumulative consumption amount
    dt         VARCHAR,           -- day partition, 'yyyy-MM-dd'
    PRIMARY KEY (appkey, dt)
);
-- Kafka source table: raw per-role consumption events in JSON format.
CREATE TEMPORARY TABLE kafka_game_consume_source (
    appkey     STRING,
    servertime STRING,
    consumenum DOUBLE,
    roleid     STRING,
    serverid   STRING
) WITH (
    'connector' = 'kafka',
    'topic' = 'game_consume_log',
    'properties.bootstrap.servers' = 'kafka.confluent.svc.cluster.local.xxx:9071[xxx可以找开发同学查看]',
    'properties.group.id' = 'gamegroup',
    'format' = 'json',
    'properties.ssl.truststore.location' = '/flink/usrlib/truststore.jks',
    'properties.ssl.truststore.password' = '[your truststore password]',
    'properties.security.protocol' = 'SASL_SSL',
    'properties.sasl.mechanism' = 'PLAIN',
    'properties.sasl.jaas.config' = 'org.apache.flink.kafka.shaded.org.apache.kafka.common.security.plain.PlainLoginModule required username="xxx[集群的用户]" password="xxx[相应的密码]";'
);

-- Hologres sink table: cumulative consumption per user per day.
CREATE TEMPORARY TABLE consume (
    appkey     STRING,
    serverid   STRING,
    servertime STRING,
    roleid     STRING,
    amount     DOUBLE,
    dt         STRING,
    PRIMARY KEY (appkey, dt) NOT ENFORCED
) WITH (
    'connector' = 'hologres',
    'dbname' = 'mydb',
    'endpoint' = 'hgprecn-cn-tl32gkaet006-cn-beijing-vpc.hologres.aliyuncs.com:80',
    'password' = '[your appkey secret]',
    'tablename' = 'consume',
    'username' = '[your app key]',
    'mutateType' = 'insertorreplace'
);

-- Sample input events:
--{"appkey":"appkey1","servertime":"2020-09-30 14:10:36","consumenum":33.8,"roleid":"roleid1","serverid":"1"}
--{"appkey":"appkey2","servertime":"2020-09-30 14:11:36","consumenum":30.8,"roleid":"roleid2","serverid":"2"}
--{"appkey":"appkey1","servertime":"2020-09-30 14:13:36","consumenum":31.8,"roleid":"roleid1","serverid":"1"}
--{"appkey":"appkey2","servertime":"2020-09-30 14:20:36","consumenum":33.8,"roleid":"roleid2","serverid":"2"}
--{"appkey":"appkey1","servertime":"2020-09-30 14:30:36","consumenum":73.8,"roleid":"roleid1","serverid":"1"}

-- Cumulative consumption amount per user per calendar day.
INSERT INTO consume
SELECT
    appkey,
    LAST_VALUE(serverid) AS serverid,
    LAST_VALUE(servertime) AS servertime,
    LAST_VALUE(roleid) AS roleid,
    SUM(consumenum) AS amount,
    SUBSTRING(servertime, 1, 10) AS dt  -- day part of 'yyyy-MM-dd HH:mm:ss'
FROM kafka_game_consume_source
GROUP BY appkey, SUBSTRING(servertime, 1, 10)
HAVING SUM(consumenum) > 0;
{"appkey":"appkey1","servertime":"2020-09-30 14:10:36","consumenum":33.8,"roleid":"roleid1","serverid":"1"} {"appkey":"appkey2","servertime":"2020-09-30 14:11:36","consumenum":30.8,"roleid":"roleid2","serverid":"2"} {"appkey":"appkey1","servertime":"2020-09-30 14:13:36","consumenum":31.8,"roleid":"roleid1","serverid":"1"} {"appkey":"appkey2","servertime":"2020-09-30 14:20:36","consumenum":33.8,"roleid":"roleid2","serverid":"2"} {"appkey":"appkey1","servertime":"2020-09-30 14:30:36","consumenum":73.8,"roleid":"roleid1","serverid":"1"}
Add a topic
按钮,创建一个名为pv-uv
的topic,将partition设置为3。3.【vswitch IP段】可在flink的工作空间详情中查询
-- MySQL (RDS) result table: per-minute PV/UV summary.
CREATE TABLE result_cps_total_summary_pvuv_min (
    summary_date date NOT NULL COMMENT '统计日期',
    summary_min  varchar(255) COMMENT '统计分钟',
    pv           bigint COMMENT 'pv',
    uv           bigint COMMENT 'uv',
    currenttime  timestamp COMMENT '当前时间',
    PRIMARY KEY (summary_date, summary_min)
)
1.【VVP控制台】新建文件
-- Kafka source table: raw page-tracking (clickstream) events in JSON format.
CREATE TABLE source_ods_fact_log_track_action (
    account_id            VARCHAR, -- user ID
    client_ip             VARCHAR, -- client IP
    client_info           VARCHAR, -- device model info
    platform              VARCHAR, -- OS version info
    imei                  VARCHAR, -- unique device identifier
    `version`             VARCHAR, -- app version
    `action`              VARCHAR, -- page-navigation description
    gpm                   VARCHAR, -- tracking-point chain
    c_time                VARCHAR, -- request time
    target_type           VARCHAR, -- target type
    target_id             VARCHAR, -- target ID
    udata                 VARCHAR, -- extension info, JSON format
    session_id            VARCHAR, -- session ID
    product_id_chain      VARCHAR, -- product ID chain
    cart_product_id_chain VARCHAR, -- IDs of products added to cart
    tag                   VARCHAR, -- special marker
    `position`            VARCHAR, -- position info
    network               VARCHAR, -- network usage
    p_dt                  VARCHAR, -- day partition
    p_platform            VARCHAR  -- OS version info
) WITH (
    'connector' = 'kafka',
    'topic' = 'game_consume_log',
    'properties.bootstrap.servers' = 'kafka.confluent.svc.cluster.local.c79f69095bc5d4d98b01136fe43e31b93:9071',
    'properties.group.id' = 'gamegroup',
    'format' = 'json',
    'properties.ssl.truststore.location' = '/flink/usrlib/truststore.jks',
    'properties.ssl.truststore.password' = '【your password】',
    'properties.security.protocol' = 'SASL_SSL',
    'properties.sasl.mechanism' = 'PLAIN',
    'properties.sasl.jaas.config' = 'org.apache.flink.kafka.shaded.org.apache.kafka.common.security.plain.PlainLoginModule required username="【your user name】" password="【your password】";'
);

-- Sample event:
--{"account_id":"id1","client_ip":"172.11.1.1","client_info":"mi10","p_dt":"2021-12-01","c_time":"2021-12-01 19:10:00"}

-- RDS (MySQL) sink table: per-minute PV/UV summary.
CREATE TABLE result_cps_total_summary_pvuv_min (
    summary_date date,      -- statistics date
    summary_min  varchar,   -- statistics minute
    pv           bigint,    -- page views
    uv           bigint,    -- unique visitors (each client IP counted once per day)
    currenttime  timestamp, -- current time
    PRIMARY KEY (summary_date, summary_min)
) WITH (
    type = 'rds',
    -- FIX: the original nested the option inside itself
    -- ("url = 'url = 'jdbc:mysql://...','",), which is a syntax error;
    -- the option must be a single quoted JDBC URL.
    url = 'jdbc:mysql://rm-【your rds clusterId】.mysql.rds.aliyuncs.com:3306/confluent_vvp',
    tableName = 'result_cps_total_summary_pvuv_min',
    userName = 'flink_confluent_vip',
    password = '【your rds password】'
);

-- Daily PV/UV aggregation over the day partition.
CREATE VIEW result_cps_total_summary_pvuv_min_01 AS
SELECT
    CAST(p_dt AS date) AS summary_date,       -- day partition
    COUNT(client_ip) AS pv,                   -- hit count per client IP
    COUNT(DISTINCT client_ip) AS uv,          -- deduplicated client IPs
    CAST(MAX(c_time) AS TIMESTAMP) AS c_time  -- latest request time in the day
FROM source_ods_fact_log_track_action
GROUP BY p_dt;

INSERT INTO result_cps_total_summary_pvuv_min
SELECT
    a.summary_date,                                                -- day partition
    CAST(DATE_FORMAT(c_time, 'HH:mm') AS varchar) AS summary_min,  -- hour:minute bucket
    a.pv,
    a.uv,
    CURRENT_TIMESTAMP AS currenttime                               -- current time
FROM result_cps_total_summary_pvuv_min_01 AS a;
{"account_id":"id1","client_ip":"72.11.1.111","client_info":"mi10","p_dt":"2021-12-01","c_time":"2021-12-01 19:11:00"} {"account_id":"id2","client_ip":"72.11.1.112","client_info":"mi10","p_dt":"2021-12-01","c_time":"2021-12-01 19:12:00"} {"account_id":"id3","client_ip":"72.11.1.113","client_info":"mi10","p_dt":"2021-12-01","c_time":"2021-12-01 19:13:00"}
可以看出rds数据表的pv和uv会随着发送的消息数据,动态的变化,同时还可以通过【数据可视化】来查看相应的图表信息。
pv图表展示:
uv图表展示: