更新时间:2021-11-04 17:39:23
看你的代码我可以看到几个问题:
Looking at your code I can see several issues:
Serde
而不是 Serde
Materialized
中没有传递 Key 和 Value 的类型,因此它假定它是 Object
Serde<TimeOff>
not Serde<Object>
Materialized
, so it assume it is Object
所以你的流媒体部分应该是这样的:
So your streaming part should be something like:
KTable<String, ArrayList<TimeOff>> newStore = source.groupBy((k, v) -> v.getEmployeeId())
.aggregate(ArrayList::new,
(key, value, aggregate) -> {
aggregate.add(value);
return aggregate;
}, Materialized.<String, ArrayList<TimeOff>, KeyValueStore<Bytes, byte[]>>as("NewStore").withValueSerde(new TimeOffListSerde(new TimeOffSerde())));
注意:修改后记得清空状态存储目录.
NOTICE: Rember to clear state store directory after modification.