This question is cross-posted on Stack Overflow, and I answered it there:
https://stackoverflow.com/questions/54372134/topologytestdriver-sending-incorrect-message-on-ktable-aggregations

On 1/25/19 6:32 PM, Murilo Tavares wrote:
> Hi
>
> I am new to this mailing list, so I am not sure if this is the right place
> to send this. Please let me know if it's not.
>
> I believe I found a bug in the TopologyTestDriver.
>
> I have a topology that aggregates on a KTable. This is a generic method I
> created to build this topology on different topics I have.
>
>     public static <A, B, C> KTable<C, Set<B>> groupTable(
>             KTable<A, B> table,
>             Function<B, C> getKeyFunction,
>             Serde<C> keySerde,
>             Serde<B> valueSerde,
>             Serde<Set<B>> aggregatedSerde) {
>         return table
>             .groupBy((key, value) ->
>                     KeyValue.pair(getKeyFunction.apply(value), value),
>                 Serialized.with(keySerde, valueSerde))
>             .aggregate(() -> new HashSet<>(),
>                 (key, newValue, agg) -> {
>                     agg.remove(newValue);
>                     agg.add(newValue);
>                     return agg;
>                 },
>                 (key, oldValue, agg) -> {
>                     agg.remove(oldValue);
>                     return agg;
>                 },
>                 Materialized.with(keySerde, aggregatedSerde));
>     }
>
> This works pretty well when using Kafka, but not when testing via
> `TopologyTestDriver`.
>
> In both scenarios, when I get an update, the subtractor is called first,
> and then the adder is called. The problem is that when using the
> TopologyTestDriver, two messages are sent out for updates: one after the
> subtractor call, and another one after the adder call. Not to mention that
> the message that is sent after the subtractor and before the adder is in
> an incorrect state.
>
> I created a test case on GitHub to illustrate the issue:
> https://github.com/mulho/topology-testcase
>
> Could anyone confirm this is a bug? I've tested this with both Kafka
> versions 2.0.1 and 2.1.0.
>
> Thanks
>
> Murilo
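
To make the two-message behavior described above concrete, here is a minimal plain-Java sketch of the subtractor-then-adder sequence. It does not use the Kafka Streams API at all: the class AggregateUpdateSketch and its methods add, subtract and update are hypothetical names that only mirror the two lambdas passed to aggregate() in the quoted method. The sketch assumes a result is observed after every single step, which is what the test driver output in the report looks like. Whether a real deployment hides the first result (for example through record caching) is an assumption here, not something this sketch demonstrates.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class AggregateUpdateSketch {

    // Mirrors the adder lambda from the quoted topology.
    static <B> Set<B> add(B newValue, Set<B> agg) {
        agg.remove(newValue);
        agg.add(newValue);
        return agg;
    }

    // Mirrors the subtractor lambda from the quoted topology.
    static <B> Set<B> subtract(B oldValue, Set<B> agg) {
        agg.remove(oldValue);
        return agg;
    }

    // Simulates one table update (oldValue -> newValue) and records a
    // snapshot of the aggregate after each step, as if every step were
    // forwarded downstream immediately (an assumption of this sketch).
    static <B> List<Set<B>> update(Set<B> agg, B oldValue, B newValue) {
        List<Set<B>> emitted = new ArrayList<>();
        subtract(oldValue, agg);
        emitted.add(new HashSet<>(agg)); // intermediate result: old value gone, new value not yet added
        add(newValue, agg);
        emitted.add(new HashSet<>(agg)); // final result
        return emitted;
    }

    public static void main(String[] args) {
        Set<String> agg = new HashSet<>();
        add("v1", agg); // initial insert into the table
        List<Set<String>> emitted = update(agg, "v1", "v2");
        System.out.println(emitted); // prints [[], [v2]]
    }
}
```

The first snapshot is the "incorrect" intermediate aggregate from the report and the second is the expected final one. A test that reads output records one at a time would therefore need to read both and assert on the last.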