This question is cross posted on SO. I answered it there:

https://stackoverflow.com/questions/54372134/topologytestdriver-sending-incorrect-message-on-ktable-aggregations



On 1/25/19 6:32 PM, Murilo Tavares wrote:
> Hi
> I am new to this mailing list, so not sure if this is the right place to
> send this. Please let me know if it's not.
> 
> I believe I found a bug on the TopologyTestDriver.
> 
> I have a topology that aggregates on a KTable. This is a generic method I
> created to build this topology on different topics I have.
> 
> public static <A, B, C> KTable<C, Set<B>> groupTable(KTable<A, B>
> table, Function<B, C> getKeyFunction,
>         Serde<C> keySerde, Serde<B> valueSerde, Serde<Set<B>> 
> aggregatedSerde) {
>     return table
>             .groupBy((key, value) ->
> KeyValue.pair(getKeyFunction.apply(value), value),
>                     Serialized.with(keySerde, valueSerde))
>             .aggregate(() -> new HashSet<>(), (key, newValue, agg) -> {
>                 agg.remove(newValue);
>                 agg.add(newValue);
>                 return agg;
>             }, (key, oldValue, agg) -> {
>                 agg.remove(oldValue);
>                 return agg;
>             }, Materialized.with(keySerde, aggregatedSerde));
> }
> 
> This works pretty well when using Kafka, but not when testing via
> `TopologyTestDriver`.
> 
> In both scenarios, when I get an update, the subtractor is called first,
> and then the adder is called. The problem is that when using the
> TopologyTestDriver, two messages are sent out for updates: one after the
> subtractor call, and another one after the adder call. Not to mention that
> the message that is sent after the subrtractor and before the adder is in
> an incorrect stage.
> 
> I created a test case in GitHub to illustrate the issue:
> https://github.com/mulho/topology-testcase
> 
> Anyone could confirm this is a bug? I've tested this for both Kafka
> versions 2.0.1 and 2.1.0.
> 
> Thanks
> 
> Murilo
> 

Attachment: signature.asc
Description: OpenPGP digital signature

Reply via email to