
How to run Luigi tasks with spark-submit and pyspark

Updated: 2022-05-19 19:03:08


Luigi provides some template tasks. One of them is called PySparkTask. You can inherit from this class and override its properties:

https://github.com/spotify/luigi/blob/master/luigi/contrib/spark.py.


I haven't tested it, but based on my experience with luigi I would try this:

import luigi
from luigi.contrib.spark import PySparkTask

import my_module


class MyPySparkTask(PySparkTask):
    date = luigi.DateParameter()

    @property
    def name(self):
        return self.__class__.__name__

    @property
    def master(self):
        return 'mesos://host:port'

    @property
    def deploy_mode(self):
        return 'cluster'

    @property
    def total_executor_cores(self):
        return 1

    @property
    def driver_cores(self):
        return 1

    @property
    def executor_memory(self):
        return '1G'

    @property
    def driver_memory(self):
        return '1G'

    def main(self, sc, *args):
        my_module.run(sc)

    def app_options(self):
        return [self.date]
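Here, my_module.run is assumed to be your own job entry point; it just needs to accept the SparkContext that PySparkTask hands to main. A minimal sketch (the body is a placeholder, replace it with your real job logic):

```python
# my_module.py -- hypothetical job entry point referenced by main() above.
def run(sc):
    # Build a small RDD and aggregate it; stand-in for real job logic.
    rdd = sc.parallelize(range(100))
    total = rdd.map(lambda x: x * 2).sum()
    print("sum of doubled values: %d" % total)
    return total
```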


Then you can run it with: luigi --module task_module MyPySparkTask --local-scheduler --date 2016-01
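The values returned by app_options() are appended to the spark-submit invocation and arrive back in main(self, sc, *args) as strings, so a job that needs the date has to parse it there. A pure-Python sketch of that step (parse_app_args is a hypothetical helper, assuming the date is serialized as YYYY-MM-DD):

```python
import datetime

def parse_app_args(args):
    # app_options() values are passed to the Spark application as strings;
    # main(self, sc, *args) receives them and must parse them back.
    return datetime.datetime.strptime(args[0], "%Y-%m-%d").date()
```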


There is also an option to set the properties in a client.cfg file in order to make them the default values for other PySparkTasks:

[spark]
master: mesos://host:port
deploy_mode: cluster
total_executor_cores: 1
driver_cores: 1
executor-memory: 1G
driver-memory: 1G