Structured Streaming - consume each message

You should be able to use a foreach output sink: https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#output-sinks and https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#using-foreach

Even though the client is not serializable, you don't have to open it in your ForeachWriter constructor. Just leave it None/null, and initialize it in the open method, which is called after serialization, but only once per task.

In rough pseudocode:

import org.apache.spark.sql.ForeachWriter

class HBaseForeachWriter extends ForeachWriter[MyType] {
  // Keep the non-serializable client unset until open() runs on the executor.
  var client: Option[HBaseClient] = None

  // Called once per task, after the writer has been deserialized.
  def open(partitionId: Long, version: Long): Boolean = {
    client = Some(... open a client ...)
    true // returning true tells Spark to process this partition
  }

  def process(record: MyType): Unit = {
    client match {
      case None     => throw new IllegalStateException("client should have been initialized in open()")
      case Some(cl) =>
        ... use cl to write record ...
    }
  }

  // Called when the task finishes or fails; release the client if it was opened.
  def close(errorOrNull: Throwable): Unit = {
    client.foreach(cl => cl.close())
  }
}
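
To attach the writer to a query, pass it to the foreach sink on writeStream. This is a minimal sketch; `stream` is assumed to be an existing streaming Dataset[MyType]:

val query = stream
  .writeStream
  .outputMode("append")
  .foreach(new HBaseForeachWriter) // one writer instance is serialized and sent to each task
  .start()

query.awaitTermination()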