且构网

How to pause or resume a Celery task?

Updated: 2023-12-05 11:36:52

I'd like to demonstrate a general approach to implementing pausable (and resumable) ongoing Celery tasks through the workflow pattern.

With Celery workflows, you can design your entire operation as a chain of tasks. It doesn't have to be purely a chain, but it should follow the general concept of one task running after another task (or task group) finishes.

Once you have a workflow like that, you can define points to pause at throughout it. At each of these points, you check whether the frontend user has requested the operation to pause and proceed accordingly. The concept is this:

A complex and time-consuming operation O is split into 5 Celery tasks (T1, T2, T3, T4, and T5), and each of these tasks (except the first one) depends on the return value of the previous task.

Let's assume we define a pause point after every single task, so the workflow looks like this:

  • T1 executes
  • T1 completes, check if user has requested pause
    • If user has not requested pause - continue
    • If user has requested pause, serialize the remaining workflow chain and store it somewhere to continue later

... and so on. Since there's a pause point after each task, that check is performed after every one of them (except the last one, of course).
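
The control flow described above can be sketched without Celery at all. This is only a toy model under stated assumptions: `run_with_pause_points`, the lambda steps, and the `pause_requested` callable are hypothetical stand-ins for the real tasks and the real pause check.

```python
from typing import Any, Callable, List, Optional, Tuple

Step = Callable[[Any], Any]


def run_with_pause_points(
    steps: List[Step],
    value: Any,
    pause_requested: Callable[[], bool],
) -> Tuple[Any, Optional[List[Step]]]:
    """Run steps in order, checking for a pause request after each one.

    Returns (value, None) when every step ran, or (value, remaining_steps)
    when a pause was requested before the chain finished.
    """
    for index, step in enumerate(steps):
        value = step(value)
        is_last = index == len(steps) - 1
        # No pause point after the final step: there is nothing left to save.
        if not is_last and pause_requested():
            return value, steps[index + 1:]
    return value, None


# Hypothetical stand-ins for T1..T3: each adds a number to the running value.
steps = [lambda x: x + 1, lambda x: x + 10, lambda x: x + 100]

# Request a pause at the first pause point.
value, remaining = run_with_pause_points(steps, 0, lambda: True)
# Resume later by feeding the saved value into the remaining steps.
final, leftover = run_with_pause_points(remaining, value, lambda: False)
```

The two things that have to survive a pause are the last return value and the remaining steps, which is exactly what the Celery version has to persist.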

But this is only theory. I struggled to find an implementation of this anywhere online, so here's what I came up with:

      from typing import Any, Optional
      
      from celery import shared_task
      from celery.canvas import Signature, chain, signature
      
      @shared_task(bind=True)
      def pause_or_continue(
          self, retval: Optional[Any] = None, clause: dict = None, callback: dict = None
      ):
          # Task to use for deciding whether to pause the operation chain
          if signature(clause)(retval):
              # Pause requested, call given callback with retval and remaining chain
              # chain should be reversed as the order of execution follows from end to start
              signature(callback)(retval, self.request.chain[::-1])
              self.request.chain = None
          else:
              # Continue to the next task in chain
              return retval
      
      
      def tappable(ch: chain, clause: Signature, callback: Signature, nth: int = 1):
          '''
          Make an operation workflow chain pause-able/resume-able by inserting
          the pause_or_continue task after every nth task in the given chain

          ch: chain
              The workflow chain

          clause: Signature
              Signature of a task that takes one argument - return value of
              last executed task in workflow (if any - otherwise `None` is passed)
              - and returns a boolean, indicating whether or not the operation should pause

              Should return True if the operation should be paused,
              False if it should continue normally

          callback: Signature
              Signature of a task that takes 2 arguments - return value of
              last executed task in workflow (if any - otherwise `None` is passed) and
              remaining chain of the operation workflow as a list of json-compatible dicts
              No return value is expected

              This task will be called when `clause` returns `True` (i.e task is pausing)
              The return value and the remaining chain can be handled accordingly by this task

          nth: int
              Check `clause` after every nth task in the chain
              Default value is 1, i.e check `clause` after every task
              Hence, by default, user given `clause` is called and checked
              after every task

          NOTE: The passed in chain is mutated in place
          Returns the mutated chain
          '''
          newch = []
          for n, sig in enumerate(ch.tasks):
              # n tasks precede this one, so insert a pause point after every nth task
              if n != 0 and n % nth == 0:
                  newch.append(pause_or_continue.s(clause=clause, callback=callback))
              newch.append(sig)
          ch.tasks = tuple(newch)
          return ch
      

Explanation - pause_or_continue

Here pause_or_continue is the aforementioned pause point. It's a task that will be called at specific intervals (intervals as in task intervals, not time intervals). This task then calls a user-provided function (actually a task), clause, to check whether the operation should pause or continue.

If the clause function (actually a task) returns True, the user-provided callback function is called with the latest return value (if any - None otherwise) and the remaining chain of tasks. The callback does what it needs to do, and pause_or_continue sets self.request.chain to None, which tells Celery "the task chain is now empty - everything is finished".

If the clause function (actually a task) returns False, the return value from the previous task (if any - None otherwise) is returned so that the next task receives it, and the chain goes on. Hence the workflow continues.

Both clause and callback are called directly, without delay or apply_async. They are executed in the current process, in the current context, so they behave exactly like normal functions. Then why use signatures?

The answer is serialization. You can't conveniently pass a regular function object to a Celery task, but you can pass a task signature. That's exactly what I'm doing here. Both clause and callback should be regular signature objects of Celery tasks.

self.request.chain stores a list of dicts (JSON-compatible, as the Celery task serializer is json by default), each of them representing a task signature. The tasks in this list are executed in reverse order, from the end of the list to the start. That is why the list is reversed before it is passed to the user-provided callback function (actually a task): the user probably expects the order of tasks to be left to right.
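
To make the ordering concrete, here is a small sketch that uses plain dicts in place of real serialized signatures. The dicts are deliberately simplified stand-ins (an assumption for illustration); actual Celery signature dicts carry more keys such as args, kwargs, and options.

```python
import json

# Simplified stand-ins for serialized signatures, in the order the
# request stores them: the task that runs next sits at the END of the list.
request_chain = [{"task": "T5"}, {"task": "T4"}, {"task": "T3"}]

# The next task to run is taken from the end of the list.
next_task = request_chain[-1]["task"]

# Reversing gives the left-to-right order a callback author would expect.
execution_order = [sig["task"] for sig in request_chain[::-1]]

# The list holds only plain dicts, so it survives a JSON round trip unchanged.
restored = json.loads(json.dumps(request_chain[::-1]))
```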

Quick note: irrelevant to this discussion, but if you're using the link parameter from apply_async to construct a chain instead of the chain primitive itself, self.request.callbacks is the property to modify (i.e. set it to None to remove the callbacks and stop the chain) instead of self.request.chain.

tappable is just a basic function that takes a chain (the only workflow primitive covered here, for brevity) and inserts pause_or_continue after every nth task. You can insert them wherever you want, really; it is up to you to define the pause points in your operation. This is just an example!

For each chain object, the actual signatures of the tasks (in order, going from left to right) are stored in the .tasks property. It's a tuple of task signatures. So all we have to do is take this tuple, build a list with the pause points inserted, and convert it back to a tuple to assign to the chain. Then return the modified chain object.
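
The insertion step can be tried in isolation on a list of names. This is a minimal sketch of one way to place a check after every nth task, as the docstring describes; `insert_pause_points` and the `PAUSE` marker are hypothetical names, and strings stand in for the real signatures.

```python
from typing import List

PAUSE = "pause_or_continue"  # placeholder for the real pause-point signature


def insert_pause_points(tasks: List[str], nth: int = 1) -> List[str]:
    """Return a new list with PAUSE inserted after every nth task.

    No pause point is added after the final task.
    """
    result: List[str] = []
    for n, task in enumerate(tasks):
        # n tasks precede this one, so a pause point belongs here
        # whenever n is a positive multiple of nth.
        if n != 0 and n % nth == 0:
            result.append(PAUSE)
        result.append(task)
    return result


every_task = insert_pause_points(["T1", "T2", "T3"])
every_second = insert_pause_points(["T1", "T2", "T3", "T4", "T5"], nth=2)
```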

The clause and callback are also attached to the pause_or_continue signature. Normal Celery stuff.

That covers the primary concept, but to showcase a real project using this pattern (and also the resuming part of a paused task), here's a small demo of all the necessary resources.

This example usage assumes a basic web server with a database. Whenever an operation (i.e. workflow chain) is started, it's assigned an id and stored in the database. The schema of that table looks like this:

      -- Create operations table
      -- Keeps track of operations and the users that started them
      CREATE TABLE operations (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        requester_id INTEGER NOT NULL,
        completion TEXT NOT NULL,
        workflow_store TEXT,
        result TEXT,
        FOREIGN KEY (requester_id) REFERENCES user (id)
      );
      

The only field that needs to be known about right now is completion. It just stores the status of the operation:

  • When the operation starts and a db entry is created, this is set to IN PROGRESS
  • When a user requests pause, the route controller (i.e. view) modifies this to REQUESTING PAUSE
  • When the operation actually gets paused and callback (passed to tappable, called inside pause_or_continue) is called, the callback should modify this to PAUSED
  • When the task is completed, this should be modified to COMPLETED

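
The status values above form a small state machine. The sketch below is one possible way to write it down. The `Completion` enum and the `ALLOWED` table are assumptions derived from the list above and from the resume step shown later, not something the original code defines. In particular, allowing REQUESTING PAUSE to go straight to COMPLETED is a guess for the case where the pause request arrives during the last task.

```python
from enum import Enum


class Completion(str, Enum):
    IN_PROGRESS = "IN PROGRESS"
    REQUESTING_PAUSE = "REQUESTING PAUSE"
    PAUSED = "PAUSED"
    COMPLETED = "COMPLETED"


# Assumed transition table (hypothetical, for illustration only).
ALLOWED = {
    Completion.IN_PROGRESS: {Completion.REQUESTING_PAUSE, Completion.COMPLETED},
    Completion.REQUESTING_PAUSE: {Completion.PAUSED, Completion.COMPLETED},
    Completion.PAUSED: {Completion.IN_PROGRESS},
    Completion.COMPLETED: set(),
}


def can_transition(current: Completion, target: Completion) -> bool:
    """Return True if moving from `current` to `target` is allowed."""
    return target in ALLOWED[current]
```
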
      @celery.task()
      def should_pause(_, operation_id: int):
          # This is the `clause` to be used for `tappable`
          # i.e it lets celery know whether to pause or continue
          db = get_db()
      
          # Check the database to see if user has requested pause on the operation
          operation = db.execute(
              "SELECT * FROM operations WHERE id = ?", (operation_id,)
          ).fetchone()
          return operation["completion"] == "REQUESTING PAUSE"
      

This is the task to call at the pause points to determine whether or not to pause. It's a function that takes 2 parameters... well, sort of. The first one is mandatory: tappable requires the clause to accept exactly one argument, so that the previous task's return value can be passed to it (even if that return value is None). In this example the return value isn't needed, so we just ignore it.

The second parameter is an operation id. All this clause does is check the database for the operation (the workflow) entry and see whether it has the status REQUESTING PAUSE. To do that, it needs to know the operation id. But clause should be a task with one argument, so what gives?

Well, good thing signatures can be partial. When the operation is first started and a tappable chain is created, the operation id is known, so we can do should_pause.s(operation_id) to get the signature of a task that takes one remaining parameter: the return value of the previous task. That qualifies as a clause!
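
The reason the return value comes first in should_pause's parameter list is that arguments supplied when a partial signature is called are placed before the ones stored in it. The toy class below imitates only that one behavior. `PartialSignature` and `should_pause_demo` are hypothetical names for illustration and are not part of Celery.

```python
from typing import Any, Callable, Tuple


class PartialSignature:
    """Toy model of a partial signature: call-time args go first."""

    def __init__(self, func: Callable[..., Any], *args: Any) -> None:
        self.func = func
        self.args: Tuple[Any, ...] = args

    def __call__(self, *call_args: Any) -> Any:
        # Arguments supplied at call time are placed before the stored ones.
        return self.func(*call_args, *self.args)


def should_pause_demo(_retval: Any, operation_id: int) -> bool:
    # Hypothetical rule for illustration: only operation 7 wants to pause.
    return operation_id == 7


clause = PartialSignature(should_pause_demo, 7)  # operation id fixed up front
wants_pause = clause("result of previous task")  # retval supplied later
```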

      import os
      import json
      from typing import Any, List
      
      @celery.task()
      def save_state(retval: Any, chains: List[dict], operation_id: int):
          # This is the `callback` to be used for `tappable`
          # i.e this is called when an operation is pausing
          db = get_db()
      
          # Prepare directories to store the workflow
          operation_dir = os.path.join(app.config["OPERATIONS"], f"{operation_id}")
          workflow_file = os.path.join(operation_dir, "workflow.json")
          os.makedirs(operation_dir, exist_ok=True)
          
          # Store the remaining workflow chain, serialized into json
          with open(workflow_file, "w") as f:
              json.dump(chains, f)
      
          # Store the result from the last task and the workflow json path
          db.execute(
              """
              UPDATE operations
              SET completion = ?,
                  workflow_store = ?,
                  result = ?
              WHERE id = ?
              """,
              ("PAUSED", workflow_file, f"{retval}", operation_id),
          )
          db.commit()
      

And here's the task to be called when the operation is being paused. Remember, it should take the last executed task's return value and the remaining list of signatures (in order, from left to right). Once again there's an extra parameter, operation_id. The explanation for it is the same as the one for clause.

This function stores the remaining chain in a JSON file (since it's a list of dicts). Remember, you can use a different serializer; I'm using JSON since it's the default task serializer used by Celery.

After storing the remaining chain, it updates the completion status to PAUSED and also records the path to the JSON file in the db.

Now, let's see these in action:

      def start_operation(user_id, *operation_args, **operation_kwargs):
          db = get_db()
          operation_id: int = db.execute(
              "INSERT INTO operations (requester_id, completion) VALUES (?, ?)",
              (user_id, "IN PROGRESS"),
          ).lastrowid
          # Convert a regular workflow chain to a tappable one
          tappable_workflow = tappable(
              (T1.s() | T2.s() | T3.s() | T4.s() | T5.s(operation_id)),
              should_pause.s(operation_id),
              save_state.s(operation_id),
          )
          # Commit first so the workers can already see the new operation row
          db.commit()
          # Start the chain (i.e send task to celery to run asynchronously)
          tappable_workflow(*operation_args, **operation_kwargs)
          return operation_id
      

This is a function that takes in a user id and starts an operation workflow. It is more or less an impractical dummy function modeled around a view/route controller, but I think it gets the general idea across.

Assume T[1-4] are all unit tasks of the operation, each taking the previous task's return value as an argument. This is just an example of a regular Celery chain; feel free to go wild with your chains.

T5 is a task that saves the final result (the result from T4) to the database. So along with the return value from T4, it needs the operation_id, which is passed into the signature.

      def pause(operation_id):
          db = get_db()
      
          operation = db.execute(
              "SELECT * FROM operations WHERE id = ?", (operation_id,)
          ).fetchone()
      
          if operation and operation["completion"] == "IN PROGRESS":
              # Pause only if the operation is in progress
              db.execute(
                  """
                  UPDATE operations
                  SET completion = ?
                  WHERE id = ?
                  """,
                  ("REQUESTING PAUSE", operation_id),
              )
              db.commit()
              return 'success'
      
          return 'invalid id'
      

This employs the previously mentioned concept of modifying the db entry to change completion to REQUESTING PAUSE. Once this is committed, the next time pause_or_continue calls should_pause, it will see that the user has requested the operation to pause and will pause accordingly.
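
The handshake between the pause request and the clause can be exercised on its own with an in-memory SQLite database. This is a rough sketch, assuming a trimmed-down version of the operations table. The `pause_requested` helper is a hypothetical stand-in for should_pause, and the conditional UPDATE is just one possible way to fold the "only if in progress" check into a single statement.

```python
import sqlite3

db = sqlite3.connect(":memory:")
db.row_factory = sqlite3.Row  # allow access to columns by name
db.execute(
    "CREATE TABLE operations ("
    "id INTEGER PRIMARY KEY AUTOINCREMENT, "
    "requester_id INTEGER NOT NULL, "
    "completion TEXT NOT NULL)"
)
operation_id = db.execute(
    "INSERT INTO operations (requester_id, completion) VALUES (?, ?)",
    (1, "IN PROGRESS"),
).lastrowid
db.commit()


def pause_requested(op_id: int) -> bool:
    """Stand-in for the clause: has a pause been requested for this id?"""
    row = db.execute(
        "SELECT completion FROM operations WHERE id = ?", (op_id,)
    ).fetchone()
    return row is not None and row["completion"] == "REQUESTING PAUSE"


before = pause_requested(operation_id)

# Request the pause, but only if the operation is still in progress.
db.execute(
    "UPDATE operations SET completion = ? WHERE id = ? AND completion = ?",
    ("REQUESTING PAUSE", operation_id, "IN PROGRESS"),
)
db.commit()

after = pause_requested(operation_id)
```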

      def resume(operation_id):
          db = get_db()
      
          operation = db.execute(
              "SELECT * FROM operations WHERE id = ?", (operation_id,)
          ).fetchone()
      
          if operation and operation["completion"] == "PAUSED":
              # Resume only if the operation is paused
              with open(operation["workflow_store"]) as f:
                  # Load the remaining workflow from the json
                  workflow_json = json.load(f)
              # Load the chain from the json (i.e deserialize)
              workflow_chain = chain(signature(x) for x in workflow_json)
              # Start the chain and feed in the last executed task result
              workflow_chain(operation["result"])
      
              db.execute(
                  """
                  UPDATE operations
                  SET completion = ?
                  WHERE id = ?
                  """,
                  ("IN PROGRESS", operation_id),
              )
              db.commit()
              return 'success'
      
          return 'invalid id'
      

Recall that when the operation is paused, the remaining workflow is stored in a JSON file. Since we are currently restricting the workflow to a chain object, we know this JSON is a list of signatures that should be turned into a chain. So we deserialize it accordingly and send it to the Celery worker.

Note that this remaining workflow still contains the pause_or_continue tasks as they were originally, so the workflow itself is once again pausable and resumable. When it pauses again, workflow.json is simply overwritten.