Hi
I am new to this mailing list, so not sure if this is the right place to
send this. Please let me know if it's not.

I believe I found a bug on the TopologyTestDriver.

I have a topology that aggregates on a KTable. This is a generic method I
created to build this topology on different topics I have.

public static <A, B, C> KTable<C, Set<B>> groupTable(KTable<A, B>
table, Function<B, C> getKeyFunction,
        Serde<C> keySerde, Serde<B> valueSerde, Serde<Set<B>> aggregatedSerde) {
    return table
            .groupBy((key, value) ->
KeyValue.pair(getKeyFunction.apply(value), value),
                    Serialized.with(keySerde, valueSerde))
            .aggregate(() -> new HashSet<>(), (key, newValue, agg) -> {
                agg.remove(newValue);
                agg.add(newValue);
                return agg;
            }, (key, oldValue, agg) -> {
                agg.remove(oldValue);
                return agg;
            }, Materialized.with(keySerde, aggregatedSerde));
}

This works pretty well when using Kafka, but not when testing via
`TopologyTestDriver`.

In both scenarios, when I get an update, the subtractor is called first,
and then the adder is called. The problem is that when using the
TopologyTestDriver, two messages are sent out for updates: one after the
subtractor call, and another one after the adder call. Not to mention that
the message that is sent after the subrtractor and before the adder is in
an incorrect stage.

I created a test case in GitHub to illustrate the issue:
https://github.com/mulho/topology-testcase

Anyone could confirm this is a bug? I've tested this for both Kafka
versions 2.0.1 and 2.1.0.

Thanks

Murilo

Reply via email to