Hi Peter,

Sorry, I've only just seen this thread.
You asked if this behavior is unexpected, and the answer is yes. Suppress.untilWindowCloses is intended to emit only the final result, regardless of restarts.

You also asked how the suppression buffer can resume after a restart, since it's not persistent. The answer is the same as for in-memory stores: the state of the store (or buffer, in this case) is persisted to a changelog topic, which is re-read on restart to re-create the exact state prior to shutdown. "Persistent" in the store nomenclature refers only to "persistent on the local disk".

Just to confirm your response regarding the buffer size: while it is better to use the public API ("Suppressed.unbounded()"), yes, your buffer was already unbounded.

I looked at your custom transformer, and it looks almost correct to me. The only flaw seems to be that it only looks for closed windows for the key currently being processed. That means that if you have key "A" buffered but don't get another event for that key for a while after its window closes, you won't emit the final result. This might actually take longer than the window retention period, in which case the data would be deleted without ever emitting the final result.

You said you think it should be possible to get the DSL version working, and I agree, since this is exactly what it was designed for. Do you mind filing a bug in the "KAFKA" Jira project (https://issues.apache.org/jira/secure/Dashboard.jspa)? It will be easier to keep the investigation organized that way.

In the meantime, I'll take another look at your logs above and try to reason about what could be wrong. Just one clarification...
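To make the transformer flaw concrete, here is a minimal, self-contained sketch (not your actual code; the buffer layout and names are invented for illustration) of the difference between flushing only the current key's closed windows and scanning the whole buffer, e.g. from a punctuator:

```java
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;

public class SuppressionBufferDemo {

    // Toy suppression buffer: entries keyed as "key@windowEnd" -> aggregate.

    // Variant 1 (the flaw): only scan windows belonging to the key of the
    // record currently being processed. Key "A"'s final result then waits
    // until another "A" record happens to arrive.
    static Map<String, Long> flushCurrentKeyOnly(Map<String, Long> buffer,
                                                 String currentKey,
                                                 long streamTime) {
        Map<String, Long> emitted = new LinkedHashMap<>();
        Iterator<Map.Entry<String, Long>> it = buffer.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Long> e = it.next();
            String[] parts = e.getKey().split("@");
            boolean closed = parts[0].equals(currentKey)
                    && Long.parseLong(parts[1]) <= streamTime;
            if (closed) {
                emitted.put(e.getKey(), e.getValue());
                it.remove();
            }
        }
        return emitted;
    }

    // Variant 2 (the fix): scan the whole buffer, so every key's window is
    // emitted as soon as stream time passes its end, regardless of which
    // key the triggering record belongs to.
    static Map<String, Long> flushAllClosed(Map<String, Long> buffer,
                                            long streamTime) {
        Map<String, Long> emitted = new LinkedHashMap<>();
        Iterator<Map.Entry<String, Long>> it = buffer.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Long> e = it.next();
            if (Long.parseLong(e.getKey().split("@")[1]) <= streamTime) {
                emitted.put(e.getKey(), e.getValue());
                it.remove();
            }
        }
        return emitted;
    }

    public static void main(String[] args) {
        Map<String, Long> buffer = new LinkedHashMap<>();
        buffer.put("A@1000", 42L); // key A's window ended at t=1000
        buffer.put("B@1000", 7L);  // key B's window ended at t=1000

        // A record for key B arrives at stream time 2000: both windows are
        // closed, but the per-key flush only releases B's final result.
        Map<String, Long> emitted = flushCurrentKeyOnly(buffer, "B", 2000L);
        System.out.println("per-key flush emitted: " + emitted);
        System.out.println("still stuck in buffer: " + buffer);

        // A whole-buffer scan releases A's final result as well.
        emitted.putAll(flushAllClosed(buffer, 2000L));
        System.out.println("after full scan: " + emitted);
    }
}
```

In a real Transformer you would typically do the whole-buffer scan from a scheduled punctuator rather than inline on every record, but the keying problem is the same either way.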
For example, you showed:

> [pool-1-thread-4] APP Consumed: [c@1545398874000/1545398876000] -> [14, 272, 548, 172], sum: 138902
> [pool-1-thread-4] APP Consumed: [c@1545398874000/1545398876000] -> [14, 272, 548, 172, 596, 886, 780] INSTEAD OF [14, 272, 548, 172], sum: 141164

Am I correct in thinking that the first, shorter list is the "incremental" version, and the second is the "final" version? I think so, but am confused by "INSTEAD OF".

Thanks for the report,
-John

On Wed, Dec 26, 2018 at 3:21 AM Peter Levart <peter.lev...@gmail.com> wrote:
>
>
> On 12/21/18 3:16 PM, Peter Levart wrote:
> > I also see some results that are actual non-final window aggregations
> > that precede the final aggregations. These non-final results are never
> > emitted out of order (for example, no such non-final result would ever
> > come after the final result for a particular key/window).
>
> Absence of proof is not the proof of absence... And I have later
> observed (using the DSL variant, not the custom Transformer) an
> occurrence of a non-final result that was emitted after a restart of the
> streams processor, while the final result for the same key/window had
> been emitted before the restart:
>
> [pool-1-thread-4] APP Consumed: [a@1545815260000/1545815262000] -> [550,
> 81, 18, 393, 968, 847, 452, 0, 0, 0], sum: 444856
> ...
> ... restart ...
> ...
> [pool-1-thread-4] APP Consumed: [a@1545815260000/1545815262000] -> [550]
> INSTEAD OF [550, 81, 18, 393, 968, 847, 452, 0, 0, 0], sum: 551648
>
>
> The app logic can not even rely on the guarantee that results are
> ordered, then. This is really not usable until the bug is fixed.
>
> Regards, Peter
>
>