I see the list processor managed to smash my beautifully formatted HTML
message. For that reason I'm re-sending the sample code snippet in plain
text mode...
Here's a sample Kafka Streams processor:
KStream<String, Val> input =
    builder
        .stream(
            inputTopic,
            Consumed.with(Serdes.String(), new Val.Serde())
                .withTimestampExtractor((rec, prevTs) -> {
                    String key = (String) rec.key();
                    Val val = (Val) rec.value();
                    return Math.max(val.getTimestamp(),
                        Math.max(0L, prevTs - 4000));
                })
        );

KStream<Windowed<String>, IntegerList> grouped =
    input
        .groupByKey()
        .windowedBy(
            TimeWindows.of(Duration.ofSeconds(1))
                .advanceBy(Duration.ofSeconds(1))
                .grace(Duration.ofSeconds(5))
        )
        .aggregate(
            IntegerList::new,
            (k, v, list) -> {
                list.add(v.getValue());
                return list;
            },
            Materialized.with(Serdes.String(), new IntegerList.Serde())
        )
        .suppress(
            Suppressed.untilWindowCloses(new StrictBufferConfigImpl())
        )
        .toStream();

grouped.to(
    outputTopic,
    Produced.with(
        new SelfSerde.TimeWindowed<>(Serdes.String()),
        new IntegerList.Serde())
);
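
To make the intent of the timestamp extractor above easier to see, here is its clamping rule pulled out into plain Java with no Kafka dependency. This is only a sketch: the class name TimestampClamp, the method clamp and the constant MAX_LAG_MS are made up for illustration, and it assumes prevTs is -1 when no earlier timestamp is known for the partition.

```java
// Standalone sketch of the extractor's arithmetic; not part of the topology.
final class TimestampClamp {

    // Hypothetical name for the 4000 ms literal used in the extractor.
    static final long MAX_LAG_MS = 4000L;

    // Returns the record's own timestamp unless it lags the previous
    // timestamp by more than MAX_LAG_MS, in which case it is pulled
    // forward to prevTs - MAX_LAG_MS. The result is never negative.
    static long clamp(long recordTs, long prevTs) {
        return Math.max(recordTs, Math.max(0L, prevTs - MAX_LAG_MS));
    }

    public static void main(String[] args) {
        // Within the allowed lag: timestamp is kept as-is.
        System.out.println(clamp(10_000L, 12_000L)); // 10000
        // Too far behind: pulled forward to 12000 - 4000.
        System.out.println(clamp(5_000L, 12_000L));  // 8000
        // No previous timestamp known (assumed -1): kept as-is.
        System.out.println(clamp(500L, -1L));        // 500
    }
}
```

With these numbers a record can be at most 4 seconds behind the previous timestamp, which is inside the 5 second grace period configured on the windows.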
Regards, Peter