
How to get the average from a hashmap in MongoDB?

Updated: 2023-11-14 15:10:28

You could take two approaches here:

  1. Change the schema and use the aggregation framework to get the average with the $avg operator, or
  2. Apply Map-Reduce.

Let's look at the first option. As it stands, the schema cannot be used with the aggregation framework because of the dynamic keys in the values subdocument. A schema that suits the aggregation framework would have the values field be an array of embedded key/value documents, like this:

/* 0 */
{
    "_id" : ObjectId("5559d66c9bbec0dd0344e4b0"),
    "timestamp" : "2015-05-16T18:12:00.000Z",
    "values" : [ 
        {
            "k" : "0",
            "v" : 26.17
        }, 
        {
            "k" : "1",
            "v" : 26.17
        }, 
        {
            "k" : "2",
            "v" : 26.17
        },
        ...         
        {
            "k" : "58",
            "v" : 24.71
        }, 
        {
            "k" : "59",
            "v" : 25.20
        }
    ]
}

With MongoDB 3.6 and newer, use the aggregation framework to transform the hashmap into an array with the $objectToArray operator, then use $avg to calculate the average.

Consider running the following aggregate pipeline:

db.test.aggregate([
    {
        "$addFields": {
            "values": { "$objectToArray": "$values" }
        }
    }   
])
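
To see what this stage produces, here is a minimal plain-JavaScript sketch of the same transformation that runs outside the database. The helper name objectToArray and the sample document are assumptions made for illustration; the real $objectToArray operator runs server-side and yields the same { k, v } shape.

```javascript
// Hypothetical helper: mimics the { k, v } shape produced by $objectToArray.
function objectToArray(obj) {
    return Object.keys(obj).map(function (k) {
        return { k: k, v: obj[k] };
    });
}

// Sample document with dynamic keys (values invented for illustration).
var doc = { values: { "0": "26.17", "1": "26.17", "2": "24.71" } };
var asArray = objectToArray(doc.values);

console.log(JSON.stringify(asArray));
```

Each dynamic key becomes a k field and its value a v field, which is why the array form can be addressed with a fixed path.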

Armed with this new schema, you would then need to update your collection, converting the string values to numbers (and renaming k and v to key and value), by iterating the cursor returned from the aggregate method and using bulkWrite as follows:

var bulkUpdateOps = [],
    cursor = db.test.aggregate([
        {
            "$addFields": {
                "values": { "$objectToArray": "$values" }
            }
        }   
    ]);

cursor.forEach(doc => {
    const { _id, values } = doc;
    let temp = values.map(item => {
        item.key = item.k;
        item.value = parseFloat(item.v) || 0;
        delete item.k;
        delete item.v;
        return item;
    });

    bulkUpdateOps.push({
        "updateOne": {
           "filter": { _id },
           "update": { "$set": { values: temp } },
           "upsert": true
        }
    });

    if (bulkUpdateOps.length === 1000) {
        db.test.bulkWrite(bulkUpdateOps);  
        bulkUpdateOps = [];                 
    }
}); 

if (bulkUpdateOps.length > 0) {
    db.test.bulkWrite(bulkUpdateOps);
}
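
The batching pattern used here does not depend on MongoDB, so it can be sketched and checked in plain JavaScript. In this sketch the helper flushInBatches and the recording callback are hypothetical stand-ins; the callback plays the role of db.test.bulkWrite.

```javascript
// Hypothetical helper: calls write(batch) for every full batch and once
// more for the remainder, mirroring the bulkWrite pattern above.
function flushInBatches(ops, batchSize, write) {
    var batch = [];
    ops.forEach(function (op) {
        batch.push(op);
        if (batch.length === batchSize) {
            write(batch);
            batch = [];
        }
    });
    if (batch.length > 0) {
        write(batch);
    }
}

// Build 2500 dummy update operations (contents invented for illustration).
var ops = [];
for (var i = 0; i < 2500; i++) {
    ops.push({ updateOne: { filter: { _id: i }, update: { $set: { n: i } } } });
}

// Stand-in for db.test.bulkWrite: records the size of each batch.
var batchSizes = [];
flushInBatches(ops, 1000, function (batch) {
    batchSizes.push(batch.length);
});

console.log(batchSizes); // [ 1000, 1000, 500 ]
```

Flushing every 1000 operations keeps the pending array small, and the final check sends whatever is left over.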

If your MongoDB version does not support the $objectToArray operator in the aggregation framework, converting the current schema into the one above takes a few native JavaScript functions together with the forEach() method of the find() cursor, as follows (assuming you have a test collection):

var bulkUpdateOps = [],
    cursor = db.test.find();

cursor.forEach(doc => {
    const { _id, values } = doc;
    let temp =  Object.keys(values).map(k => {
        let obj = {};
        obj.key = k;
        obj.value = parseFloat(doc.values[k]) || 0;
        return obj;
    });

    bulkUpdateOps.push({
        "updateOne": {
           "filter": { _id },
           "update": { "$set": { values: temp } },
           "upsert": true
        }
    });

    if (bulkUpdateOps.length === 1000) {
        db.test.bulkWrite(bulkUpdateOps);  
        bulkUpdateOps = [];                 
    }
}); 

if (bulkUpdateOps.length > 0) {
    db.test.bulkWrite(bulkUpdateOps);
}

// Alternative for older shells without bulkWrite(): rewrite each document with save()
db.test.find().forEach(function (doc){
     var keys = Object.keys(doc.values),
        values = keys.map(function(k){
            var obj = {};
            obj.key = k;
            obj.value = parseFloat(doc.values[k]) || 0;
            return obj;
        });
    doc.values = values;
    db.test.save(doc);    
});

The collection will now have the above schema, so the following aggregation pipeline gives you the average value for each one-minute document:

db.test.aggregate([
    {
        "$addFields": {
            "average": { "$avg": "$values.value" }
        }
    }    
])

Or, for MongoDB 3.0 and lower:

db.test.aggregate([
    { "$unwind": "$values" },
    {
        "$group": {
            "_id": "$timestamp",
            "average": {
                "$avg": "$values.value"
            }
        }
    }    
])

For the above document, the output would be:

/* 0 */
{
    "result" : [ 
        {
            "_id" : "2015-05-16T18:12:00.000Z",
            "average" : 25.684
        }
    ],
    "ok" : 1
}
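
As a quick sanity check, the reported average matches the mean of the five values visible in the sample document. This sketch assumes those five are the only values present (the elided entries are ignored) and recomputes the figure in plain JavaScript.

```javascript
// The five values visible in the sample document; elided entries are ignored.
var shown = [26.17, 26.17, 26.17, 24.71, 25.20];

var total = shown.reduce(function (sum, v) {
    return sum + v;
}, 0);
var average = total / shown.length;

console.log(average.toFixed(3)); // 25.684
```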


As for the other Map-Reduce option, the intuition behind the operation is you would use JavaScript to make the necessary transformations and calculate the final average. You would need to define three functions:

Map

When you tell Mongo to MapReduce, the function you provide as the map function receives each document as this. The purpose of the map is to run whatever logic you need in JavaScript and then call emit zero or more times to produce a reducible value.

var map = function(){
    var obj = this.values;
    // Capture the timestamp here: inside the forEach callback,
    // this no longer refers to the document.
    var timestamp = this.timestamp;
    Object.keys(obj).forEach(function(key){
        var val = parseFloat(obj[key]);
        // Same shape as the object returned by reduce: { count, total }
        emit(timestamp, { count: 1, total: val });
    });
};

For each document you need to emit a key and a value. The key is the first parameter to the emit function and represents how you want to group the values (in this case you will be grouping by the timestamp). The second parameter to emit is the value, which in this case is a small object containing a count (always 1) and a total, i.e. the reading for one second within the minute.

Reduce

Next you need to define the reduce function. Mongo groups the items you emit and passes them as an array to this reduce function. Inside the reduce function you do the aggregation calculations and reduce all the objects to a single object.

var reduce = function(key, values) {
    var result = { count: 0, total: 0 };
    values.forEach(function(value){
        result.count += value.count;
        result.total += value.total;
    });

    return result;
};

This reduce function returns a single result. It's important for the return value to have the same shape as the emitted values. It's also possible for MongoDB to call the reduce function multiple times for a given key and ask you to process a partial set of values, so if you need to perform some final calculation, you can also give MapReduce a finalize function.
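
A minimal plain-JavaScript sketch of why the shapes must match: when the reduce output has the same fields as the emitted values, reducing partial results gives the same answer as reducing everything at once. The name reduceTotals, the key "k" and the sample values are assumptions made for illustration; nothing here talks to MongoDB.

```javascript
// Emitted values and the reduce output share one shape: { count, total }.
function reduceTotals(key, values) {
    var result = { count: 0, total: 0 };
    values.forEach(function (value) {
        result.count += value.count;
        result.total += value.total;
    });
    return result;
}

// Four emitted values (invented for illustration).
var emitted = [1, 2, 3, 4].map(function (n) {
    return { count: 1, total: n };
});

// One pass over everything...
var onePass = reduceTotals("k", emitted);

// ...versus reducing two partial results again (a re-reduce).
var partialA = reduceTotals("k", emitted.slice(0, 2));
var partialB = reduceTotals("k", emitted.slice(2));
var twoPass = reduceTotals("k", [partialA, partialB]);

console.log(JSON.stringify(onePass)); // {"count":4,"total":10}
console.log(JSON.stringify(twoPass)); // {"count":4,"total":10}
```

If the emitted objects used a different field name from the one reduce returns, the second pass would read an undefined field and the total would become NaN.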

Finalize

The finalize function is optional, but if you need to calculate something based on a fully reduced set of data, you'll want to use a finalize function. Mongo will call the finalize function after all the reduce calls for a set are complete. This is the place to calculate the average of all the per-second values in a document/timestamp:

var finalize = function (key, value) {
    value.average = value.total / value.count;
    return value;
};

Putting it together

With the JavaScript in place, all that is left is to tell MongoDB to execute a MapReduce:

var map = function(){
    var obj = this.values;
    // Capture the timestamp here: inside the forEach callback,
    // this no longer refers to the document.
    var timestamp = this.timestamp;
    Object.keys(obj).forEach(function(key){
        var val = parseFloat(obj[key]);
        // Same shape as the object returned by reduce: { count, total }
        emit(timestamp, { count: 1, total: val });
    });
};

var reduce = function(key, values) {
    var result = { count: 0, total: 0 };
    values.forEach(function(value){
        result.count += value.count;
        result.total += value.total;
    });

    return result;
};

var finalize = function (key, value) {
    value.average = value.total / value.count;
    return value;
};

db.test.mapReduce(
    map,
    reduce,
    {
        out: { merge: "map_reduce_example" },
        finalize: finalize
    }
)

And when you query the output collection map_reduce_example with db.map_reduce_example.find(), you get the result:

/* 0 */
{
    "_id" : "2015-05-16T18:12:00.000Z",
    "value" : {
        "count" : 5,
        "total" : 128.42,
        "average" : 25.684
    }
}
