Hi kafka-users,

We are implementing a Kafka Streams app that computes various streaming statistics over a corpus of data stored in Kafka topics. While some aggregates update often, others, like 'min', 'max', or histogram buckets, produce relatively few distinct updates compared to the volume of input data.
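For concreteness, the shape of one of these is roughly the sketch below (simplified, not our real topology; topic name and serdes are just placeholders): a per-key running 'min', which as a KTable forwards a downstream update for every input record, even when the minimum itself hasn't changed.

    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.kstream.Consumed;
    import org.apache.kafka.streams.kstream.Grouped;
    import org.apache.kafka.streams.kstream.KTable;
    import org.apache.kafka.streams.kstream.Materialized;

    // Illustrative only: a per-key running minimum. The resulting KTable
    // emits an update for every input record, even when the min is unchanged.
    StreamsBuilder builder = new StreamsBuilder();
    KTable<String, Long> minPerKey = builder
        .stream("input-topic", Consumed.with(Serdes.String(), Serdes.Long()))
        .groupByKey(Grouped.with(Serdes.String(), Serdes.Long()))
        .aggregate(
            () -> Long.MAX_VALUE,                       // initial aggregate
            (key, value, agg) -> Math.min(value, agg),  // frequently returns the same min
            Materialized.with(Serdes.String(), Serdes.Long()));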
With our first implementation, we discovered that the majority of our time during catch-up processing is spent recomputing the same data over and over at various stages of our pipeline. We then found that KIP-557 (emit-on-change support, https://cwiki.apache.org/confluence/display/KAFKA/KIP-557%3A+Add+emit+on+change+support+for+Kafka+Streams) has been reverted and doesn't seem to be coming back in the short term. So we've implemented a workaround in which we trade additional storage for suppression of redundant updates:

    record Holder<V>(V val, boolean changed) {}

    inputStream
        .mapValues(v -> new Holder<>(v, true))
        .groupByKey()
        .aggregate(
            () -> null,
            // keep the latest value, flagging whether it differs from the previous state
            (k, v, a) -> a == null
                ? new Holder<>(v.val(), true)
                : new Holder<>(v.val(), !Objects.equals(a.val(), v.val())),
            holderMaterialized)      // Materialized for the Holder store
        .toStream()
        // drop updates whose value did not actually change
        .flatMapValues((k, v) -> v.changed()
            ? Collections.singletonList(v.val())
            : Collections.emptyList())
        .toTable(resultMaterialized);  // separate Materialized for the deduplicated table

This seems to mostly give us the semantics we want: the aggregation step statefully tracks whether the incoming value matches the previous state, and only forwards the new value when it has changed.

However, we subsequently seem to run into the same bug highlighted in https://issues.apache.org/jira/browse/KAFKA-12508: in at-least-once configuration, a failure/retry observes the source topic rolling back but not the state store, causing downstream updates to be lost.

This makes me wonder: is KAFKA-12508 a more general bug with at-least-once mode and any stateful store, where you can observe state in a store from a failed thread in the "future" relative to your source topic, since it already wrote to the changelog before failing? KIP-557 has long been reverted, and we are able to observe similar behavior just with the DSL. I can't quite put my finger on the risk, but with aggregates throughout our app, it makes me a bit uneasy about at-least-once mode. I'm sure at least some authors of stateful aggregates (myself included) did not consider this sort of failure case. We will probably try exactly-once mode for peace of mind.

From there, back to emit-on-change: if there is no other path forward envisioned, perhaps we could consider re-enabling KIP-557 only for exactly-once Streams apps? Then at least some users could benefit from it.

Failing that, our workaround is somewhat complicated by the need to toStream / flatMapValues / toTable just to suppress "no change" updates. It would be nicer if the aggregate could directly return "no update" to the KStreamAggregateProcessor, which would then know to skip forwarding the record. Null couldn't be used, but we could construct a different flag value or other signal. If we can't get emit-on-change back into Kafka Streams soon, we'd be interested in sponsoring a change like this if the community thinks it could be helpful.

Sorry for the meandering train of thought; hopefully others are still interested in getting emit-on-change working :)

Best,
Steven
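P.S. In case it's useful context, the exactly-once switch we have in mind is just the processing.guarantee config, roughly as below (exactly_once_v2 assumes Streams 2.8+ and brokers 2.5+; the application id and bootstrap servers are placeholders):

    import java.util.Properties;
    import org.apache.kafka.streams.StreamsConfig;

    Properties props = new Properties();
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streaming-stats-app");  // placeholder
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092");       // placeholder
    // Switch from the default at_least_once to exactly-once processing.
    props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE_V2);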