SPARK - Using RDD.foreach to create a DataFrame and perform actions on it

Updated: 2023-11-18 22:43:46

Your code would fail with a TaskNotSerializable exception, since you're trying to use the SQLContext (which isn't serializable) inside the execute method; that method has to be serialized and sent to the workers to run on each record in the categories RDD.
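
For reference, a minimal sketch of the failing pattern (the events table name and the sqlContext variable are stand-ins for whatever your code actually uses):

categoriesRdd.foreach { category =>
  // This closure runs inside an executor task: sqlContext is captured in it,
  // and since SQLContext isn't serializable, Spark fails with
  // TaskNotSerializable when it tries to ship the closure to the workers.
  val singleCategoryDf = sqlContext.sql(s"SELECT * FROM events WHERE category = '$category'")
  executePipeline(singleCategoryDf)
}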

Assuming you know the number of categories is limited, meaning the list of categories isn't too large to fit in your driver's memory, you should collect the categories to the driver and iterate over that local collection with foreach:

import org.apache.spark.rdd.RDD

// CATEGORY holds the name of the category column.
val categoriesRdd: RDD[String] = df.select(CATEGORY).distinct().rdd.map(r => r(0).toString)
val categories: Seq[String] = categoriesRdd.collect() // small, bounded list, so safe to collect
categories.foreach(executePipeline)
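
Here executePipeline is assumed to take the category name and issue a fresh query for that category's rows, roughly along these lines (the query itself is a placeholder):

def executePipeline(category: String): Unit = {
  // One separate query (and scan of the source data) per category.
  val singleCategoryDf = sqlContext.sql(s"SELECT * FROM events WHERE category = '$category'")
  // ... run the rest of the pipeline on singleCategoryDf ...
}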

Another improvement would be to reuse the DataFrame you already loaded instead of performing another query, applying a filter per category:

import org.apache.spark.sql.functions.col

def executePipeline(singleCategoryDf: DataFrame): Unit = { /* ... */ }

categories.foreach { cat =>
  val filtered = df.filter(col(CATEGORY) === cat)
  executePipeline(filtered)
}

NOTE: to make sure reusing df doesn't reload it on every execution, cache() it before collecting the categories.
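
Putting the pieces together, a minimal end-to-end sketch under the same assumptions (df already loaded, CATEGORY holding the column name):

import org.apache.spark.sql.functions.col

df.cache() // materialized on the first action and reused by every filter below

val categories: Seq[String] =
  df.select(CATEGORY).distinct().rdd.map(_.getString(0)).collect()

categories.foreach { cat =>
  executePipeline(df.filter(col(CATEGORY) === cat))
}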