Ah, I should add: if you actually want to use suppression, or
you need to resolve a similar error message in the future, you
probably need to tweak the batch sizes and/or timeout configs
of the various clients, and maybe the server as well.
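
For concreteness, here's roughly the kind of tuning I mean, set through the
Streams config. The property names are real producer/Streams configs, but the
values are just placeholders you'd have to tune for your workload:

`import java.util.Properties;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.streams.StreamsConfig;

Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "app-test143");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092");
// Give the internal producer more time before it gives up on the broker.
props.put(StreamsConfig.producerPrefix(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG), 60_000);
props.put(StreamsConfig.producerPrefix(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG), 180_000);
// Smaller batches make each produce request to the changelog lighter.
props.put(StreamsConfig.producerPrefix(ProducerConfig.BATCH_SIZE_CONFIG), 32 * 1024);
props.put(StreamsConfig.producerPrefix(ProducerConfig.LINGER_MS_CONFIG), 100);`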

That error message kind of sounds like the server went silent
long enough that the client's connection timed out, or maybe it
suffered a long pause of some kind (GC, de-scheduling, etc.) that
caused the OS to hang up the socket.

I'm not super familiar with diagnosing these issues; I'm just 
trying to point you in the right direction in case you wanted
to directly solve the given error instead of trying something 
different.

Thanks,
-John

On Fri, Jan 17, 2020, at 23:33, John Roesler wrote:
> Hi Sushrut,
> 
> That's frustrating... I haven't seen that before, but looking at the error
> in combination with what you say happens without suppress makes
> me think there's a large volume of data involved here. Probably,
> the problem isn't specific to suppression, but it's just that the
> interactions on the suppression buffers are pushing the system over
> the edge.
> 
> Counterintuitively, adding Suppression can actually increase your
> broker traffic because the Suppression buffer has to provide resiliency
> guarantees, so it needs its own changelog, even though the aggregation
> immediately before it _also_ has a changelog.
> 
> Judging from your description, you were just trying to batch more, rather
> than specifically trying to get "final results" semantics for the window
> results. In that case, you might want to try removing the suppression
> and instead increasing the "CACHE_MAX_BYTES_BUFFERING_CONFIG"
> and "COMMIT_INTERVAL_MS_CONFIG" configurations.
> 
> Hope this helps,
> -John
> 
> On Fri, Jan 17, 2020, at 22:02, Sushrut Shivaswamy wrote:
> > Hey,
> > 
> > I'm building a Streams application where I'm trying to aggregate a stream
> > of events to get a list of events per key.
> > `eventStream
> >     .groupByKey(Grouped.with(Serdes.String(), eventSerde))
> >     .windowedBy(TimeWindows.of(Duration.ofMillis(50)).grace(Duration.ofMillis(1)))
> >     .aggregate(
> >         ArrayList::new,
> >         (key, event, accum) -> {   // Aggregator takes (key, value, aggregate)
> >             accum.add(event);
> >             return accum;
> >         })
> >     .suppress(Suppressed.untilWindowCloses(BufferConfig.unbounded()))
> >     .toStream()
> >     .map((windowedKey, value) -> new KeyValue<String, List<Event>>(windowedKey.key(), value))
> >     .map(eventProcessor::processEventsWindow)
> >     .to("event-window-chunks-queue", Produced.with(Serdes.String(), eventListSerde))`
> > 
> > As you can see I'm grouping events by key and capturing windowed lists of
> > events for further processing.
> > To be able to process the list of events per key in chunks I added
> > `suppress()`.
> > This does not seem to work though.
> > I get this error multiple times:
> > `Got error produce response with correlation id 5 on topic-partition
> > app-test143-KTABLE-SUPPRESS-STATE-STORE-0000000016-changelog-0, retrying
> > (2147483646 attempts left). Error: NETWORK_EXCEPTION
> > WARN org.apache.kafka.clients.producer.internals.Sender - Received invalid
> > metadata error in produce request on partition
> > shoonya-test143-KTABLE-SUPPRESS-STATE-STORE-0000000016-changelog-0 due to
> > org.apache.kafka.common.errors.NetworkException: The server disconnected
> > before a response was received.. Going to request metadata update now`
> > 
> > When I comment out the suppress() line it works fine, but I get a large
> > number of events in a list while processing chunks, since already-evaluated
> > chunks are not suppressed.
> > Can anyone help me out with what could be happening here?
> > 
> > Regards,
> > Sushrut
> >
>
