Hello Guozhang,
Thank you for looking into this problem.
I noticed that I have been using an internal class constructor and later
discovered the right API to create the StrictBufferConfig
implementations. But I'm afraid that using your proposed factory method
won't change anything since its implementation is as follows:
static StrictBufferConfig unbounded() {
return new StrictBufferConfigImpl();
}
...it creates an instance of the same class as my sample code below, so
the program behaves the same...
What does this mean? Was your suggestion meant to rule-out any other
possible causes and your suspicion still holds or did you suspect that I
was not using suppression buffer of sufficient size?
Regards, Peter
On 12/21/18 1:58 AM, Guozhang Wang wrote:
Hello Peter,
Thanks for filing this report, I've looked into the source code and I think
I may spotted an edge case to your observations. To validate if my
suspicion is correct, could you try modifying your DSL code a little bit,
to use a very large suppression buffer size --- BTW the
StrictBufferConfigImpl is an internal class (you can tell by its name) and
are not recommend to use in your code. More specifically:
Suppressed.untilWindowCloses(BufferConfig.unbounded())
------
and see if this issue still exists?
Guozhang
On Wed, Dec 19, 2018 at 1:50 PM Peter Levart <peter.lev...@gmail.com> wrote:
I see the list processor managed to smash may beautifully formatted HTML
message. For that reason I'm re-sending the sample code snippet in plain
text mode...
Here's a sample kafka streams processor:
KStream<String, Val> input =
builder
.stream(
inputTopic,
Consumed.with(Serdes.String(), new Val.Serde())
.withTimestampExtractor((rec, prevTs) -> {
String key = (String) rec.key();
Val val = (Val) rec.value();
return Math.max(val.getTimestamp(),
Math.max(0L, prevTs - 4000));
})
);
KStream<Windowed<String>, IntegerList> grouped =
input
.groupByKey()
.windowedBy(
TimeWindows.of(Duration.ofSeconds(1))
.advanceBy(Duration.ofSeconds(1))
.grace(Duration.ofSeconds(5))
)
.aggregate(
IntegerList::new,
(k, v, list) -> {
list.add(v.getValue());
return list;
},
Materialized.with(Serdes.String(), new
IntegerList.Serde())
)
.suppress(
Suppressed.untilWindowCloses(new
StrictBufferConfigImpl())
)
.toStream();
grouped.to(
outputTopic,
Produced.with(new
SelfSerde.TimeWindowed<>(Serdes.String()), new IntegerList.Serde())
);
Regards, Peter