
How do I tear down a SparkSession and create a new one within a single application?


I have a PySpark program with multiple independent modules that can each independently process data to meet my various needs. They can also be chained together to process data in a pipeline. Each of these modules builds a SparkSession and executes perfectly on its own.

However, when I try to run them serially within the same Python process, I run into issues. The moment the second module in the pipeline executes, Spark complains that the SparkContext I am attempting to use has been stopped:

py4j.protocol.Py4JJavaError: An error occurred while calling o149.parquet.
: java.lang.IllegalStateException: Cannot call methods on a stopped SparkContext.

Each of these modules builds a SparkSession at the beginning of its execution and stops the SparkContext at the end of its process. I build and stop sessions/contexts like so:

from pyspark.sql import SparkSession

session = SparkSession.builder.appName("myApp").getOrCreate()
session.stop()

According to the official documentation, getOrCreate "gets an existing SparkSession or, if there is no existing one, creates a new one based on the options set in this builder." But I don't want that behavior, where the call reuses an existing session. I can't find any way to disable it, and I can't figure out how to destroy the session -- I only know how to stop its associated SparkContext.
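For example, as far as I can tell, within a single process a second getOrCreate call simply hands back the first session (a minimal sketch; the appName is just a placeholder):

from pyspark.sql import SparkSession

s1 = SparkSession.builder.appName("myApp").getOrCreate()
s2 = SparkSession.builder.appName("myApp").getOrCreate()
assert s1 is s2  # the second call reuses the existing session instead of creating a new one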

How can I build new SparkSessions in independent modules, and execute them in sequence in the same Python process without previous sessions interfering with the newly created ones?

The following is an example of the project structure:

main.py

import collect
import process

if __name__ == '__main__':
    data = collect.execute()
    process.execute(data)

collect.py

from pyspark.sql import SparkSession

import datagetter

def execute(data=None):
    session = SparkSession.builder.appName("myApp").getOrCreate()

    data = data if data else datagetter.get()
    rdd = session.sparkContext.parallelize(data)
    # ... do some work here ...
    result = rdd.collect()
    session.stop()
    return result

process.py

from pyspark.sql import SparkSession

import datagetter

def execute(data=None):
    session = SparkSession.builder.appName("myApp").getOrCreate()
    data = data if data else datagetter.get()
    rdd = session.sparkContext.parallelize(data)
    # ... do some work here ...
    result = rdd.collect()
    session.stop()
    return result

Long story short, Spark (including PySpark) is not designed to handle multiple contexts in a single application. If you're interested in the JVM side of the story, I would recommend reading SPARK-2243 (resolved as Won't Fix).

There are a number of design decisions made in PySpark which reflect that, including, but not limited to, a singleton Py4J gateway. Effectively you cannot have multiple SparkContexts in a single application. SparkSession is not only bound to SparkContext but also introduces problems of its own, like handling the local (standalone) Hive metastore if one is used. Moreover, there are functions which use SparkSession.builder.getOrCreate internally and depend on the behavior you see right now. A notable example is UDF registration. Other functions may exhibit unexpected behavior if multiple SQL contexts are present (for example RDD.toDF).
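As a quick illustration of that singleton constraint (a minimal sketch, not from the original answer; the local[1] master and app names are placeholders), attempting to construct a second SparkContext in the same process is rejected outright:

    from pyspark import SparkContext

    sc = SparkContext(master="local[1]", appName="first")
    try:
        # PySpark enforces a single active SparkContext per process.
        SparkContext(master="local[1]", appName="second")
    except ValueError as error:
        print(error)  # e.g. "Cannot run multiple SparkContexts at once; ..."
    finally:
        sc.stop()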

Multiple contexts are not only unsupported but also, in my personal opinion, violate the single responsibility principle. Your business logic shouldn't be concerned with all the setup, cleanup and configuration details.

My personal recommendations are as follows:

  • If the application consists of multiple coherent modules which can be composed and benefit from a single execution environment with caching and a common metastore, initialize all required contexts in the application entry point and pass them down to individual pipelines when necessary:

    • main.py:

      from pyspark.sql import SparkSession
      
      import collect
      import process
      
      if __name__ == "__main__":
          spark: SparkSession = ...
      
          # Pass data between modules
          collected = collect.execute(spark)
          processed = process.execute(spark, data=collected)
          ...
          spark.stop()
      

    • collect.py / process.py:

      from pyspark.sql import SparkSession
      
      def execute(spark: SparkSession, data=None):
          ...
      

  • Otherwise (which seems to be the case here, based on your description) I would design the entry point to execute a single pipeline and use an external workflow manager (like Apache Airflow or Toil) to handle the execution; see the Airflow sketch at the end of this point.

    It is not only cleaner but also allows for much more flexible fault recovery and scheduling.

    The same thing can of course be done with builders, but as a smart person once said: explicit is better than implicit.

    • main.py

      import argparse
      
      from pyspark.sql import SparkSession
      
      import collect
      import process
      
      pipelines = {"collect": collect, "process": process}
      
      if __name__ == "__main__":
          parser = argparse.ArgumentParser()
          parser.add_argument('--pipeline')
          args = parser.parse_args()
      
          spark: SparkSession = ...
      
          # Execute a single pipeline only for side effects
          pipelines[args.pipeline].execute(spark)
          spark.stop()
      

    • collect.py / process.py as in the previous point.
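    As a concrete illustration of this option, here is a minimal, hedged Airflow sketch (assuming Airflow 2.x; the DAG id, schedule and spark-submit command line are my assumptions, not part of the original setup). Each task runs main.py in its own process, so every pipeline gets its own SparkSession and a clean teardown:

      from datetime import datetime

      from airflow import DAG
      from airflow.operators.bash import BashOperator

      with DAG(
          dag_id="spark_pipelines",          # illustrative name
          start_date=datetime(2023, 1, 1),
          schedule_interval=None,            # trigger manually, or set a cron expression
          catchup=False,
      ) as dag:
          collect = BashOperator(
              task_id="collect",
              bash_command="spark-submit main.py --pipeline collect",
          )
          process = BashOperator(
              task_id="process",
              bash_command="spark-submit main.py --pipeline process",
          )

          collect >> process  # run process only after collect succeeds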

One way or another, I would keep one and only one place where the context is set up, and one and only one place where it is torn down.
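For example, one way to enforce that single place (a minimal sketch under the entry-point layout above; the spark_session helper name and builder options are illustrative) is a small context manager used only by main.py:

    from contextlib import contextmanager

    from pyspark.sql import SparkSession

    @contextmanager
    def spark_session(app_name="myApp"):
        # The only place where a session is built...
        spark = SparkSession.builder.appName(app_name).getOrCreate()
        try:
            yield spark
        finally:
            # ...and the only place where it is torn down.
            spark.stop()

    # In main.py:
    # with spark_session() as spark:
    #     pipelines[args.pipeline].execute(spark)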