Hello Guozhang,
May I just add some more observations which might help you pin-point the
problem...
When the process that runs the kafka streams processing threads is
restarted, I can see duplicates in the output topic. But that is
understandable for "at least once semantics" and I don't mind if there
are duplicates if they are duplicates of final results of window
aggregations. My logic is prepared for that. But I also see some results
that are actual non-final window aggregations that precede the final
aggregations. These non-final results are never emitted out of order
(for example, no such non-final result would ever come after the final
result for a particular key/window).
For example, here are some log fragments of a sample consumption of the
output topic where I detect either duplicates or "incremental updates"
of some key/window and mark them with "INSTEAD OF" words. I only show
incremental updates here:
[pool-1-thread-4] APP Consumed: [c@1545398874000/1545398876000] -> [14,
272, 548, 172], sum: 138902
[pool-1-thread-4] APP Consumed: [c@1545398874000/1545398876000] -> [14,
272, 548, 172, 596, 886, 780] INSTEAD OF [14, 272, 548, 172], sum: 141164
or:
[pool-1-thread-2] APP Consumed: [c@1545398882000/1545398884000] -> [681,
116, 542, 543, 0, 0, 0, 0], sum: 143046
[pool-1-thread-2] APP Consumed: [c@1545398882000/1545398884000] -> [681,
116, 542, 543, 0, 0, 0, 0, 0, 0, 0, 0] INSTEAD OF [681, 116, 542, 543,
0, 0, 0, 0], sum: 143046
The rule seems to be that almost always the non-final result precedes
immediately in the log the final result. I say almost, because I also
saw one occurrence of the following:
[pool-1-thread-3] APP Consumed: [b@1545398878000/1545398880000] -> [756,
454, 547, 300, 323], sum: 166729
[pool-1-thread-3] APP Consumed: [b@1545398880000/1545398882000] -> [193,
740, 660, 981], sum: 169303
[pool-1-thread-3] APP Consumed: [b@1545398878000/1545398880000] -> [756,
454, 547, 300, 323, 421, 378, 354, 0] INSTEAD OF [756, 454, 547, 300,
323], sum: 170456
[pool-1-thread-3] APP Consumed: [b@1545398880000/1545398882000] -> [193,
740, 660, 981, 879, 209, 104, 0, 0, 0] INSTEAD OF [193, 740, 660, 981],
sum: 171648
Here the incremental update of the key/window happened for two
consecutive 2 second windows in close succession and the results were
intermingled.
What you see in the above log before the window start/end timestamps is
a Sting key which is used in groupByKey (a, b, c, d). The input and
output topics have 4 partitions and I use 4 streams processing threads...
Hope this helps you find the problem.
So could this be considered a bug? I don't know how this suppression is
supposed to work, but it seems that it does not use any persistent
storage for suppression buffer. So after the streams processing process
is restarted, it starts with a fresh buffer. What mechanism are used to
guarantee that in spite of that, the suppress(untilWindowCloses)
suppresses non-final results?
Regards, Peter
On 12/21/18 10:48 AM, Peter Levart wrote:
Hello Guozhang,
Thank you for looking into this problem.
I noticed that I have been using an internal class constructor and
later discovered the right API to create the StrictBufferConfig
implementations. But I'm afraid that using your proposed factory
method won't change anything since its implementation is as follows:
static StrictBufferConfig unbounded() {
return new StrictBufferConfigImpl();
}
...it creates an instance of the same class as my sample code below,
so the program behaves the same...
What does this mean? Was your suggestion meant to rule-out any other
possible causes and your suspicion still holds or did you suspect that
I was not using suppression buffer of sufficient size?
Regards, Peter
On 12/21/18 1:58 AM, Guozhang Wang wrote:
Hello Peter,
Thanks for filing this report, I've looked into the source code and I
think
I may spotted an edge case to your observations. To validate if my
suspicion is correct, could you try modifying your DSL code a little
bit,
to use a very large suppression buffer size --- BTW the
StrictBufferConfigImpl is an internal class (you can tell by its
name) and
are not recommend to use in your code. More specifically:
Suppressed.untilWindowCloses(BufferConfig.unbounded())
------
and see if this issue still exists?
Guozhang
On Wed, Dec 19, 2018 at 1:50 PM Peter Levart <peter.lev...@gmail.com>
wrote:
I see the list processor managed to smash may beautifully formatted
HTML
message. For that reason I'm re-sending the sample code snippet in
plain
text mode...
Here's a sample kafka streams processor:
KStream<String, Val> input =
builder
.stream(
inputTopic,
Consumed.with(Serdes.String(), new Val.Serde())
.withTimestampExtractor((rec, prevTs)
-> {
String key = (String) rec.key();
Val val = (Val) rec.value();
return Math.max(val.getTimestamp(),
Math.max(0L, prevTs - 4000));
})
);
KStream<Windowed<String>, IntegerList> grouped =
input
.groupByKey()
.windowedBy(
TimeWindows.of(Duration.ofSeconds(1))
.advanceBy(Duration.ofSeconds(1))
.grace(Duration.ofSeconds(5))
)
.aggregate(
IntegerList::new,
(k, v, list) -> {
list.add(v.getValue());
return list;
},
Materialized.with(Serdes.String(), new
IntegerList.Serde())
)
.suppress(
Suppressed.untilWindowCloses(new
StrictBufferConfigImpl())
)
.toStream();
grouped.to(
outputTopic,
Produced.with(new
SelfSerde.TimeWindowed<>(Serdes.String()), new IntegerList.Serde())
);
Regards, Peter