Hi Clayton, 

Could you also provide the topology of the job?

Also, if convenient, could you have a look at
the back-pressure status of each node? We could
then locate which nodes are getting slow and might
be causing the lag.
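
For reference, the per-vertex back-pressure is shown in the Web UI, and it
can also be queried through the JobManager REST API. A rough sketch with the
JDK HTTP client (the JobManager address, job id and vertex id below are
placeholders, and the endpoint path is the one from the REST API docs as I
recall it):

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class BackPressureCheck {
    public static void main(String[] args) throws Exception {
        // Placeholder JobManager address, job id and vertex id.
        String jobManager = "http://localhost:8081";
        String jobId = "<job-id>";
        String vertexId = "<vertex-id>";

        // Back-pressure is reported per vertex at
        // /jobs/:jobid/vertices/:vertexid/backpressure
        HttpRequest request = HttpRequest.newBuilder()
                .uri(URI.create(jobManager + "/jobs/" + jobId
                        + "/vertices/" + vertexId + "/backpressure"))
                .GET()
                .build();

        HttpResponse<String> response = HttpClient.newHttpClient()
                .send(request, HttpResponse.BodyHandlers.ofString());

        // The JSON body includes a back-pressure level per subtask.
        System.out.println(response.body());
    }
}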

Best,
Yun


------------------------------------------------------------------
From: Clayton Wohl <claytonw...@gmail.com>
Send Time: 2022 May 26 (Thu.) 00:31
To: user <user@flink.apache.org>
Subject: Flink 1.14.4 -> 1.15.0 Upgrade Problem

I have a Flink job that has been running with Flink 1.14.4 perfectly for a few 
months.

I tried upgrading to Flink 1.15.0. There are no error messages or exceptions,
and it runs perfectly fine for several hours, but after a few hours the Flink
app starts to lag in processing an input Kafka topic. I can see the lag grow
linearly in my Grafana dashboards that track Kafka lag. The lag continues to
grow indefinitely until I manually restart the Flink job; the job then catches
up with the old data, the lag drops to zero, the application runs fine for
several hours, and then the lag issue happens again and the lag steadily grows
until I manually restart the Flink job.

When I revert the application back to Flink 1.14.4, this lag issue completely 
goes away. I see no runtime errors or exceptions.

A few quick environment details:
- The Kafka brokers are running Kafka 2.8.1
- The Flink app is running on Kubernetes with the Spotify Flink Operator
- The Flink code is Java using the newer KafkaSource/KafkaSink API, not the
older KafkaConsumer/KafkaProducer API (a rough sketch of the source/sink
setup follows below).
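
For context, the per-topic source and sink are wired up roughly like the
sketch below; the broker address, topic names, group id and class name are
placeholders, not the real job configuration:

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;

public class TopicPipeline {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();

        // Newer KafkaSource API (replaces the deprecated FlinkKafkaConsumer).
        KafkaSource<String> source = KafkaSource.<String>builder()
                .setBootstrapServers("broker-1:9092")   // placeholder
                .setTopics("input-topic-1")             // one of the seven input topics
                .setGroupId("my-consumer-group")        // placeholder
                .setStartingOffsets(
                        OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST))
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();

        DataStream<String> stream = env.fromSource(
                source, WatermarkStrategy.noWatermarks(), "kafka-source");

        // Newer KafkaSink API (replaces the deprecated FlinkKafkaProducer);
        // each input topic writes to its own output topic.
        KafkaSink<String> sink = KafkaSink.<String>builder()
                .setBootstrapServers("broker-1:9092")   // placeholder
                .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                        .setTopic("output-topic-1")     // matching output topic
                        .setValueSerializationSchema(new SimpleStringSchema())
                        .build())
                .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
                .build();

        stream.sinkTo(sink);

        env.execute("topic-pipeline");
    }
}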

The Flink app consumes from seven input Kafka topics, and for each distinct 
input topic, writes output values to a distinct output topic. Most of the 
processing happens within a RichAsyncFunction which does some processing 
against an external database. The lag issue mentioned here happens on different
topics, and if I let the app run long enough, it will happen on multiple
topics. Also, when the lag issue is happening, the app is still processing
records on the affected topics; for some reason it's just processing records
more slowly than the incoming message rate, which is the definition of lag. But
clearly, the lag isn't caused by a lack of resources, but by a software bug
within Flink.
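
For context, the async part follows the standard RichAsyncFunction +
AsyncDataStream pattern, roughly like the sketch below; the executor-backed
lookup() here is a placeholder standing in for the real async database client,
and the timeout/capacity values are illustrative:

import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;

public class DatabaseEnrichment extends RichAsyncFunction<String, String> {

    private transient ExecutorService executor;

    @Override
    public void open(Configuration parameters) {
        // The real job opens its database client here; a thread pool stands in.
        executor = Executors.newFixedThreadPool(10);
    }

    @Override
    public void asyncInvoke(String input, ResultFuture<String> resultFuture) {
        CompletableFuture
                .supplyAsync(() -> lookup(input), executor)
                .whenComplete((value, error) -> {
                    if (error != null) {
                        resultFuture.completeExceptionally(error);
                    } else {
                        resultFuture.complete(Collections.singleton(value));
                    }
                });
    }

    @Override
    public void timeout(String input, ResultFuture<String> resultFuture) {
        // If the external lookup stalls, emit nothing rather than failing the job.
        resultFuture.complete(Collections.emptyList());
    }

    @Override
    public void close() {
        if (executor != null) {
            executor.shutdown();
        }
    }

    private String lookup(String key) {
        // Placeholder for the real database query.
        return key + "-enriched";
    }

    // Wiring the async function into the pipeline:
    public static DataStream<String> enrich(DataStream<String> stream) {
        return AsyncDataStream.unorderedWait(
                stream, new DatabaseEnrichment(), 30, TimeUnit.SECONDS, 100);
    }
}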

I intend to keep this job running Flink 1.14.4 until a Flink 1.15.1 patch comes 
out that supposedly addresses this issue. This job is not using or requiring 
any new Flink 1.15.0 functionality. However, we prefer to use the newest 
versions when we can. Switching Flink versions is just a matter of changing the
Maven dependencies, the base Flink Docker image version, and the Flink version
tag specified to the Spotify Kubernetes operator.

I was hoping this report would give the Flink developers a heads-up that there
is a new bug introduced in 1.15.0. If there is anything I should try, let me
know. Thanks :)
