
Getting the count of users with a specific number of purchases in ElasticSearch

Updated: 2023-12-01 11:57:40

Here is how I have managed to accomplish what I need. Let's say we have an index called visitor_carts with documents like these:

{
    "visitor_id" : 1,
    "total_value" : 111,
    "total_products" : 2
}

{
    "visitor_id" : 1,
    "total_value" : 199.99,
    "total_products" : 1
}

{
    "visitor_id" : 1,
    "total_value" : 890.56,
    "total_products" : 2
}

{
    "visitor_id" : 2,
    "total_value" : 223.56,
    "total_products" : 2
}

{
    "visitor_id" : 3,
    "total_value" : 4.56,
    "total_products" : 2
}

There is a solution: it is called scripted metric aggregation. With it you can build pretty much anything you want; the downside is that you have to familiarize yourself with Painless scripting. The documentation in that regard is quite hard to understand, and on top of that specific versions do not seem well maintained: what is in the Painless documentation does not work with my version of ElasticSearch, 6.5 (even though, according to said documentation, it should). So a word of warning: if something does not work, keep looking for more examples. I have found the usage examples here very useful. Anyway, here is the working solution:

POST visitor_carts/_search
{
  "query" : {
    "match_all" : {}
  },
  "aggs": {
    "purchases": {
      "scripted_metric": {
        "init_script" : "state['visitorPurchases'] = [:]",
        "map_script" : "if (state['visitorPurchases'].containsKey(doc['visitor_id'].value)) {state['visitorPurchases'][doc['visitor_id'].value]++} else {state['visitorPurchases'][doc['visitor_id'].value] = 1}",
        "combine_script": "def combine = [:]; for (visitor in state['visitorPurchases'].entrySet()) {if (combine.containsKey(visitor.getValue().toString())) {combine[visitor.getValue().toString()]++} else {combine[visitor.getValue().toString()] = 1}} return combine",
        "reduce_script": "def reduce = [:]; for (shard in states) { for (count in shard.entrySet()) {if (reduce.containsKey(count.getKey())) {reduce[count.getKey()] += count.getValue()} else {reduce[count.getKey()] = count.getValue()}}} return reduce"
      }
    }
  }
}

In map_script it looks through all the documents matched by the query and counts occurrences of every visitor_id. Then in combine_script it takes what map_script prepared and groups the result by occurrence count. Since combine_script runs per shard, we need reduce_script to bring together the result sets from every shard and return them in a response like this:
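As a sanity check, the three Painless scripts above can be mirrored in plain Python. This is only a sketch of the same map/combine/reduce logic, not the author's code; the documents are the five sample carts from the beginning of the answer, treated as a single shard:

```python
from collections import defaultdict

# The sample documents (only visitor_id matters for this aggregation).
docs = [
    {"visitor_id": 1}, {"visitor_id": 1}, {"visitor_id": 1},
    {"visitor_id": 2}, {"visitor_id": 3},
]

# map_script: count carts per visitor (one state per shard; single shard here).
state = defaultdict(int)
for doc in docs:
    state[doc["visitor_id"]] += 1

# combine_script: group visitors by how many purchases each made.
combine = defaultdict(int)
for visitor_id, purchases in state.items():
    combine[str(purchases)] += 1

# reduce_script: merge the per-shard maps (only one shard in this example).
states = [combine]
reduce_result = defaultdict(int)
for shard in states:
    for count, visitors in shard.items():
        reduce_result[count] += visitors

print(dict(reduce_result))  # {'3': 1, '1': 2}
```

This reproduces the aggregation value shown in the response below: two visitors with one purchase each, and one visitor with three.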

{
  "took" : 1,
  "timed_out" : false,
  "_shards" : {
    "total" : 1,
    "successful" : 1,
    "skipped" : 0,
    "failed" : 0
  },
  "hits" : {
    "total" : 5,
    "max_score" : 0.0,
    "hits" : [ ]
  },
  "aggregations" : {
    "purchases" : {
      "value" : {
        "1" : 2,
        "3" : 1
      }
    }
  }
}

Refer to the scripted metric aggregation documentation to find out what each of the script types does, and then follow the usage examples here to construct what you need.
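Once the search response comes back, the aggregation result is a plain map under `aggregations.purchases.value`. A small sketch of reading it from the parsed JSON (the `response` dict here is hard-coded from the example response above; in practice it would come from your ElasticSearch client):

```python
# Parsed JSON of the search response shown above (abbreviated to the
# aggregation part; assume it was obtained from an ElasticSearch client).
response = {
    "aggregations": {"purchases": {"value": {"1": 2, "3": 1}}}
}

# Keys are purchase counts (as strings), values are how many visitors
# made that many purchases.
buckets = response["aggregations"]["purchases"]["value"]
for purchases, visitors in sorted(buckets.items(), key=lambda kv: int(kv[0])):
    print(f"{visitors} visitor(s) made {purchases} purchase(s)")
```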

I am too new to ElasticSearch to say how efficient this solution is. It works well against the few thousand documents I have tested it with, but I have no idea how it will behave with millions or billions of records. If anyone cares to test this, be my guest :)