Thank you for your response Matthias.

> On Feb 26, 2025, at 8:32 PM, Matthias J. Sax <> wrote:
> Hey,
> the problem you describe it not totally fundamental, but there is no 
> workaround in KS currently... So right now, only EOS can fix it.
> In the end, what you would need is a way to first write to the output topic 
> (and flush the write), before you update the state store.

Yes, I reached the same conclusion.

> Using the PAPI you could conceptually do `context.forward()` before you call 
> `store.put(...)`, but the KS runtime hides too many low level controls that 
> it would guarantee that `context.forward(...)` resulted in successful write 
> into the sink downstream, before it returns and `state.put(...)` is 
> executed... There is a missing `producer.flush()` step in-between that you 
> cannot perform.
> Does this make sense?

Yes, I thought of the same thing, but wasn't able to figure out the flush step 
- so it makes sense why I got stuck :)

> Down the line though, I think that TX state-stores can fix it (KIP-892). I 
> believe that we can (and should) use this, even for ALOS mode, allowing us to 
> roll back state of error, ie, all write to the store are "pending" until we 
> do a commit, and for this case we first flush all pending producer writes, 
> before we "commit the store changes", and the KS runtime can do the required 
> flush step.
> Thus, while TX state-store itself do not give you EOS, they should address 
> the issue at hand, and thus also unblock KIP-557 down the line.

Thank you for the link to this KIP - it's exactly what I had been musing about. 
Having this would be fantastic. We'd much rather have KS manage this under the 
hood than expose explicit flushes or have abstract worries about our 

I see lots of discussion from 2022 - mid 2024, leading up to a vote, but not 
much activity since then.
There's some PRs but while they have all been merged, I don't see one that 
actually activates the feature, and the KIP remains open.

Is there any way interested parties could help (testing?) push KIP-892 forward?


> Hope this helps,
>  -Matthias
> On 2/25/25 4:22 PM, Steven Schlansker wrote:
>> Hi kafka-users,
>> We are implementing a Kafka Streams app that computes various
>> streaming statistics over a corpus of data stored in Kafka topics.
>> While some aggregates update often, others like 'min', 'max', or
>> histogram buckets could have relatively few distinct updates relative
>> to the input data.
>> With our first implementation, we discovered that the majority of our
>> time during catch-up processing is computing the same data redundantly
>> over and over, at various stages of our pipeline. We then found that
>> KIP-557, Emit-on-change support
>> is reverted and doesn't seem to be coming back in the short term.
>> So, we've implemented a work-around, where we trade additional storage
>> to suppress redundant updates.
>> record Holder<V> (V val, boolean changed) {}
>> inputStream
>>     .mapValues(v -> new Holder<>(v, true))
>>     .aggregate(
>>         () -> null,
>>         (k, v, a) -> a == null ? new Holder<>(v.val(), true) : new
>> Holder<>(v.val(), !Objects.equals(a.val(), v.val())),
>>         materialized)
>>     .toStream()
>>     .flatMapValues((k, v) -> v.changed() ?
>> Collections.singletonList(v.val()) : Collections.emptyList())
>>     .toTable(materialized)
>> This seems to mostly give us the semantics we want. The aggregation
>> step statefully tracks if we saw this value as the previous state, and
>> if not, forwards the new value.
>> However, we subsequently seem to run into the same bug as highlighted
>> in - in at least
>> once configuration, a failure / retry will observe the source topic
>> rolling back but not the state store, causing downstream updates to be
>> lost.
>> This makes me wonder - is KAFKA-12508 a more general bug with at least
>> once mode and any stateful store - where you can observe state in a
>> store from a failed thread in the "future" relative to your source
>> topic, since it already wrote to the changelog before failing? KIP-557
>> is long reverted and we're able to observe similar behavior just with
>> the DSL.
>> I can't quite put my finger on the risk, but with a bunch of
>> aggregates all through our app, it makes me a bit uneasy about
>> at-least-once mode. I'm sure at least some of the stateful aggregate
>> authors (myself included) did not consider this sort of failure case.
>> We will probably try exactly-once mode for peace of mind.
>> From there, back to emit on change - if there is no other path forward
>> envisioned, perhaps we could consider re-enabling KIP-557 only for
>> exactly-once streams apps? Then at least some users could benefit from
>> it.
>> Failing that, our hack workaround is somewhat complicated by the need
>> to toStream / flatMap / toTable just to suppress "no change". It would
>> be nicer if the aggregate could directly return "no update" to the
>> KStreamAggregateProcessor, which would then know to skip forwarding
>> the record. Null couldn't be used, but we could construct a different
>> flag value or other signal.
>> If we can't get emit-on-change back into Kafka Streams soon, we'd be
>> interested in sponsoring a change like this if the community thinks it
>> could be helpful.
>> Sorry for the meandering train of thought, hopefully others are also
>> interested in getting emit-on-change working still :)
>> Best,
>> Steven

Reply via email to