Hey,
the problem you describe is not totally fundamental, but there is no
workaround in KS currently... So right now, only EOS can fix it.
In the end, what you would need is a way to first write to the output
topic (and flush the write) before you update the state store.
Using the PAPI you could conceptually call `context.forward()` before
`store.put(...)`, but the KS runtime hides too many low-level controls
to guarantee that `context.forward(...)` resulted in a successful write
to the downstream sink before it returns and `store.put(...)` is
executed... There is a missing `producer.flush()` step in between that
you cannot perform.
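To illustrate, here is a minimal PAPI sketch (the store name and types
are made up; the point is the ordering, and the step the runtime does
not let you perform):

    import org.apache.kafka.streams.processor.api.Processor;
    import org.apache.kafka.streams.processor.api.ProcessorContext;
    import org.apache.kafka.streams.processor.api.Record;
    import org.apache.kafka.streams.state.KeyValueStore;

    class ForwardThenStore implements Processor<String, String, String, String> {
        private ProcessorContext<String, String> context;
        private KeyValueStore<String, String> store;

        @Override
        public void init(final ProcessorContext<String, String> context) {
            this.context = context;
            this.store = context.getStateStore("my-store"); // made-up store name
        }

        @Override
        public void process(final Record<String, String> record) {
            // 1. forward to the downstream sink first...
            context.forward(record);
            // 2. ...but forward() returning does not mean the record reached
            //    the output topic; the missing step would be a producer.flush()
            //    here, which the runtime does not expose
            // 3. only then update the state store
            store.put(record.key(), record.value());
        }
    }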
Does this make sense?
Down the line though, I think that TX state stores can fix it (KIP-892).
I believe that we can (and should) use them even for ALOS mode,
allowing us to roll back state on error, ie, all writes to the store
are "pending" until we do a commit; for this case we would first flush
all pending producer writes before we "commit the store changes", and
the KS runtime can do the required flush step.
Thus, while TX state stores themselves do not give you EOS, they should
address the issue at hand, and thus also unblock KIP-557 down the line.
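In pseudo code, the commit path could look like this (purely
illustrative; `txStore.commit()` is not an actual KS or KIP-892 API):

    void commit() {
        // 1. make all pending writes to the output topics durable
        producer.flush();
        // 2. only now make the buffered ("pending") store writes visible
        txStore.commit();
        // 3. finally commit the input offsets
        consumer.commitSync(offsets);
    }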
Hope this helps,
-Matthias
On 2/25/25 4:22 PM, Steven Schlansker wrote:
Hi kafka-users,
We are implementing a Kafka Streams app that computes various
streaming statistics over a corpus of data stored in Kafka topics.
While some aggregates update often, others like 'min', 'max', or
histogram buckets could have relatively few distinct updates relative
to the input data.
With our first implementation, we discovered that the majority of our
time during catch-up processing is spent computing the same data
redundantly, over and over, at various stages of our pipeline. We then
found that KIP-557, emit-on-change support
https://cwiki.apache.org/confluence/display/KAFKA/KIP-557%3A+Add+emit+on+change+support+for+Kafka+Streams
has been reverted and doesn't seem to be coming back in the short term.
So, we've implemented a work-around, where we trade additional storage
to suppress redundant updates.
record Holder<V>(V val, boolean changed) {}

inputStream
    .mapValues(v -> new Holder<>(v, true))
    .groupByKey() // aggregate() is only available on a grouped stream
    .aggregate(
        () -> null,
        // flag the new value as changed only if it differs from the
        // previous aggregate
        (k, v, a) -> a == null
            ? new Holder<>(v.val(), true)
            : new Holder<>(v.val(), !Objects.equals(a.val(), v.val())),
        materialized)
    .toStream()
    // suppress records whose value did not change
    .flatMapValues((k, v) -> v.changed()
        ? Collections.singletonList(v.val())
        : Collections.emptyList())
    .toTable(materialized) // a second Materialized with its own store name
This seems to mostly give us the semantics we want. The aggregation
step statefully tracks whether we already saw this value as the
previous state, and forwards the new value only if it changed.
However, we subsequently seem to run into the same bug as highlighted
in https://issues.apache.org/jira/browse/KAFKA-12508 - in at-least-once
configuration, a failure / retry will observe the source topic rolling
back but not the state store, causing downstream updates to be lost.
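To make the failure mode concrete, here is the sequence as we
understand it (our reading of the ticket, not a verified trace):

1. An input record is processed; the store now remembers its value
   (changed=true), and the changelog write succeeds.
2. The thread fails before the downstream write / offset commit
   completes.
3. On restart, the same input record is re-read from the source topic.
4. The store, restored from the changelog, already contains that value,
   so the aggregate computes changed=false.
5. The update is suppressed and never reaches the downstream topic.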
This makes me wonder - is KAFKA-12508 a more general bug with
at-least-once mode and any stateful store - where you can observe state
in a store from a failed thread in the "future" relative to your source
topic, since it already wrote to the changelog before failing? KIP-557
has long been reverted, and we're able to observe similar behavior just
with the DSL.
I can't quite put my finger on the exact risk, but with a bunch of
aggregates throughout our app, it makes me a bit uneasy about
at-least-once mode. I'm sure at least some stateful aggregate authors
(myself included) did not consider this sort of failure case. We will
probably try exactly-once mode for peace of mind.
From there, back to emit-on-change - if there is no other path forward
envisioned, perhaps we could consider re-enabling KIP-557 only for
exactly-once streams apps? Then at least some users could benefit from
it.
Failing that, our hack workaround is somewhat complicated by the need
to toStream / flatMap / toTable just to suppress "no change". It would
be nicer if the aggregate could directly return "no update" to the
KStreamAggregateProcessor, which would then know to skip forwarding
the record. Null couldn't be used, but we could construct a different
flag value or other signal.
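Purely as a sketch of the idea (none of this is an existing Kafka
Streams API; the sentinel and the processor change are hypothetical):

    // hypothetical marker meaning "aggregate unchanged, emit nothing"
    public final class NoUpdate {
        public static final Object SENTINEL = new Object();
    }

    // a hypothetical emit-on-change-aware aggregate processor could then do:
    //   Object newAgg = aggregator.apply(key, value, oldAgg);
    //   if (newAgg == NoUpdate.SENTINEL) {
    //       return; // keep the old state, forward nothing downstream
    //   }
    //   store.put(key, newAgg);
    //   context.forward(record.withValue(newAgg));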
If we can't get emit-on-change back into Kafka Streams soon, we'd be
interested in sponsoring a change like this if the community thinks it
could be helpful.
Sorry for the meandering train of thought - hopefully others are still
interested in getting emit-on-change working :)
Best,
Steven