Hey,

I'm building a Streams application in which I aggregate a stream of events
into a list of events per key.
`eventStream
.groupByKey(Grouped.with(Serdes.String(), eventSerde))
.windowedBy(TimeWindows.of(Duration.ofMillis(50)).grace(Duration.ofMillis(1)))
.aggregate(
    ArrayList::new,
    // Aggregator takes (key, value, aggregate)
    (key, event, accum) -> {
        accum.add(event);
        return accum;
    })
.suppress(Suppressed.untilWindowCloses(BufferConfig.unbounded()))
.toStream()
.map((windowedKey, value) -> new KeyValue<String, List<Event>>(windowedKey.key(), value))
.map(eventProcessor::processEventsWindow)
.to("event-window-chunks-queue", Produced.with(Serdes.String(), eventListSerde))`

As you can see, I'm grouping events by key and collecting a windowed list of
events per key for further processing.
To process each key's list of events in chunks, one per window, I added
`suppress()`.
This does not seem to work, though.
I get this error multiple times:
`Got error produce response with correlation id 5 on topic-partition
app-test143-KTABLE-SUPPRESS-STATE-STORE-0000000016-changelog-0, retrying
(2147483646 attempts left). Error: NETWORK_EXCEPTION
WARN org.apache.kafka.clients.producer.internals.Sender - Received invalid
metadata error in produce request on partition
shoonya-test143-KTABLE-SUPPRESS-STATE-STORE-0000000016-changelog-0 due to
org.apache.kafka.common.errors.NetworkException: The server disconnected
before a response was received.. Going to request metadata update now`

When I comment out the `suppress()` line, the application runs without this
error, but I end up processing a large number of events per list, because
intermediate results for chunks that were already evaluated are not
suppressed.
Can anyone help me out with what could be happening here?
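
To illustrate the difference I mean, here is a minimal plain-Java sketch. It
does not use Kafka Streams at all, and the names (`SuppressSketch`,
`emitEveryUpdate`, `emitOnClose`) are made up for illustration. It assumes a
single key and a single window, and it ignores record caching, which may
merge some intermediate updates in practice.

```java
import java.util.ArrayList;
import java.util.List;

public class SuppressSketch {

    // Roughly what I see without suppress(): each incoming event
    // re-emits the updated list for the window.
    static List<List<String>> emitEveryUpdate(List<String> events) {
        List<List<String>> emitted = new ArrayList<>();
        List<String> accum = new ArrayList<>();
        for (String event : events) {
            accum.add(event);
            emitted.add(new ArrayList<>(accum)); // snapshot of the intermediate result
        }
        return emitted;
    }

    // Roughly what I expect with suppress(untilWindowCloses(...)):
    // a single final list per window, emitted once the window has closed.
    static List<List<String>> emitOnClose(List<String> events) {
        List<List<String>> emitted = new ArrayList<>();
        if (!events.isEmpty()) {
            emitted.add(new ArrayList<>(events));
        }
        return emitted;
    }

    public static void main(String[] args) {
        List<String> window = List.of("e1", "e2", "e3");
        System.out.println(emitEveryUpdate(window)); // [[e1], [e1, e2], [e1, e2, e3]]
        System.out.println(emitOnClose(window));     // [[e1, e2, e3]]
    }
}
```

The first output is the behaviour that makes my downstream step handle the
same events several times; the second is what I'm trying to get.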

Regards,
Sushrut
