Hi!

We have a rather simple Flink job whose main purpose is concatenating events read from Kafka and outputting them in session windows with a gap of 1 minute, 1 minute of allowed out-of-orderness and a 1 hour idleness setting:

[image: Screenshot 2023-08-07 at 13.44.09.png]
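For reference, the relevant part of the job looks roughly like this (simplified sketch; Event, EventDeserializer, ConcatenateEventsFunction and the sink construction are placeholders for our actual types):

import java.time.Duration;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

public class SessionizerJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Kafka source; Event and EventDeserializer stand in for our real types
        KafkaSource<Event> source = KafkaSource.<Event>builder()
                .setBootstrapServers("broker:9092")
                .setTopics("input-topic")
                .setGroupId("sessionizer")
                .setValueOnlyDeserializer(new EventDeserializer())
                .build();

        // 1 minute of allowed out-of-orderness, 1 hour idleness
        WatermarkStrategy<Event> watermarks = WatermarkStrategy
                .<Event>forBoundedOutOfOrderness(Duration.ofMinutes(1))
                .withIdleness(Duration.ofHours(1));

        env.fromSource(source, watermarks, "kafka-source")
                // event time = the Kafka record timestamp set by the producer
                .keyBy(Event::getSessionKey)
                // a session window closes after a 1 minute gap without events for the key
                .window(EventTimeSessionWindows.withGap(Time.minutes(1)))
                .process(new ConcatenateEventsFunction())
                // KafkaSink construction omitted
                .sinkTo(kafkaSink);

        env.execute("sessionizer");
    }
}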
The problem we are facing is that the job sometimes starts to struggle, and it seems to begin with the following message appearing in the source operator:

[Consumer clientId=sessionizer, groupId=sessionizer] Error sending fetch request (sessionId=1914084170, epoch=8) to node 2:
[Consumer clientId=sessionizer, groupId=sessionizer] Cancelled in-flight FETCH request with correlation id 105760966 due to node 2 being disconnected (elapsed time since creation: 31311ms, elapsed time since send: 31311ms, request timeout: 30000ms)
[Consumer clientId=sessionizer, groupId=sessionizer] Disconnecting from node 2 due to request timeout.

What happens right after this is mind-puzzling. In theory, we understand that issues consuming from some partitions (as the error above seems to suggest) should not influence how watermarks are handled, i.e. the watermark will not move forward, so this should not cause any sessions to be closed prematurely. However, what we see is *the opposite*: shortly after the error (within a minute or two) we see a spike in session windows being closed, just as if Flink had moved forward in time (as tracked by the watermark) and, because some events were missing (not consumed), decided to close a lot of the windows since those sessions were not receiving any events. But how is this possible, if at all? Can anyone think of any (even remotely) possible explanation?

When this happens, we then see an expected chain of follow-up events: a lot of data gets produced to the sink, we run into the Kafka producer quota limits we have defined, this puts backpressure on the job and we see a lag building up. The Kafka errors disappear after 15-20 minutes and the situation goes back to normal. However, some corrupted data gets produced whenever this happens.

This is Flink 1.15.2 running in AWS as Kinesis Data Analytics. There are 144 partitions on the input topic and a parallelism of 72, and we use the Kafka message event timestamps (as set by the producer). We have seen the same behaviour before with 72 partitions and a parallelism of 72.

--
Piotr Domagalski