且构网

分享程序员开发的那些事...
且构网 - 分享程序员编程开发的那些事

无法反序列化实例 Kafka Streams

更新时间:2021-11-04 17:39:23

看你的代码我可以看到几个问题:

Looking at your code I can see several issues:

  1. TimeOffSerde - 它应该实现 Serde 而不是 Serde
  2. Materialized 中没有传递 Key 和 Value 的类型,因此它假定它是 Object
  1. TimeOffSerde - It should implement Serde<TimeOff> not Serde<Object>
  2. You don't pass types for Key and Value in Materialized, so it assume it is Object

所以你的流媒体部分应该是这样的:

So your streaming part should be something like:

KTable<String, ArrayList<TimeOff>> newStore = source.groupBy((k, v) -> v.getEmployeeId())
        .aggregate(ArrayList::new,
                (key, value, aggregate) -> {
                    aggregate.add(value);
                    return aggregate;
                }, Materialized.<String, ArrayList<TimeOff>, KeyValueStore<Bytes, byte[]>>as("NewStore").withValueSerde(new TimeOffListSerde(new TimeOffSerde())));

注意:修改后记得清空状态存储目录.

NOTICE: Rember to clear state store directory after modification.