Hi!

We have a rather simple Flink job whose main purpose is to concatenate events
read from Kafka and output them in session windows, with a gap of 1 minute,
1 minute of allowed out-of-orderness, and a 1-hour idleness setting:
[image: Screenshot 2023-08-07 at 13.44.09.png]
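
In code terms, the relevant part of the job looks roughly like this (a
simplified sketch with illustrative class names, not our exact code):

import java.time.Duration;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

WatermarkStrategy<Event> watermarks = WatermarkStrategy
        .<Event>forBoundedOutOfOrderness(Duration.ofMinutes(1)) // 1 minute of allowed out-of-orderness
        .withIdleness(Duration.ofHours(1));                     // mark idle splits after 1 hour

env.fromSource(kafkaSource, watermarks, "sessionizer-source")   // Kafka record timestamps used as event time
    .keyBy(Event::getSessionKey)                                // Event / getSessionKey are illustrative
    .window(EventTimeSessionWindows.withGap(Time.minutes(1)))   // 1-minute session gap
    .process(new ConcatenateEventsFunction())                   // concatenates the events of a session
    .sinkTo(kafkaSink);                                         // Kafka sink, where we hit producer quotas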
The problem we are facing is that the job sometimes starts to struggle, and
it all seems to begin with the following messages appearing in the source
operator:

[Consumer clientId=sessionizer, groupId=sessionizer] Error sending fetch request (sessionId=1914084170, epoch=8) to node 2:
[Consumer clientId=sessionizer, groupId=sessionizer] Cancelled in-flight FETCH request with correlation id 105760966 due to node 2 being disconnected (elapsed time since creation: 31311ms, elapsed time since send: 31311ms, request timeout: 30000ms)
[Consumer clientId=sessionizer, groupId=sessionizer] Disconnecting from node 2 due to request timeout.
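
(The 30000ms above is the consumer's request.timeout.ms, which we leave at
its default. If we wanted to change it, it could be passed through the
KafkaSource builder roughly like this - a sketch, the topic name and
deserializer are just placeholders:)

import org.apache.flink.connector.kafka.source.KafkaSource;

KafkaSource<Event> kafkaSource = KafkaSource.<Event>builder()
        .setBootstrapServers("<brokers>")                   // placeholder
        .setTopics("input-topic")                           // placeholder topic name
        .setGroupId("sessionizer")
        .setProperty("request.timeout.ms", "30000")         // the 30s timeout seen in the log above
        .setValueOnlyDeserializer(new EventDeserializer())  // illustrative deserializer
        .build();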

What happens right after these errors is puzzling. In theory, we understand
that issues consuming from some partitions (as the error above seems to
suggest) should not affect how watermarks are handled - i.e. the watermark
will not move forward, so this should not cause any sessions to be closed
prematurely.
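
Our mental model here, as a toy example (this is not Flink's internal code,
just the min-combination rule we rely on):

import java.util.Arrays;

// Toy illustration: an operator's event-time clock is the minimum of its inputs'
// watermarks, so a single stalled partition/subtask should hold everything back
// (as long as it is not marked idle).
public class WatermarkMinExample {
    public static void main(String[] args) {
        long[] inputWatermarks = {
                1_691_406_000_000L, // input A: advancing normally
                1_691_406_060_000L, // input B: advancing normally
                1_691_405_700_000L  // input C: stalled, e.g. its partitions hit fetch errors
        };
        long combined = Arrays.stream(inputWatermarks).min().getAsLong();
        System.out.println("combined watermark = " + combined); // stays at input C's value
    }
}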

However, what we see is *the opposite* - shortly after the error (a matter
of a minute or two) we see a spike in session windows being closed, just as
if Flink had moved event time forward (as tracked by the watermark) and,
because some events were missing (not consumed), decided to close a lot of
the windows since those sessions were not receiving any events. But how is
this possible, if at all? Can anyone think of any (even remotely) possible
explanation?

When this happens, we then see the expected chain of events: a lot of data
gets produced to the sink, we run into the Kafka producer quota limits we
have defined, this causes backpressure and we see lag building up. The
Kafka errors disappear after 15-20 minutes and the situation goes back to
normal. However, some corrupted data gets produced whenever this happens.
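
For reference, Kafka producer quotas are typically byte-rate limits; a
sketch of how such a quota can be defined via the Kafka AdminClient
(illustrative values, not necessarily how ours are set up):

import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.common.quota.ClientQuotaAlteration;
import org.apache.kafka.common.quota.ClientQuotaEntity;

public class SetProducerQuota {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "<brokers>"); // placeholder
        try (Admin admin = Admin.create(props)) {
            // Cap the "sessionizer" client id at ~10 MB/s of produced bytes (example value).
            ClientQuotaEntity entity = new ClientQuotaEntity(
                    Map.of(ClientQuotaEntity.CLIENT_ID, "sessionizer"));
            ClientQuotaAlteration alteration = new ClientQuotaAlteration(
                    entity,
                    List.of(new ClientQuotaAlteration.Op("producer_byte_rate", 10_485_760.0)));
            admin.alterClientQuotas(List.of(alteration)).all().get();
        }
    }
}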

This is Flink 1.15.2 running in AWS as Kinesis Data Analytics. There are
144 partitions on the input topic, the parallelism is 72, and we use the
Kafka message timestamps (as set by the producer) as event time. We've also
seen this before with 72 partitions and a parallelism of 72.
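
To clarify the timestamp part: we rely on the Kafka record timestamp, which
the producer can set explicitly roughly like this (sketch, helper names are
illustrative; by default the producer client fills in the send time):

import org.apache.kafka.clients.producer.ProducerRecord;

ProducerRecord<String, byte[]> record = new ProducerRecord<>(
        "input-topic",                // placeholder topic name
        null,                         // partition: let the partitioner decide
        event.getEventTimeMillis(),   // record timestamp = event time (illustrative getter)
        event.getKey(),               // key (illustrative getter)
        serialize(event));            // value; serialize() is illustrative
producer.send(record);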

-- 
Piotr Domagalski
