
Is it possible to access message headers with Kafka Streams?

Updated: 2023-01-29 19:42:00

Record headers have been accessible since version 2.0.0 (cf. KIP-244 for details).

You can access record metadata via the Processor API (i.e., via transform(), transformValues(), or process()) through the given "context" object (cf. https://docs.confluent.io/current/streams/developer-guide/processor-api.html#accessing-processor-context).
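
For illustration, here is a minimal sketch of reading a header through the context object inside a Transformer; the "trace-id" header key and the String types are just assumptions for the example:

```java
import org.apache.kafka.common.header.Header;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.Transformer;
import org.apache.kafka.streams.processor.ProcessorContext;

// Sketch: a Transformer that reads a header of the currently processed record
// via the ProcessorContext; "trace-id" is an assumed header key.
public class HeaderReadingTransformer implements Transformer<String, String, KeyValue<String, String>> {

    private ProcessorContext context;

    @Override
    public void init(final ProcessorContext context) {
        // keep the context; it exposes per-record metadata, including headers()
        this.context = context;
    }

    @Override
    public KeyValue<String, String> transform(final String key, final String value) {
        final Header traceId = context.headers().lastHeader("trace-id");
        final String traceValue = (traceId != null) ? new String(traceId.value()) : "n/a";
        return KeyValue.pair(key, value + " [trace-id=" + traceValue + "]");
    }

    @Override
    public void close() {}
}
```

It would be hooked into the topology with something like stream.transform(HeaderReadingTransformer::new).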

Update

As of the 2.7.0 release, the Processor API was improved (cf. KIP-478), adding a new type-safe api.Processor class with a process(Record) method instead of process(K, V). In this case, headers (and record metadata) are accessible via the Record class.
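
A minimal sketch of the same idea with the 2.7.0+ type-safe API; the "source" header key and the String types are assumptions for the example:

```java
import org.apache.kafka.common.header.Header;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;

// Sketch: with the type-safe API, headers travel with the Record itself,
// so no lookup through the context is needed; "source" is an assumed header key.
public class HeaderAwareProcessor implements Processor<String, String, String, String> {

    private ProcessorContext<String, String> context;

    @Override
    public void init(final ProcessorContext<String, String> context) {
        this.context = context;
    }

    @Override
    public void process(final Record<String, String> record) {
        final Header source = record.headers().lastHeader("source");
        final String annotated = record.value()
                + " [source=" + (source != null ? new String(source.value()) : "unknown") + "]";
        // forward a copy of the record with the new value; headers and timestamp are preserved
        context.forward(record.withValue(annotated));
    }
}
```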

This new feature is not yet available in the "PAPI methods" of the DSL though (e.g., KStream#process(), KStream#transform(), and siblings).
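
Because the DSL entry points do not accept the new api.Processor yet, here is a sketch of wiring the HeaderAwareProcessor from the previous example into a Topology directly; the topic and node names are placeholders, and default String serdes are assumed in the configuration:

```java
import org.apache.kafka.streams.Topology;

public class HeaderTopologySketch {
    // Sketch: plug the HeaderAwareProcessor above into a Topology via the plain
    // Processor API, since KStream#process()/#transform() do not take it yet.
    // Topic names are placeholders; default String serdes are assumed.
    public static Topology build() {
        final Topology topology = new Topology();
        topology.addSource("input", "input-topic");
        topology.addProcessor("annotate", HeaderAwareProcessor::new, "input");
        topology.addSink("output", "output-topic", "annotate");
        return topology;
    }
}
```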

++++++

Prior to 2.0, the context only exposes topic, partition, offset, and timestamp, but not headers, which are in fact dropped by Streams on read in those older versions.

Metadata is not available at the DSL level though. However, there is also work in progress to extend the DSL via KIP-159.