I see the list processor managed to smash my beautifully formatted HTML message. For that reason I'm re-sending the sample code snippet in plain text mode...

Here's a sample Kafka Streams processor:

        KStream<String, Val> input =
            builder
                .stream(
                    inputTopic,
                    Consumed.with(Serdes.String(), new Val.Serde())
                            .withTimestampExtractor((rec, prevTs) -> {
                                Val val = (Val) rec.value();
                                // Use the record's own timestamp, but never fall more than
                                // 4 seconds behind the previous timestamp (and never below 0).
                                return Math.max(val.getTimestamp(), Math.max(0L, prevTs - 4000));
                            })
                );

        KStream<Windowed<String>, IntegerList> grouped =
            input
                .groupByKey()
                .windowedBy(
                    TimeWindows.of(Duration.ofSeconds(1))
                               .advanceBy(Duration.ofSeconds(1))
                               .grace(Duration.ofSeconds(5))
                )
                .aggregate(
                    IntegerList::new,
                    (k, v, list) -> {
                        list.add(v.getValue());
                        return list;
                    },
                    Materialized.with(Serdes.String(), new IntegerList.Serde())
                )
                .suppress(
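                    // Public factory for an unbounded strict buffer; avoids the internal StrictBufferConfigImpl class.
                    Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded())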
                )
                .toStream();

        grouped.to(
            outputTopic,
            Produced.with(new SelfSerde.TimeWindowed<>(Serdes.String()), new IntegerList.Serde())
        );
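
To see what the timestamp extractor above does in isolation, here is a minimal sketch of the same arithmetic without any Kafka dependencies. The class and method names (TimestampClamp, clamp) are made up for illustration, and it assumes prevTs is negative (e.g. -1) when no timestamp has been seen yet:

```java
final class TimestampClamp {

    // Hypothetical helper mirroring the lambda in the extractor:
    // take the record's timestamp, but clamp it so it is never more than
    // 4000 ms behind the previous timestamp and never negative.
    static long clamp(long recordTs, long prevTs) {
        return Math.max(recordTs, Math.max(0L, prevTs - 4000));
    }

    public static void main(String[] args) {
        // Record is only 2 s behind the previous timestamp: kept as-is.
        System.out.println(clamp(10_000L, 12_000L)); // prints 10000
        // Record is 7 s behind: pulled forward to prevTs - 4000.
        System.out.println(clamp(5_000L, 12_000L));  // prints 8000
        // No previous timestamp yet (assumed -1): record timestamp wins.
        System.out.println(clamp(100L, -1L));        // prints 100
    }
}
```

With the 1 second windows and 5 second grace period used above, this means a record whose own timestamp is far in the past gets moved to at most 4 seconds behind the previous timestamp.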



Regards, Peter
