Create a map to call a POJO for each row of a Spark DataFrame

Updated: 2023-08-28 16:37:52


I just posted a solution that actually uses DataFrame/Dataset. The post used a Star Wars dataset to build a model in R and then scored the MOJO on the test set in Spark. I'll paste the only relevant part here:


You could use either spark-submit or spark-shell. If you use spark-submit, h2o-genmodel.jar needs to be placed under the lib folder of the root directory of your Spark application so it can be added as a dependency during compilation. The following code assumes you're running spark-shell. In order to use h2o-genmodel.jar, you need to append the jar file when launching spark-shell by providing the --jars flag. For example:

/usr/lib/spark/bin/spark-shell \
--conf spark.serializer="org.apache.spark.serializer.KryoSerializer" \
--conf spark.driver.memory="3g" \
--conf spark.executor.memory="10g" \
--conf spark.executor.instances=10 \
--conf spark.executor.cores=4 \
--jars /path/to/h2o-genmodel.jar


Now in the Spark shell, import the dependencies:

import _root_.hex.genmodel.easy.{EasyPredictModelWrapper, RowData}
import _root_.hex.genmodel.MojoModel

Using the DataFrame API:

val modelPath = "/path/to/zip/file"
val dataPath = "/path/to/test/data"

// Import data
val dfStarWars = spark.read.option("header", "true").csv(dataPath)
// Import MOJO model
val mojo = MojoModel.load(modelPath)
val easyModel = new EasyPredictModelWrapper(mojo)

// Score each row: columns are accessed by position (0 = name, 1 = height, 2 = mass)
val dfScore = dfStarWars.map {
  x =>
    val r = new RowData
    r.put("height", x.getAs[String](1))
    r.put("mass", x.getAs[String](2))
    val score = easyModel.predictBinomial(r).classProbabilities
    (x.getAs[String](0), score(1))
}.toDF("name", "isHumanScore")


The variable score is a list of the two class probabilities for levels 0 and 1; score(1) is the score for level 1, which is "human". By default the map function returns a DataFrame with unspecified column names "_1", "_2", etc. You can rename the columns by calling toDF.


To use the Dataset API we just need to create two case classes, one for the input data, and one for the output.

case class StarWars (
  name: String,
  height: String,
  mass: String,
  is_human: String
)

case class Score (
  name: String,
  isHumanScore: Double
)


// Dataset
val dtStarWars = dfStarWars.as[StarWars]
val dtScore = dtStarWars.map {
  x =>
    val r = new RowData
    r.put("height", x.height)
    r.put("mass", x.mass)
    val score = easyModel.predictBinomial(r).classProbabilities
    Score(x.name, score(1))
}
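
The scored Dataset can then be inspected or persisted like any other Dataset; for example (the output path below is just an illustrative placeholder):

```scala
// Peek at the first few scored rows
dtScore.show(5)

// Persist the scores, e.g. as Parquet (placeholder path)
dtScore.write.mode("overwrite").parquet("/path/to/output/scores")
```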


With a Dataset you can get the value of a column by calling x.columnName directly. Just note that the values you put into RowData have to be String, so if the fields are defined with other types in the case class you may need to cast them manually.
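
As a sketch of that casting, suppose the numeric columns had been typed as Double instead of String (StarWarsTyped and dtTyped below are hypothetical variants, not part of the original example):

```scala
// Hypothetical variant: numeric fields typed as Double in the case class
case class StarWarsTyped (
  name: String,
  height: Double,
  mass: Double,
  is_human: String
)

val dtTyped = dfStarWars
  .withColumn("height", $"height".cast("double"))
  .withColumn("mass", $"mass".cast("double"))
  .as[StarWarsTyped]

val dtScoreTyped = dtTyped.map {
  x =>
    val r = new RowData
    // RowData expects String values, so convert the Doubles back manually
    r.put("height", x.height.toString)
    r.put("mass", x.mass.toString)
    val score = easyModel.predictBinomial(r).classProbabilities
    Score(x.name, score(1))
}
```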