更新时间:2023-02-06 22:12:51
做到这一点的***方法是使用Avro,其中架构独立存储,并由Kafka Connect和KSQL自动使用。
The best way to do this is to use Avro, in which the schema is stored separately and automatically used by Kafka Connect and KSQL.
您可以通过将Kafka Connect配置为使用AvroConverter来使用Avro。在您的Kafka Connect工作程序配置集中:
You can use Avro by configuring Kafka Connect to use the AvroConverter. In your Kafka Connect worker configuration set:
key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://schema-registry:8081
(将 schema-registry
更新为运行模式注册表的主机名)
(Update schema-registry
to the hostname of where your Schema Registry is running)
从此处开始,在KSQL中,您只需使用
From there, in KSQL you just use
CREATE STREAM my_stream WITH (KAFKA_TOPIC='source_topic', VALUE_FORMAT='AVRO');
您无需在此处指定架构本身,因为KSQL是从Schema Registry中获取的。
You don't need to specify the schema itself here, because KSQL fetches it from the Schema Registry.
您可以阅读有关转换器和序列化器的更多信息此处。
You can read more about Converters and serialisers here.
免责声明:我为Confluent工作,并写了引用的博客文章。