且构网

分享程序员开发的那些事...
且构网 - 分享程序员编程开发的那些事

使用ClickHouse实现,累计用户计算模型

更新时间:2022-08-13 17:15:17

问题描述:根据用户标识和历史库的匹配结果,识别是否是新增用户,单位:天

要求:历史库每天累加更新,要考录用户历史数据库的幂等性及回补数据策略

输出:

  • 用户pushid
  • pushid对应的uid(如果当天没有没有登录就没有对应的pushid则从历史库中匹配)
  • pushid当天和uid是否有对应关系
  • 用户新增时间
  • 用户历史所有投资次数
  • 当天用户投资次数
  • 用户每次投资时间(rechargeTime)

 

说明:

用户标识有两个 pushid、uid,pushid表示用户的注册id,登录的时候才会存在,uid是用户访问的cookie(会频繁变化)。

因此在业务中要关联两者之间的关系。

 

创建历史库:

CREATE TABLE IF NOT EXISTS `db_name`.`table_name` (
    partition Date DEFAULT '1970-01-01',
    pushid String DEFAULT '',
    opTime DateTime DEFAULT 0,
    rechargeTime DateTime DEFAULT 0, # rechargeTime如果不是默认值则表示用户发生投资时间
    appkey String DEFAULT '',
    uid String DEFAULT '',
    ver UInt64 DEFAULT 0
)
ENGINE = ReplacingMergeTree(partition, (pushid, rechargeTime), 8192, ver)

利用ReplacingMergeTree实现数据幂等性,当重复入库数据时会去除重复项,保证数据执行多次时数据不重复。

ver表示版本号,当数据重复时,会以最大的版本号为准,版本号可以是一个递增的数字,业务中数据的版本号是插入的时的时间戳。

*其中:(pushid, rechargeTime)中的rechargeTime表示用户复投时间,如果业务中没有对用户发生某一个行为特殊要求则可以删除。

 

历史库更新代码(每天更新):

INSERT INTO db_name.table_name SELECT 
    partition, 
    pushid, 
    opTime, 
    rechargeTime, 
    appkey, 
    uid, 
    ver
FROM 
(
    SELECT 
        partition, 
        jhd_pushid AS pushid, 
        jhd_opTime AS opTime, 
        jhd_opTime AS rechargeTime, 
        jhd_datatype AS appkey, 
        jhd_userkey AS uid, 
        CAST(835664 AS UInt64) AS ver
    FROM ncf_h5.userevent 
    WHERE (partition = toDate('2017-03-28')) AND (jhd_pushid != '') AND (jhd_opType = 'page') AND (visitParamExtractString(jhd_map, 'uri') LIKE '%/pay_result%')
    UNION ALL 
    SELECT 
        partition, 
        jhd_pushid AS pushid, 
        min(jhd_opTime) AS opTime, 
        toDateTime('1970-01-01 00:00:00') AS rechargeTime, 
        jhd_datatype AS appkey, 
        jhd_userkey AS uid, 
        CAST(835664 AS UInt64) AS ver
    FROM ncf_h5.userevent 
    WHERE (partition = toDate('2017-03-28')) AND (jhd_pushid != '')
    GROUP BY 
        jhd_datatype, 
        partition, 
        pushid, 
        jhd_userkey
    UNION ALL 
    SELECT 
        partition, 
        jhd_pushid AS pushid, 
        jhd_opTime AS opTime, 
        jhd_opTime AS rechargeTime, 
        jhd_datatype AS appkey, 
        jhd_userkey AS uid, 
        CAST(835664 AS UInt64) AS ver
    FROM ncf_ws.userevent 
    WHERE (partition = toDate('2017-03-28')) AND (jhd_pushid != '') AND (jhd_opType = 'page') AND (visitParamExtractString(jhd_map, 'uri') LIKE '%/success%')
    UNION ALL 
    SELECT 
        partition, 
        jhd_pushid AS pushid, 
        min(jhd_opTime) AS opTime, 
        toDateTime('1970-01-01 00:00:00') AS rechargeTime, 
        jhd_datatype AS appkey, 
        jhd_userkey AS uid, 
        CAST(835664 AS UInt64) AS ver
    FROM ncf_ws.userevent 
    WHERE (partition = toDate('2017-03-28')) AND (jhd_pushid != '')
    GROUP BY 
        jhd_datatype, 
        partition, 
        pushid, 
        jhd_userkey
)

数据导出代码:

数据格式:

pushid、是否当天登录、uid、新增时间、用户历史所有投资次数、当天用户投资次数、用户每次投资时间

 

SELECT 
    pushid, 
    1 AS isfind, 
    uids, 
    earliest, 
    recharge_n, 
    recharge_today, 
    recharge_arr
FROM 
(
    SELECT 
        pushid, 
        CAST(earliest AS String) AS earliest, 
        recharge_n, 
        recharge_today, 
        arrayMap(lambda(tuple(x), CAST(x AS String)), arrayFilter(lambda(tuple(x), x != '1970-01-01 00:00:00'), recharge_arr)) AS recharge_arr, 
        arrayFilter(lambda(tuple(x), x != ''), uids) AS uids
    FROM 
    (
        SELECT pushid, groupUniqArray(uid) AS uids
        FROM ncf_common.user_pushid 
        WHERE partition = toDate('2017-04-04')
        GROUP BY pushid
    ) 
    ANY LEFT JOIN 
    (
        SELECT 
            pushid, 
            min(opTime) AS earliest, 
            sumIf(1, rechargeTime != '1970-01-01 00:00:00') AS recharge_n, 
            sumIf(1, toDate(rechargeTime) = toDate('2017-04-04')) AS recharge_today, 
            groupArray(rechargeTime) AS recharge_arr
        FROM ncf_common.user_pushid 
        WHERE (partition <= toDate('2017-04-04')) AND (partition >= (toDate('2017-04-04') - 365))
        GROUP BY pushid
    ) USING (pushid)
) 
ARRAY JOIN uids
UNION ALL 
SELECT 
    pushid, 
    0 AS isfind, 
    uids, 
    CAST(earliest AS String) AS earliest, 
    recharge_n, 
    recharge_today, 
    arrayMap(lambda(tuple(x), CAST(x AS String)), arrayFilter(lambda(tuple(x), x != '1970-01-01 00:00:00'), recharge_arr)) AS recharge_arr
FROM 
(
    SELECT 
        pushid, 
        groupUniqArray(uid) AS uids, 
        min(opTime) AS earliest, 
        sumIf(1, rechargeTime != '1970-01-01 00:00:00') AS recharge_n, 
        sumIf(1, toDate(rechargeTime) = toDate('2017-04-04')) AS recharge_today, 
        arrayFilter(lambda(tuple(x), x != '1970-01-01 00:00:00'), groupArray(rechargeTime)) AS recharge_arr
    FROM 
    (
        SELECT 
            pushid, 
            uid, 
            opTime, 
            rechargeTime
        FROM 
        (
            SELECT 
                jhd_userkey AS uid, 
                groupUniqArray(jhd_pushid) AS pushids, 
                'ncf_ws' AS appkey
            FROM ncf_ws.userevent 
            WHERE partition = toDate('2017-04-04')
            GROUP BY jhd_userkey
            HAVING (length(pushids) = 1) AND has(pushids, '')
            UNION ALL 
            SELECT 
                jhd_userkey AS uid, 
                groupUniqArray(jhd_pushid) AS pushids, 
                'ncf_h5' AS appkey
            FROM ncf_h5.userevent 
            WHERE partition = toDate('2017-04-04')
            GROUP BY jhd_userkey
            HAVING (length(pushids) = 1) AND has(pushids, '')
        ) 
        ALL INNER JOIN 
        (
            SELECT 
                pushid, 
                uid, 
                opTime, 
                rechargeTime, 
                appkey
            FROM ncf_common.user_pushid 
            WHERE partition < toDate('2017-04-04')
        ) USING (uid)
    ) 
    GROUP BY pushid
) 
ARRAY JOIN uids