如何在 Spark Scala 中读取检查点数据帧

更新时间:2023-10-20 10:23:34


I hope I can clean some of your doubts explaining checkpointing and giving you an example of how to recover a dataset from a checkpoint directory.

Checkpointing 主要用于迭代算法和 Streaming 过程.

Checkpointing is mainly used in iterative algorithms and Streaming processess.

在批处理中,我们习惯于容错(缓存或持久化).这意味着,如果节点崩溃,作业不会丢失其状态并且丢失的任务是重新安排在其他工人身上.中间结果被写入持久存储(即必须像 HDFS 或云对象存储一样具有容错性)

On batch processing we are used to having fault tolerance(caching or persisting). This means, in case a node crashed, the job doesn't loose its state and the lost tasks are rescheduled on other workers. Intermediate results are written to persistent storage(that has to be fault tolerant as well like HDFS, or Cloud Object Storage)

维护RDD谱系(缓存或持久化)提供了弹性,但当谱系很长时也会导致问题- 例如:迭代算法、流媒体- 恢复可能非常昂贵- 潜在的堆栈溢出

Maintaining RDD lineage(caching or persisting) provides resilience but can also cause problems when the lineage gets very long - For example: iterative algorithms, streaming - Recovery can be very expensive - Potencial stack overflow

检查点将数据保存到 HDFS- 提供跨节点的容错存储- 血统未保存- 必须在对 RDD 执行任何操作之前进行检查点

Checkpointing saves the data to HDFS - Provides fault-tolerant storage across nodes - Lineage is not saved - Must be checkpointed before any actions on the RDD


是Spark SQL的一个特性,可以截断一个逻辑查询计划这可能特别适用于高度迭代的数据算法(例如 SparkMLlib 使用 Spark SQL 的 Dataset API 进行数据操作).

Is a feature of Spark SQL to truncate a logical query plan that could specifically be useful for highly iterative data algorithms (e.g. Spark MLlib that uses Spark SQL’s Dataset API for data manipulation).

检查点实际上是 Spark Core 的一个特性(即 Spark SQL用于分布式计算)允许重新启动驱动程序先前计算的分布式计算状态失败描述为 RDD .那已经在Spark中成功使用了Streaming - 用于流处理的现已过时的 Spark 模块基于RDD API.检查点会截断要检查点的 RDD 的谱系.那已经在迭代机中的 Spark MLlib 中成功使用了学习算法,如 ALS.Spark SQL 中的数据集检查点使用检查点进行截断被检查点的数据集的底层 RDD 的沿袭.

Checkpointing is actually a feature of Spark Core (that Spark SQL uses for distributed computations) that allows a driver to be restarted on failure with previously computed state of a distributed computation described as an RDD . That has been successfully used in Spark Streaming - the now-obsolete Spark module for stream processing based on RDD API. Checkpointing truncates the lineage of a RDD to be checkpointed. That has been successfully used in Spark MLlib in iterative machine learning algorithms like ALS. Dataset checkpointing in Spark SQL uses checkpointing to truncate the lineage of the underlying RDD of a Dataset being checkpointed.

使用数据集检查点要求您指定检查点目录.该目录存储要进行检查点的 RDD 的检查点文件.用SparkContext.setCheckpointDir 设置检查点目录的路径.检查点可以是本地的或可靠的,这定义了检查点的可靠性目录是.本地检查点使用执行器存储写入检查点文件由于 executor 生命周期而被认为是不可靠的.可靠的检查点使用可靠的数据存储,如 Hadoop HDFS.

Using Dataset checkpointing requires that you specify the checkpoint directory. The directory stores the checkpoint files for RDDs to be checkpointed. Use SparkContext.setCheckpointDir to set the path to a checkpoint directory. Checkpointing can be local or reliable which defines how reliable the checkpoint directory is. Local checkpointing uses executor storage to write checkpoint files to and due to the executor lifecycle is considered unreliable. Reliable checkpointing uses a reliable data storage like Hadoop HDFS.


package tests

import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

  * Checkpointing
  *     - Maintaining RDD lineage provides resilience but can also cause problems when the lineage gets very long
  *         - For example: iterative algorithms, streaming
  *     - Recovery can be very expensive
  *     - Potencial stack overflow
  *     - Checkpointing saves the data to HDFS
  *         - Provides fault-tolerant storage across nodes
  *         - Lineage is not saved
  *         - Must be checkpointed before any actions on the RDD
object WriteCheckPoint {
  val spark = SparkSession
    .config("spark.sql.shuffle.partitions","4") //Change to a more reasonable default number of partitions for our data
    .config("spark.app.id","WriteCheckPoint") // To silence Metrics warning

  val sqlContext = spark.sqlContext

  val sc = spark.sparkContext

  // Remember to set the checkpoint directory

  def main(args: Array[String]): Unit = {

    // Set org.apache.spark.rdd.ReliableRDDCheckpointData logger to INFO
    // to see what happens while an RDD is checkpointed
    // Let's use log4j API so, you should add import org.apache.log4j.{Level, Logger}

    try {
      val nums = spark.range(5).withColumn("random", rand()).filter("random > 0.5")
      // Must be checkpointed before any actions on the RDD
      // Save the schema as it is going to use to reconstruct nums dataset from a RDD
      val schema = nums.schema


      // To have the opportunity to view the web console of Spark: http://localhost:4040/
      println("Type whatever to the console to exit......")
    } finally {
      println("SparkContext stopped")
      println("SparkSession stopped")


20/06/15 16:42:50 INFO ReliableRDDCheckpointData: Done checkpointing RDD 4 to hdfs://localhost/user/cloudera/checkpoint/607daeca-6ec2-471c-9033-9c4c236880a9/rdd-4, new parent is RDD 5
 |-- id: long (nullable = false)
 |-- random: double (nullable = false)

| id|            random|
|  2|0.9550560942227814|

您必须定义几个受保护的辅助对象在 org.apache.spark 和 org.apache.spark.sql 包中

You will have to define a couple of helper objects that are protected in package org.apache.spark and org.apache.spark.sql

package org.apache.spark

  * SparkContext.checkpointFile is a `protected[spark]` method
  * define a helper object to "escape" the package lock-in
object my {
  import scala.reflect.ClassTag
  import org.apache.spark.rdd.RDD
  def recover[T: ClassTag](sc: SparkContext, path: String): RDD[T] = {

package org.apache.spark.sql

object my2 {
  import org.apache.spark.rdd.RDD
  import org.apache.spark.sql.{DataFrame, SparkSession}
  import org.apache.spark.sql.catalyst.InternalRow
  import org.apache.spark.sql.types.StructType
  def createDataFrame(spark: SparkSession, catalystRows: RDD[InternalRow], schema: StructType): DataFrame = {
    spark.internalCreateDataFrame(catalystRows, schema)


package tests

import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{DoubleType, IntegerType, StringType, StructType}

  * Recovering RDD From Checkpoint Files
  * — SparkContext.checkpointFile Method
  *   SparkContext.checkpointFile(directory: String)
  *   checkpointFile reads (recovers) a RDD from a checkpoint directory.
  * Note SparkContext.checkpointFile is a protected[spark] method
  * so the code to access it has to be in org.apache.spark package.
  * Internally, checkpointFile creates a ReliableCheckpointRDD in a scope.
object ReadingCheckPoint {
  val spark = SparkSession
    .config("spark.sql.shuffle.partitions","4") //Change to a more reasonable default number of partitions for our data
    .config("spark.app.id","ReadingCheckPoint") // To silence Metrics warning

  val sqlContext = spark.sqlContext

  val sc = spark.sparkContext

  // Make sure to use the same checkpoint directory
  val pathCheckpoint = "hdfs://localhost/user/cloudera/checkpoint/607daeca-6ec2-471c-9033-9c4c236880a9/rdd-4"

  def main(args: Array[String]): Unit = {

    try {


      val schema = new StructType()

      import org.apache.spark.my
      import org.apache.spark.sql.catalyst.InternalRow
      val numsRddRecovered = my.recover[InternalRow](spark.sparkContext, pathCheckpoint) //org.apache.spark.rdd.RDD[org.apache.spark.sql.catalyst.InternalRow]
      numsRddRecovered.foreach(x => println(x.toString))

      // We have to convert RDD[InternalRow] to DataFrame
      import org.apache.spark.sql.my2
      val numsRecovered = my2.createDataFrame(spark, numsRddRecovered, schema)

      // To have the opportunity to view the web console of Spark: http://localhost:4040/
      println("Type whatever to the console to exit......")
    } finally {
      println("SparkContext stopped")
      println("SparkSession stopped")


|field1|            field2|
|     2|0.9550560942227814|

您可以通过此链接访问 Spark 文档:Checkpointing

You can follow this link to the Spark Documentation: Checkpointing