Updated: 2022-06-25 22:05:09
Updating the configuration of a running streaming application is a common requirement. In Flink's DataStream API, this can be done with a so-called CoFlatMapFunction, which processes two input streams. One of the streams can be a data stream and the other a control stream.
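Before looking at the Flink code, the two-input pattern can be modeled in plain Scala with no Flink dependency. In this sketch, all names (LengthFilterModel, TwoInputDemo, onData, onControl) are hypothetical. The connected streams are approximated by a single sequence of Either values, Left for data and Right for control, and both handlers share one mutable threshold, much as flatMap1 and flatMap2 share fields inside one CoFlatMapFunction instance. It is only an approximation: Flink gives no ordering guarantee between the two inputs of connected streams, so which strings see which threshold depends on actual arrival order.

```scala
// Hypothetical, dependency-free model of a two-input operator (not Flink API).
// Both handlers share `maxLength`, mirroring how flatMap1 and flatMap2
// share fields inside one CoFlatMapFunction instance.
final class LengthFilterModel {
  private var maxLength = 0 // drop everything until a control message arrives

  // Data input: emit the string only if it does not exceed the threshold.
  def onData(value: String): Option[String] =
    if (value.length <= maxLength) Some(value) else None

  // Control input: replace the threshold; emits nothing.
  def onControl(newLimit: Int): Unit = {
    maxLength = newLimit
  }
}

object TwoInputDemo {
  // Left = data element, Right = control element, in (assumed) arrival order.
  def run(events: Seq[Either[String, Int]]): Seq[String] = {
    val filter = new LengthFilterModel
    events.flatMap {
      case Left(s) => filter.onData(s).toList
      case Right(n) =>
        filter.onControl(n)
        Nil
    }
  }

  def main(args: Array[String]): Unit = {
    val events: Seq[Either[String, Int]] =
      Seq(Left("dropped"), Right(3), Left("abc"), Left("abcd"), Right(10), Left("abcd"))
    println(run(events))
  }
}
```

Because the threshold here is ordinary in-memory state, it would be lost if the process restarted; in the Flink version, the Checkpointed interface is what covers that case.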
The following example shows how to dynamically adjust a user function that filters out strings exceeding a certain length.
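import org.apache.flink.streaming.api.checkpoint.Checkpointed // legacy interface, deprecated since Flink 1.2
import org.apache.flink.streaming.api.functions.co.CoFlatMapFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector

val data: DataStream[String] = ???  // placeholder: the stream of strings to filter
val control: DataStream[Int] = ???  // placeholder: the stream of new length thresholds

val filtered: DataStream[String] = data
  // broadcast all control messages to every parallel DynLengthFilter subtask
  .connect(control.broadcast)
  // process data and control messages in one operator
  .flatMap(new DynLengthFilter)

class DynLengthFilter extends CoFlatMapFunction[String, Int, String] with Checkpointed[Int] {
  // current threshold; every string is dropped until the first control message arrives
  var length = 0

  // input 1 (data): forward strings that do not exceed the threshold
  override def flatMap1(value: String, out: Collector[String]): Unit = {
    if (value.length <= length) {
      out.collect(value)
    }
  }

  // input 2 (control): receive a new filter length
  override def flatMap2(value: Int, out: Collector[String]): Unit = {
    length = value
  }

  // store the current threshold in each checkpoint
  override def snapshotState(checkpointId: Long, checkpointTimestamp: Long): Int = length

  // reinstate the threshold after a failure
  override def restoreState(state: Int): Unit = {
    length = state
  }
}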
The DynLengthFilter user function implements the Checkpointed interface for the filter length, so the current threshold is written into every checkpoint. In case of a failure, this value is restored automatically.
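The Checkpointed interface is Flink's legacy state API; it was deprecated in Flink 1.2 in favor of CheckpointedFunction. Below is a hedged sketch of the same filter ported to CheckpointedFunction, assuming the Flink Scala DataStream API (flink-streaming-scala) is on the classpath. The class name DynLengthFilterV2 and the state name "filter-length" are hypothetical. The threshold lives in operator state that is registered in initializeState and rewritten on every checkpoint in snapshotState.

```scala
import org.apache.flink.api.common.state.{ListState, ListStateDescriptor}
import org.apache.flink.runtime.state.{FunctionInitializationContext, FunctionSnapshotContext}
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction
import org.apache.flink.streaming.api.functions.co.CoFlatMapFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector

import scala.collection.JavaConverters._

// Hypothetical port of DynLengthFilter to CheckpointedFunction (sketch, not the original code).
class DynLengthFilterV2 extends CoFlatMapFunction[String, Int, String] with CheckpointedFunction {

  // Current threshold; 0 until the first control message arrives.
  private var length = 0

  // Handle to operator (non-keyed) state; created in initializeState.
  @transient private var lengthState: ListState[Int] = _

  // Data input: keep strings that do not exceed the threshold.
  override def flatMap1(value: String, out: Collector[String]): Unit =
    if (value.length <= length) out.collect(value)

  // Control input: update the threshold.
  override def flatMap2(value: Int, out: Collector[String]): Unit =
    length = value

  // Called for every checkpoint: replace the stored value with the current one.
  override def snapshotState(context: FunctionSnapshotContext): Unit = {
    lengthState.clear()
    lengthState.add(length)
  }

  // Called on (re)start: register the state and, after a failure, read it back.
  override def initializeState(context: FunctionInitializationContext): Unit = {
    val descriptor = new ListStateDescriptor[Int]("filter-length", createTypeInformation[Int])
    // Union list state gives every parallel instance all stored entries on restore.
    lengthState = context.getOperatorStateStore.getUnionListState(descriptor)
    if (context.isRestored) {
      lengthState.get().asScala.headOption.foreach(restored => length = restored)
    }
  }
}
```

It can be dropped into the pipeline above in place of DynLengthFilter, for example data.connect(control.broadcast).flatMap(new DynLengthFilterV2). Union list state is used instead of getListState because a plain list is split across instances on restore, so after scaling up some instances would receive no entry and fall back to 0. Since every instance receives the same broadcast control messages, any entry of the union is assumed to hold the same threshold.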