Thanks John, I'll try increasing the "CACHE_MAX_BYTES_BUFFERING_CONFIG" and "COMMIT_INTERVAL_MS_CONFIG" configurations.
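Roughly what I have in mind (untested sketch; the values are placeholders I'll tune, and the application id / bootstrap server are just examples — `topology` is the existing topology from the snippet quoted below):

`import java.util.Properties;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;

Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "app-test143");        // example id
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");  // example broker
// Bigger record cache so the windowed aggregation coalesces more updates in memory
props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 50 * 1024 * 1024L); // 50 MB, placeholder
// Commit (and flush the cache) less often, so fewer, larger batches go downstream
props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 30_000L);          // 30 s, placeholder

KafkaStreams streams = new KafkaStreams(topology, props);
streams.start();`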
Thanks,
Sushrut

On Sat, Jan 18, 2020 at 11:31 AM John Roesler <vvcep...@apache.org> wrote:

> Ah, I should add, if you actually want to use suppression, or
> you need to resolve a similar error message in the future, you
> probably need to tweak the batch sizes and/or timeout configs
> of the various clients, and maybe the server as well.
>
> That error message kind of sounds like the server went silent
> long enough that the http session expired, or maybe it suffered
> a long pause of some kind (GC, de-scheduling, etc.) that caused
> the OS to hang up the socket.
>
> I'm not super familiar with diagnosing these issues; I'm just
> trying to point you in the right direction in case you wanted
> to directly solve the given error instead of trying something
> different.
>
> Thanks,
> -John
>
> On Fri, Jan 17, 2020, at 23:33, John Roesler wrote:
> > Hi Sushrut,
> >
> > That's frustrating... I haven't seen that before, but looking at the error
> > in combination with what you say happens without suppress makes
> > me think there's a large volume of data involved here. Probably,
> > the problem isn't specific to suppression, but it's just that the
> > interactions on the suppression buffers are pushing the system over
> > the edge.
> >
> > Counterintuitively, adding suppression can actually increase your
> > broker traffic, because the suppression buffer has to provide resiliency
> > guarantees, so it needs its own changelog, even though the aggregation
> > immediately before it _also_ has a changelog.
> >
> > Judging from your description, you were just trying to batch more, rather
> > than specifically trying to get "final results" semantics for the window
> > results. In that case, you might want to try removing the suppression
> > and instead increasing the "CACHE_MAX_BYTES_BUFFERING_CONFIG"
> > and "COMMIT_INTERVAL_MS_CONFIG" configurations.
> >
> > Hope this helps,
> > -John
> >
> > On Fri, Jan 17, 2020, at 22:02, Sushrut Shivaswamy wrote:
> > > Hey,
> > >
> > > I'm building a streams application where I'm trying to aggregate a
> > > stream of events and get a list of events per key:
> > >
> > > `eventStream
> > >     .groupByKey(Grouped.with(Serdes.String(), eventSerde))
> > >     .windowedBy(TimeWindows.of(Duration.ofMillis(50)).grace(Duration.ofMillis(1)))
> > >     .aggregate(
> > >         ArrayList::new,
> > >         (key, event, accum) -> {
> > >             accum.add(event);
> > >             return accum;
> > >         })
> > >     .suppress(Suppressed.untilWindowCloses(BufferConfig.unbounded()))
> > >     .toStream()
> > >     .map((windowedKey, value) ->
> > >         new KeyValue<String, List<Event>>(windowedKey.key(), value))
> > >     .map(eventProcessor::processEventsWindow)
> > >     .to("event-window-chunks-queue",
> > >         Produced.with(Serdes.String(), eventListSerde))`
> > >
> > > As you can see, I'm grouping events by key and capturing windowed lists
> > > of events for further processing.
> > > To be able to process the list of events per key in chunks, I added
> > > `suppress()`.
> > > This does not seem to work, though.
> > > I get this error multiple times:
> > >
> > > `Got error produce response with correlation id 5 on topic-partition
> > > app-test143-KTABLE-SUPPRESS-STATE-STORE-0000000016-changelog-0, retrying
> > > (2147483646 attempts left). Error: NETWORK_EXCEPTION
> > > WARN org.apache.kafka.clients.producer.internals.Sender - Received invalid
> > > metadata error in produce request on partition
> > > shoonya-test143-KTABLE-SUPPRESS-STATE-STORE-0000000016-changelog-0 due to
> > > org.apache.kafka.common.errors.NetworkException: The server disconnected
> > > before a response was received.. Going to request metadata update now`
> > >
> > > When I comment out the suppress() line it works fine, but I get a large
> > > number of events in a list while processing chunks, since it does not
> > > suppress already-evaluated chunks.
> > > Can anyone help me out with what could be happening here?
> > >
> > > Regards,
> > > Sushrut
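For completeness, the no-suppress variant John suggested would look roughly like the original topology with the suppression step dropped (same serdes and eventProcessor as above; untested sketch, so treat it as illustrative only):

`eventStream
    .groupByKey(Grouped.with(Serdes.String(), eventSerde))
    .windowedBy(TimeWindows.of(Duration.ofMillis(50)).grace(Duration.ofMillis(1)))
    .aggregate(
        ArrayList::new,
        (key, event, accum) -> {
            accum.add(event);
            return accum;
        })
    // no .suppress(...): batching now comes from the record cache
    // (CACHE_MAX_BYTES_BUFFERING_CONFIG) and the commit interval
    // (COMMIT_INTERVAL_MS_CONFIG), so downstream may see repeated,
    // partial window updates rather than one final result per window
    .toStream()
    .map((windowedKey, value) ->
        new KeyValue<String, List<Event>>(windowedKey.key(), value))
    .map(eventProcessor::processEventsWindow)
    .to("event-window-chunks-queue",
        Produced.with(Serdes.String(), eventListSerde))`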