Hello Peter,

Thanks for filing this report. I've looked into the source code and I think
I may have spotted an edge case that would explain your observations. To
validate my suspicion, could you try modifying your DSL code slightly to use
a very large suppression buffer size (BTW, StrictBufferConfigImpl is an
internal class, as you can tell by its name, and is not recommended for use
in your code). More specifically:

Suppressed.untilWindowCloses(BufferConfig.unbounded())

------

and see if this issue still exists?
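
For reference, here is a minimal sketch (plain Java, no Kafka dependency) of
when untilWindowCloses would be expected to emit for the windows in the quoted
topology: 1-second tumbling windows with a 5-second grace period. It assumes a
window is treated as closed once stream time reaches window end plus grace.
The class and method names are made up for illustration and are not part of
the Kafka Streams API, and the example record timestamp is arbitrary.

```java
// Illustrative only: hypothetical helper, not a Kafka Streams class.
final class WindowCloseSketch {

    // Start of the tumbling window containing the timestamp
    // (non-negative timestamps assumed).
    static long windowStart(long timestampMs, long sizeMs) {
        return timestampMs - (timestampMs % sizeMs);
    }

    // Assumed close time: window end plus the grace period.
    static long closeTime(long timestampMs, long sizeMs, long graceMs) {
        return windowStart(timestampMs, sizeMs) + sizeMs + graceMs;
    }

    // True once stream time has reached the assumed close time.
    static boolean isClosed(long streamTimeMs, long timestampMs,
                            long sizeMs, long graceMs) {
        return streamTimeMs >= closeTime(timestampMs, sizeMs, graceMs);
    }

    public static void main(String[] args) {
        long sizeMs = 1_000L;   // TimeWindows.of(Duration.ofSeconds(1))
        long graceMs = 5_000L;  // .grace(Duration.ofSeconds(5))
        long recordTs = 1_500L; // example record timestamp (made up)

        System.out.println(closeTime(recordTs, sizeMs, graceMs));
        System.out.println(isClosed(6_999L, recordTs, sizeMs, graceMs));
        System.out.println(isClosed(7_000L, recordTs, sizeMs, graceMs));
    }
}
```

With these numbers, a record at 1500 ms falls into the window [1000, 2000),
so under the stated assumption its final result would be expected only once
stream time reaches 7000 ms.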


Guozhang

On Wed, Dec 19, 2018 at 1:50 PM Peter Levart <peter.lev...@gmail.com> wrote:

> I see the list processor managed to smash my beautifully formatted HTML
> message. For that reason I'm re-sending the sample code snippet in plain
> text mode...
>
>   Here's a sample Kafka Streams processor:
>
>          KStream<String, Val> input =
>              builder
>                  .stream(
>                      inputTopic,
>                      Consumed.with(Serdes.String(), new Val.Serde())
>                              .withTimestampExtractor((rec, prevTs) -> {
>                                  String key = (String) rec.key();
>                                  Val val = (Val) rec.value();
>                                  return Math.max(val.getTimestamp(),
>                                                  Math.max(0L, prevTs - 4000));
>                              })
>                  );
>
>          KStream<Windowed<String>, IntegerList> grouped =
>              input
>                  .groupByKey()
>                  .windowedBy(
>                      TimeWindows.of(Duration.ofSeconds(1))
>                                 .advanceBy(Duration.ofSeconds(1))
>                                 .grace(Duration.ofSeconds(5))
>                  )
>                  .aggregate(
>                      IntegerList::new,
>                      (k, v, list) -> {
>                          list.add(v.getValue());
>                          return list;
>                      },
>                      Materialized.with(Serdes.String(), new IntegerList.Serde())
>                  )
>                  .suppress(
>                      Suppressed.untilWindowCloses(new StrictBufferConfigImpl())
>                  )
>                  .toStream();
>
>          grouped.to(
>              outputTopic,
>              Produced.with(new SelfSerde.TimeWindowed<>(Serdes.String()),
>                            new IntegerList.Serde())
>          );
>
>
>
> Regards, Peter
>
>

-- 
-- Guozhang
