Thank you for your response, Matthias.

> On Feb 26, 2025, at 8:32 PM, Matthias J. Sax <mj...@apache.org> wrote:
>
> Hey,
>
> the problem you describe is not totally fundamental, but there is no
> workaround in KS currently... So right now, only EOS can fix it.
>
> In the end, what you would need is a way to first write to the output topic
> (and flush the write), before you update the state store.
>
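Here is a toy model in plain Java that shows why the write order matters. It uses no Kafka classes, and `WriteOrderSketch`, `process`, and `runWithReplay` are hypothetical names. A map stands in for the changelogged state store and a list stands in for the sink topic after a successful flush. An exception simulates the thread dying between the two writes, before the input offset is committed. The dedup step forwards a value only if it differs from the stored one.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;

// Toy model only: no Kafka involved. "store" stands in for a changelogged
// state store, "output" for the sink topic after a successful producer flush.
public class WriteOrderSketch {

    enum Order { STORE_FIRST, OUTPUT_FIRST }

    /** Simulates the stream thread dying between the two writes. */
    static final class SimulatedCrash extends RuntimeException {}

    // One emit-on-change step: forward the value only if it differs from the stored one.
    static void process(String key, String value, Map<String, String> store,
                        List<String> output, Order order, boolean crashBetween) {
        boolean changed = !Objects.equals(store.get(key), value);
        if (order == Order.STORE_FIRST) {
            store.put(key, value);                    // persisted via the changelog
            if (crashBetween) throw new SimulatedCrash();
            if (changed) output.add(value);           // never reached on the failed attempt
        } else {
            if (changed) output.add(value);           // write and flush to the sink first
            if (crashBetween) throw new SimulatedCrash();
            store.put(key, value);                    // only then update the store
        }
    }

    // One input record: a failed attempt, then the at-least-once replay.
    static List<String> runWithReplay(Order order) {
        Map<String, String> store = new HashMap<>();
        List<String> output = new ArrayList<>();
        try {
            process("k", "v1", store, output, order, true);
        } catch (SimulatedCrash e) {
            // input offset was not committed, so the same record is replayed
        }
        process("k", "v1", store, output, order, false);
        return output;
    }

    public static void main(String[] args) {
        System.out.println("STORE_FIRST:  " + runWithReplay(Order.STORE_FIRST));  // prints STORE_FIRST:  []
        System.out.println("OUTPUT_FIRST: " + runWithReplay(Order.OUTPUT_FIRST)); // prints OUTPUT_FIRST: [v1, v1]
    }
}
```

When the store is written first, the replayed record finds its own value already stored and forwards nothing, so the update is lost. When the output is written and flushed first, the replay produces a duplicate instead, which is the ordinary at-least-once outcome.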
Yes, I reached the same conclusion.

> Using the PAPI you could conceptually do `context.forward()` before you call
> `store.put(...)`, but the KS runtime hides too many low-level controls to
> guarantee that `context.forward(...)` resulted in a successful write
> into the sink downstream before it returns and `store.put(...)` is
> executed... There is a missing `producer.flush()` step in between that you
> cannot perform.
>
> Does this make sense?

Yes, I thought of the same thing, but wasn't able to figure out the flush
step, so it makes sense why I got stuck :)

> Down the line though, I think that TX state stores can fix it (KIP-892). I
> believe that we can (and should) use this, even for ALOS mode, allowing us to
> roll back state on error, i.e., all writes to the store are "pending" until we
> do a commit. On commit, we first flush all pending producer writes before we
> "commit the store changes", so the KS runtime can do the required flush step.
>
> Thus, while TX state stores themselves do not give you EOS, they should
> address the issue at hand, and thus also unblock KIP-557 down the line.

Thank you for the link to this KIP; it's exactly what I had been musing about.
Having this would be fantastic. We'd much rather have KS manage this under the
hood than have to issue explicit flushes ourselves or worry in the abstract
about our crash-readiness.

I see lots of discussion from 2022 to mid-2024, leading up to a vote, but not
much activity since then. There are some PRs, and while they have all been
merged, I don't see one that actually activates the feature, and the KIP
remains open.

Is there any way interested parties could help push KIP-892 forward, e.g. with
testing?

Thanks!

>
> Hope this helps,
> -Matthias
>
> On 2/25/25 4:22 PM, Steven Schlansker wrote:
>> Hi kafka-users,
>>
>> We are implementing a Kafka Streams app that computes various
>> streaming statistics over a corpus of data stored in Kafka topics.
>>
>> While some aggregates update often, others like 'min', 'max', or
>> histogram buckets could have relatively few distinct updates relative
>> to the input data.
>>
>> With our first implementation, we discovered that the majority of our
>> time during catch-up processing is spent computing the same data
>> redundantly over and over, at various stages of our pipeline. We then
>> found that KIP-557, Emit-on-change support
>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-557%3A+Add+emit+on+change+support+for+Kafka+Streams
>> was reverted and doesn't seem to be coming back in the short term.
>>
>> So, we've implemented a workaround, where we trade additional storage
>> to suppress redundant updates.
>>
>> record Holder<V>(V val, boolean changed) {}
>>
>> inputStream
>>     .mapValues(v -> new Holder<>(v, true))
>>     .groupByKey() // KStream has no aggregate(); group by the existing key first
>>     .aggregate(
>>         () -> null,
>>         (k, v, a) -> a == null
>>             ? new Holder<>(v.val(), true)
>>             : new Holder<>(v.val(), !Objects.equals(a.val(), v.val())),
>>         holderMaterialized)  // illustrative name: a Materialized for Holder<V> values
>>     .toStream()
>>     .flatMapValues((k, v) -> v.changed()
>>         ? Collections.singletonList(v.val())
>>         : Collections.emptyList())
>>     .toTable(valueMaterialized); // illustrative name: a separate Materialized for V values
>>
>> This seems to mostly give us the semantics we want. The aggregation
>> step statefully tracks whether we saw this value as the previous state,
>> and if not, forwards the new value.
>>
>> However, we subsequently seem to run into the same bug as highlighted
>> in https://issues.apache.org/jira/browse/KAFKA-12508: in an at-least-once
>> configuration, a failure / retry will observe the source topic
>> rolling back but not the state store, causing downstream updates to be
>> lost.
>>
>> This makes me wonder: is KAFKA-12508 a more general bug with at-least-once
>> mode and any stateful store, where you can observe state in a
>> store from a failed thread in the "future" relative to your source
>> topic, since it already wrote to the changelog before failing? KIP-557
>> is long reverted, and we're able to observe similar behavior just with
>> the DSL.
>>
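The aggregator from the workaround can be exercised on its own, outside Kafka Streams. This sketch simplifies in three ways: it handles a single key, it uses an in-memory variable in place of the state store, and it drops the input-side `Holder` wrapper so the aggregator takes the raw value. `EmitOnChangeSketch`, `aggregate`, and `forwarded` are hypothetical names. The `if (state.changed())` check plays the role of the `flatMapValues` step.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;

// Plain-Java model of the Holder-based workaround; no Kafka classes are used.
public class EmitOnChangeSketch {

    record Holder<V>(V val, boolean changed) {}

    // Same logic as the aggregator lambda; agg is null for a key seen for the first time.
    static <V> Holder<V> aggregate(Holder<V> agg, V value) {
        if (agg == null) {
            return new Holder<>(value, true);
        }
        return new Holder<>(value, !Objects.equals(agg.val(), value));
    }

    // Feeds values for one key through the aggregator and collects what would be forwarded.
    static <V> List<V> forwarded(List<V> inputs) {
        Holder<V> state = null; // stands in for this key's state store entry
        List<V> out = new ArrayList<>();
        for (V v : inputs) {
            state = aggregate(state, v);
            if (state.changed()) { // the flatMapValues filter
                out.add(state.val());
            }
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(forwarded(List.of(3, 3, 5, 5, 5, 3))); // prints [3, 5, 3]
    }
}
```

The "changed" flag is derived from the stored previous value. Suppose a failure keeps the new store entry but loses the forwarded record, as in the KAFKA-12508 scenario. The replayed record then computes `changed == false`, and nothing is re-emitted.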
>> I can't quite put my finger on the risk, but with a bunch of
>> aggregates all through our app, it makes me a bit uneasy about
>> at-least-once mode. I'm sure at least some of the stateful aggregate
>> authors (myself included) did not consider this sort of failure case.
>> We will probably try exactly-once mode for peace of mind.
>>
>> From there, back to emit-on-change: if there is no other path forward
>> envisioned, perhaps we could consider re-enabling KIP-557 only for
>> exactly-once Streams apps? Then at least some users could benefit from
>> it.
>>
>> Failing that, our hack workaround is somewhat complicated by the need
>> to toStream / flatMap / toTable just to suppress "no change". It would
>> be nicer if the aggregator could directly return "no update" to the
>> KStreamAggregateProcessor, which would then know to skip forwarding
>> the record. Null couldn't be used, but we could construct a different
>> flag value or other signal.
>>
>> If we can't get emit-on-change back into Kafka Streams soon, we'd be
>> interested in sponsoring a change like this if the community thinks it
>> could be helpful.
>>
>> Sorry for the meandering train of thought; hopefully others are also
>> still interested in getting emit-on-change working :)
>>
>> Best,
>> Steven
>
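Below is one possible shape for the "no update" signal, sketched in plain Java. The aggregator returns `Optional.empty()` instead of a value, and a stand-in for the aggregate processor then skips both the store write and the forward. This is purely hypothetical. `SuppressingAggregator`, `Processor`, and `maxUpdates` are illustrative names, not Kafka Streams APIs. `Optional` is just one choice of flag that avoids overloading `null`.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

// Hypothetical API sketch: nothing here exists in Kafka Streams today.
public class NoUpdateAggregatorSketch {

    @FunctionalInterface
    interface SuppressingAggregator<K, V, A> {
        /** Returns the new aggregate, or Optional.empty() to signal "no update". */
        Optional<A> apply(K key, V value, A current);
    }

    /** Minimal stand-in for an aggregate processor: a store plus forwarded records. */
    static final class Processor<K, V, A> {
        private final SuppressingAggregator<K, V, A> aggregator;
        final Map<K, A> store = new HashMap<>();
        final List<Map.Entry<K, A>> forwarded = new ArrayList<>();

        Processor(SuppressingAggregator<K, V, A> aggregator) {
            this.aggregator = aggregator;
        }

        void process(K key, V value) {
            Optional<A> result = aggregator.apply(key, value, store.get(key));
            if (result.isEmpty()) {
                return; // "no update": skip both the store write and the forward
            }
            A updated = result.get();
            store.put(key, updated);
            forwarded.add(Map.entry(key, updated));
        }
    }

    // Example: a running max that reports "no update" unless a new maximum arrives.
    static List<Integer> maxUpdates(List<Integer> inputs) {
        Processor<String, Integer, Integer> p = new Processor<>(
                (k, v, current) -> (current == null || v > current) ? Optional.of(v) : Optional.empty());
        for (Integer x : inputs) {
            p.process("key", x);
        }
        List<Integer> out = new ArrayList<>();
        for (Map.Entry<String, Integer> e : p.forwarded) {
            out.add(e.getValue());
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(maxUpdates(List.of(3, 1, 5, 4, 5, 2))); // prints [3, 5]
    }
}
```

With a running max, only new maxima are forwarded. That matches the rarely changing 'min' / 'max' style aggregates mentioned in the original message.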