Hey, I'm building a streams application where I'm trying to aggregate a stream of events and getting a list of events per key. `eventStream .groupByKey(Grouped.with(Serdes.String(), eventSerde)) .windowedBy(TimeWindows.of(Duration.ofMillis(50)).grace(Duration.ofMillis(1))) .aggregate( ArrayList::new, (eent, accum) -> { accum.add(event); return accum; }) .suppress(Suppressed.untilWindowCloses(BufferConfig.unbounded())) .toStream() .map((windowedKey, value) -> new KeyValue<String, List<Event>>(windowedKey.key(), value)) .map(eventProcessor::processEventsWindow) .to("event-window-chunks-queue", Produced.with(Serdes.String(), eventListSerde))`
As you can see I'm grouping events by key and capturing windowed lists of events for further processing. To be able to process the list of events per key in chunks I added `suppress()`. This does not seem to work though. I get this error multiple times: `Got error produce response with correlation id 5 on topic-partition app-test143-KTABLE-SUPPRESS-STATE-STORE-0000000016-changelog-0, retrying (2147483646 attempts left). Error: NETWORK_EXCEPTION WARN org.apache.kafka.clients.producer.internals.Sender - Received invalid metadata error in produce request on partition shoonya-test143-KTABLE-SUPPRESS-STATE-STORE-0000000016-changelog-0 due to org.apache.kafka.common.errors.NetworkException: The server disconnected before a response was received.. Going to request metadata update now` When I comment out the suppress() line it works fine but I get a large number of events in a list while processing chunks since it does not suppress already evaluated chunks. Can anyone help me out with what could be happening here? Regards, Sushrut