Hello Peter, Thanks for filing this report, I've looked into the source code and I think I may spotted an edge case to your observations. To validate if my suspicion is correct, could you try modifying your DSL code a little bit, to use a very large suppression buffer size --- BTW the StrictBufferConfigImpl is an internal class (you can tell by its name) and are not recommend to use in your code. More specifically:
Suppressed.untilWindowCloses(BufferConfig.unbounded()) ------ and see if this issue still exists? Guozhang On Wed, Dec 19, 2018 at 1:50 PM Peter Levart <peter.lev...@gmail.com> wrote: > I see the list processor managed to smash may beautifully formatted HTML > message. For that reason I'm re-sending the sample code snippet in plain > text mode... > > Here's a sample kafka streams processor: > > KStream<String, Val> input = > builder > .stream( > inputTopic, > Consumed.with(Serdes.String(), new Val.Serde()) > .withTimestampExtractor((rec, prevTs) -> { > String key = (String) rec.key(); > Val val = (Val) rec.value(); > return Math.max(val.getTimestamp(), > Math.max(0L, prevTs - 4000)); > }) > ); > > KStream<Windowed<String>, IntegerList> grouped = > input > .groupByKey() > .windowedBy( > TimeWindows.of(Duration.ofSeconds(1)) > .advanceBy(Duration.ofSeconds(1)) > .grace(Duration.ofSeconds(5)) > ) > .aggregate( > IntegerList::new, > (k, v, list) -> { > list.add(v.getValue()); > return list; > }, > Materialized.with(Serdes.String(), new > IntegerList.Serde()) > ) > .suppress( > Suppressed.untilWindowCloses(new > StrictBufferConfigImpl()) > ) > .toStream(); > > grouped.to( > outputTopic, > Produced.with(new > SelfSerde.TimeWindowed<>(Serdes.String()), new IntegerList.Serde()) > ); > > > > Regards, Peter > > -- -- Guozhang