Simple counter example using mapreduce in Google App Engine

Updated: 2022-12-29 13:47:20

I'm somewhat confused about the current state of mapreduce support in GAE. According to the docs ( http://code.google.com/p/appengine-mapreduce/ ) the reduce phase isn't supported yet, but the description of the session from I/O 2011 ( http://www.***.com/watch?v=EIxelKcyCC0 ) says "It is now possible to run full Map Reduce jobs on App Engine". I wonder if I can use mapreduce for this task:

What I want to do:

I have a model Car with a field color:

class Car(db.Model):
    color = db.StringProperty()

I want to run a mapreduce process (from time to time, defined in cron) that computes how many cars there are of each color and stores the result in the datastore. This seems like a job well suited to mapreduce (correct me if I'm wrong): the "map" phase would yield a pair (color_name, 1) for each Car entity, and the "reduce" phase would merge this data by color_name, giving me the expected results. The final result I want is a set of entities with the computed data stored in the datastore, something like this:

class CarsByColor(db.Model):
    color_name = db.StringProperty()
    cars_num = db.IntegerProperty()
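
To make the intended computation concrete, here is a minimal plain-Python sketch of the two phases described above. It is not App Engine code: the dictionaries stand in for Car entities, and the names map_car and reduce_counts are hypothetical, chosen only for illustration.

```python
from collections import defaultdict


def map_car(car):
    """Map phase: emit a (color_name, 1) pair for a car that has a color."""
    color = car.get("color")
    if color:
        yield (color, 1)


def reduce_counts(pairs):
    """Reduce phase: sum the emitted values per color_name."""
    totals = defaultdict(int)
    for color_name, count in pairs:
        totals[color_name] += count
    return dict(totals)


# Hypothetical sample data standing in for Car entities.
cars = [{"color": "red"}, {"color": "blue"}, {"color": "red"}, {"color": None}]

# Run the map phase over every car, then reduce the emitted pairs.
pairs = [pair for car in cars for pair in map_car(car)]
cars_by_color = reduce_counts(pairs)
```

Each key of cars_by_color would correspond to one CarsByColor entity, with the summed value as its cars_num.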

Problem: I don't know how to implement this in App Engine. The video shows examples with defined map and reduce functions, but they seem to be very general examples not related to the datastore. All the other examples I found use a single function to process the data from DatastoreInputReader, but that seems to be only the "map" phase; there is no example of how to do the "reduce" (and how to store the reduce results in the datastore).

Here is the solution I eventually figured out, using mapreduce on GAE (without a reduce phase). If I had started from scratch, I probably would have used the solution provided by Drew Sears.

It works in GAE Python 1.5.0.

In app.yaml I added the handler for mapreduce:

- url: /mapreduce(/.*)?
  script: $PYTHON_LIB/google/appengine/ext/mapreduce/main.py

and the handler for my own mapreduce code (I'm using the URL /mapred_update to gather the results produced by mapreduce):

- url: /mapred_.*
  script: mapred.py

I created mapreduce.yaml for processing Car entities:

mapreduce:
- name: Color_Counter
  params:
  - name: done_callback
    value: /mapred_update
  mapper:
    input_reader: google.appengine.ext.mapreduce.input_readers.DatastoreInputReader
    handler: mapred.process
    params:
    - name: entity_kind
      default: models.Car

Explanation: done_callback is a URL that is called after mapreduce finishes its operations. mapred.process is a function that processes an individual entity and updates the counters (it's defined in the mapred.py file). The Car model is defined in models.py.

mapred.py:

from models import CarsByColor
from google.appengine.ext import db
from google.appengine.ext.mapreduce import operation as op
from google.appengine.ext.mapreduce.model import MapreduceState

from google.appengine.ext import webapp
from google.appengine.ext.webapp.util import run_wsgi_app

def process(entity):
    """Process individual Car"""
    color = entity.color
    if color:
        yield op.counters.Increment('car_color_%s' % color)

class UpdateCounters(webapp.RequestHandler):
    """Create CarsByColor stats models based on the data
    gathered by mapreduce counters"""
    def post(self):
        """Called after the mapreduce operation has finished"""
        # The finished mapreduce job id is passed in the request headers
        job_id = self.request.headers['Mapreduce-Id']
        state = MapreduceState.get_by_job_id(job_id)
        to_put = []
        counters = state.counters_map.counters
        # Remove counter not needed for stats
        del counters['mapper_calls']
        for counter in counters.keys():
            stat = CarsByColor.get_by_key_name(counter)
            if not stat:
                stat = CarsByColor(key_name=counter,
                                name=counter)
            stat.value = counters[counter]
            to_put.append(stat)
        db.put(to_put)

        self.response.headers['Content-Type'] = 'text/plain'
        self.response.out.write('Updated.')


application = webapp.WSGIApplication(
    [('/mapred_update', UpdateCounters)],
    debug=True)

def main():
    run_wsgi_app(application)

if __name__ == "__main__":
    main()

The definition of the CarsByColor model is slightly changed compared to the question: the code above uses name and value instead of color_name and cars_num.
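
One possible refinement, sketched here in plain Python: rather than deleting mapper_calls by name, the callback could keep only the counters that carry the car_color_ prefix built in process(), which also recovers the bare color name. The helper color_counts and the sample counter values are assumptions for illustration, not part of the mapreduce library; the sketch treats the counters as an ordinary dict, as the handler above does.

```python
# Prefix used when building counter names in process().
COUNTER_PREFIX = "car_color_"


def color_counts(counters):
    """Return {color: count} for the car_color_ counters, ignoring all others."""
    # dict() over a generator is used instead of a dict comprehension
    # so the code also runs on older Python 2 runtimes.
    return dict(
        (name[len(COUNTER_PREFIX):], value)
        for name, value in counters.items()
        if name.startswith(COUNTER_PREFIX)
    )


# Hypothetical counter values, shaped like state.counters_map.counters.
sample = {"mapper_calls": 4, "car_color_red": 2, "car_color_blue": 1}
result = color_counts(sample)
```

Filtering by prefix means any other bookkeeping counter the framework might add would be skipped without further changes to the handler.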

You can start the mapreduce job manually from the URL http://yourapp/mapreduce/ and, hopefully, from cron (I haven't tested cron yet).