且构网

Retrying a Python App Engine pipeline

Updated: 2022-10-15 23:30:11

I have a client pipeline that needs to post a request to an external calculation server and then retrieve the results. The calculation takes some time, so the external server processes it in a task queue. The client pipeline doesn't know when the results will be ready; instead it has to poll the server (get_status(id)) and, if status==Completed, retrieve the results (get_results(id)).

The problem is that the server calculation occasionally fails, in which case I need the client pipeline to retry. I have a pipeline pattern based on this post:

Google AppEngine Pipelines API

which looks as follows:

# Assumed imports (not shown in the original post).
from pipeline import pipeline
from pipeline.common import Delay

# get_status, get_results, start, Error, Completed and CallbackWait are
# defined elsewhere in the client application.


class ReducePipeline(pipeline.Pipeline):
    """Returns the first truthy argument, or None if there is none."""

    def run(self, *args):
        results = [result for result in args if result]
        if not results:
            return None
        return results[0]


class CallbackPipeline(pipeline.Pipeline):
    """Polls the external server until the job completes or fails."""

    def run(self, id):
        status = get_status(id)  # call external server status
        results, future_results = None, None
        if status == Error:
            pass  # failed calculation: both values stay None
        elif status == Completed:
            results = get_results(id)  # get results from external server
        else:
            # Not finished yet: wait, then poll again with a child pipeline.
            with pipeline.InOrder():
                yield Delay(seconds=CallbackWait)  # NB
                future_results = yield CallbackPipeline(id)
        yield ReducePipeline(results, future_results)


class StartPipeline(pipeline.Pipeline):

    def run(self, request):
        id = start(request)  # post request to external server; get job id in return
        yield CallbackPipeline(id)

    # is this really the best way to retry the pipeline ?
    # dev_appserver.py results don't look promising :-(
    def finalized(self):
        if not self.outputs.default:
            raise pipeline.Retry()

but this doesn't seem to work on dev_appserver.py; it simply causes repeated errors at the StartPipeline finalisation stage. I was hoping to get the entire StartPipeline to retry instead.

Can anyone advise me on a sensible pattern for retrying the StartPipeline when the results are None?

Thanks.

You'll likely need a fourth pipeline that interprets, validates and confirms that the results of CallbackPipeline are what you need, and you'll need to raise the Retry from within the run() of that new pipeline.
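
The control flow of this suggestion can be sketched without the Pipelines library. Everything in the sketch is a hypothetical stand-in: `Retry` mimics `pipeline.Retry`, `run_with_retries` mimics a framework that re-runs a step whose `run()` raised `Retry`, and `make_flaky_job` fakes the external server. It only illustrates that the check and the raise live in the validating step's `run()` rather than in `finalized()`; how the real library schedules the retry for a given pipeline layout is not confirmed here.

```python
class Retry(Exception):
    """Hypothetical stand-in for pipeline.Retry."""


def run_with_retries(step, max_attempts=3):
    """Hypothetical stand-in for the framework: re-run `step` when it raises Retry."""
    for attempt in range(1, max_attempts + 1):
        try:
            return step(), attempt
        except Retry:
            continue  # try the same step again
    raise RuntimeError("gave up after %d attempts" % max_attempts)


def make_flaky_job(failures):
    """Fake external calculation: returns None `failures` times, then a result."""
    state = {"calls": 0}

    def job():
        state["calls"] += 1
        return None if state["calls"] <= failures else {"answer": 42}

    return job


def make_validating_step(job):
    """The 'fourth pipeline': interpret the result and raise Retry from run()."""

    def run():
        result = job()
        if result is None:  # the failed-calculation case from the question
            raise Retry("calculation returned no result")
        return result

    return run


result, attempts = run_with_retries(make_validating_step(make_flaky_job(failures=2)))
print(result, attempts)
```

Here the fake job fails twice, so the validating step succeeds on the third run. The `max_attempts` bound is an assumption added so that a job that never succeeds cannot retry forever.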

You could also use recursion in that fourth pipeline instead of raising a Retry: simply keep calling CallbackPipeline until you get the results you are looking for. This would need to be done asynchronously, like Anentropic posted above.
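
The recursive variant can be sketched in plain Python as well. This is an analogy under stated assumptions, not Pipelines API code: `fake_calculation` and `validate_or_recurse` are hypothetical names, and `asyncio` is used only to keep the example runnable. In the real library each recursive step would presumably be a child pipeline yielded from `run()`, as CallbackPipeline already does in the question.

```python
import asyncio


async def fake_calculation(request, outcomes):
    """Hypothetical external job: pops the next canned outcome (None means failure)."""
    await asyncio.sleep(0)  # stands in for the Delay and the polling round trip
    return outcomes.pop(0)


async def validate_or_recurse(request, outcomes, attempts_left=3):
    """Hypothetical 'fourth pipeline': accept a good result, otherwise call itself again."""
    result = await fake_calculation(request, outcomes)
    if result is not None:
        return result
    if attempts_left <= 1:
        raise RuntimeError("no result after repeated attempts")
    # Recurse instead of raising Retry: start the whole check again.
    return await validate_or_recurse(request, outcomes, attempts_left - 1)


final = asyncio.run(validate_or_recurse("job-request", [None, None, "done"]))
print(final)
```

The `attempts_left` counter is an added assumption; without some bound, a calculation that always fails would recurse indefinitely.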