且构网

分享程序员开发的那些事...
且构网 - 分享程序员编程开发的那些事

我可以将luigi与Python芹菜一起使用吗

更新时间:2022-06-25 23:56:32

从未尝试过,但我认为应该可以在celery任务中调用luigi任务表单,就像您从python代码中通常使用的方法一样:

Never tried but I think it should be possible to call a luigi task form inside a celery task, the same way you do it from python code in general:

from foobar import MyTask
from luigi import scheduler

task = MyTask(123, 'another parameter value')
sch = scheduler.CentralPlannerScheduler()
w = worker.Worker(scheduler=sch)
w.add(task)
w.run()

关于扩展队列和celery工人:如果您有太多的celery工人调用luigi任务,当然,这将需要您扩展luigi调度程序/守护程序,以便它可以处理API请求的数量(每次调用任务时)要被执行,您每隔N秒点击一次luigi Scheduler API-这取决于您的配置-每当任务以错误或成功完成时,您的任务都会在Scheduler API上说我还活着"调度程序API等).

About scaling your queue and celery workers: if you have too many celery workers calling luigi tasks of course it will require you to scale your luigi scheduler/daemon so it can handle the number of API requests (every time you call a task to be excecuted, you hit the luigi scheduler API, every N seconds -it dependes on your config- your tasks will hit the scheduler API to say "I'm alive", every time a task finished with -error or success- you hit the scheduler API, and so on).

所以是的,请仔细查看您的调度程序,以查看它是否接收到太多的HTTP请求,或者其数据库是否处于瓶颈(luigi默认使用sqlite,但您可以轻松地将其更改为mysql o postgres).

So yes, take a close look at your scheduler to see if it's receiving too many http requests or if its database is being a bottle neck (luigi uses by default an sqlite but you can easily change it to mysql o postgres).

更新:

版本2.7.0 起,luigi.scheduler.CentralPlannerScheduler重命名为luigi.scheduler.Scheduler的名称为您可能会在这里看到,因此上述代码现在应为:

Since version 2.7.0, luigi.scheduler.CentralPlannerScheduler has been renamed to luigi.scheduler.Scheduler as you may see here so the above code should now be:

from foobar import MyTask
from luigi import scheduler

task = MyTask(123, 'another parameter value')
sch = scheduler.Scheduler()
w = worker.Worker(scheduler=sch)
w.add(task)
w.run()