且构网

Flink: How to handle external application configuration changes in Flink

Last updated: 2022-06-25 22:05:09

Updating the configuration of a running streaming application is a common requirement. In Flink's DataStream API, this can be done with a CoFlatMapFunction, which processes two input streams: one carries the data, the other carries control messages.

The following example shows how to dynamically adjust a user function that filters strings by length: only strings shorter than the current limit pass through, and the limit can be changed at runtime via the control stream.

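import org.apache.flink.streaming.api.checkpoint.Checkpointed
import org.apache.flink.streaming.api.functions.co.CoFlatMapFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector

val data: DataStream[String] = ???
val control: DataStream[Int] = ???

val filtered: DataStream[String] = data
  // broadcast all control messages to the following CoFlatMap subtasks
  .connect(control.broadcast)
  // process data and control messages
  .flatMap(new DynLengthFilter)


// Checkpointed requires a java.io.Serializable state type, hence java.lang.Integer rather than Int
class DynLengthFilter extends CoFlatMapFunction[String, Int, String] with Checkpointed[Integer] {

  // current length limit; stays 0 (nothing passes) until the first control message arrives
  var length = 0

  // forward only strings shorter than the current limit
  override def flatMap1(value: String, out: Collector[String]): Unit = {
    if (value.length < length) {
      out.collect(value)
    }
  }

  // receive new filter length
  override def flatMap2(value: Int, out: Collector[String]): Unit = {
    length = value
  }

  // value written to each checkpoint
  override def snapshotState(checkpointId: Long, checkpointTimestamp: Long): Integer = length

  // called with the checkpointed value when the job recovers from a failure
  override def restoreState(state: Integer): Unit = {
    length = state
  }
}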
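
To see how control messages change the filter's behavior over time, the following plain-Scala sketch replays a fixed sequence of events against a stand-in for a single subtask. It does not use Flink at all; the names Event, Data, Control, LengthFilterSim and LengthFilterDemo are hypothetical and exist only for this illustration. In a real job the two inputs are not ordered relative to each other, so a new limit takes effect whenever the control message happens to reach each subtask.

```scala
import scala.collection.mutable.ListBuffer

// Hypothetical event type: either a data record or a control message.
sealed trait Event
final case class Data(value: String) extends Event
final case class Control(newLength: Int) extends Event

// Plain-Scala stand-in for one subtask of the two-input function.
final class LengthFilterSim {
  private var length = 0

  // mirrors flatMap1: emit only strings shorter than the current limit
  def handleData(value: String, out: ListBuffer[String]): Unit = {
    if (value.length < length) {
      out += value
    }
  }

  // mirrors flatMap2: replace the limit, emit nothing
  def handleControl(value: Int): Unit = {
    length = value
  }
}

object LengthFilterDemo {
  // Feeds events in arrival order to one simulated subtask and returns what it emits.
  def run(events: Seq[Event]): List[String] = {
    val filter = new LengthFilterSim
    val out = ListBuffer.empty[String]
    events.foreach {
      case Data(v)    => filter.handleData(v, out)
      case Control(n) => filter.handleControl(n)
    }
    out.toList
  }

  def main(args: Array[String]): Unit = {
    val events = Seq(
      Data("flink"),  // dropped: limit is still 0
      Control(6),
      Data("flink"),  // kept: 5 < 6
      Data("stream"), // dropped: 6 is not < 6
      Control(3),
      Data("flink"),  // dropped: 5 is not < 3
      Data("io")      // kept: 2 < 3
    )
    println(run(events)) // prints List(flink, io)
  }
}
```

Note that records arriving before the first control message are all dropped, because the limit starts at 0.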

The DynLengthFilter user function implements the Checkpointed interface so that the filter length is included in checkpoints. After a failure, Flink restores this value automatically.
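
The Checkpointed interface belongs to early Flink releases; later versions superseded it with CheckpointedFunction. A minimal sketch of the same filter on that interface might look as follows, assuming a Flink 1.x release that still ships the Scala DataStream API. The class name DynLengthFilterV2 and the state name "filter-length" are made up for this illustration, and the choice of union list state is one option among several, not something the original answer prescribes.

```scala
import org.apache.flink.api.common.state.{ListState, ListStateDescriptor}
import org.apache.flink.runtime.state.{FunctionInitializationContext, FunctionSnapshotContext}
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction
import org.apache.flink.streaming.api.functions.co.CoFlatMapFunction
import org.apache.flink.util.Collector

// Hypothetical variant of DynLengthFilter built on CheckpointedFunction.
class DynLengthFilterV2 extends CoFlatMapFunction[String, Int, String] with CheckpointedFunction {

  // current length limit held in memory
  private var length = 0

  // operator state handle, assigned in initializeState
  @transient private var lengthState: ListState[Integer] = _

  // forward only strings shorter than the current limit
  override def flatMap1(value: String, out: Collector[String]): Unit = {
    if (value.length < length) {
      out.collect(value)
    }
  }

  // receive new filter length
  override def flatMap2(value: Int, out: Collector[String]): Unit = {
    length = value
  }

  // copy the in-memory limit into operator state when a checkpoint is taken
  override def snapshotState(context: FunctionSnapshotContext): Unit = {
    lengthState.clear()
    lengthState.add(length)
  }

  // register the state and, when recovering, read the limit back
  override def initializeState(context: FunctionInitializationContext): Unit = {
    val descriptor = new ListStateDescriptor[Integer]("filter-length", classOf[Integer])
    // union redistribution hands every subtask the full list, so each one
    // finds a value even if the parallelism changed (assumption: all subtasks
    // stored the same limit, since control messages are broadcast)
    lengthState = context.getOperatorStateStore.getUnionListState(descriptor)
    if (context.isRestored) {
      val restored = lengthState.get().iterator()
      if (restored.hasNext) {
        length = restored.next()
      }
    }
  }
}
```

It plugs into the pipeline the same way as the original class, for example `data.connect(control.broadcast).flatMap(new DynLengthFilterV2)`.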