Hi Wei,

I hit a similar issue when I migrated from FlinkKafkaConsumer to KafkaSource: in my case, the _metadata file inside the checkpoint kept growing. What worked for me was to roll back to the old Flink version with the old checkpoint/savepoint, change the uid of the Flink Kafka source and sink operators, and then redeploy the job on the new Flink version with the new uids. The upgrade guide covers this as well: https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/kafka/#upgrading-to-the-latest-connector-version ("Change the assigned uid of your source/sink. This makes sure the new source/sink doesn't read state from the old source/sink operators.").
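For illustration, changing the uid looks roughly like the sketch below (Flink 1.15 DataStream API; the topic, group id, and uid values are placeholders, not from the original job):

```java
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

KafkaSource<String> source = KafkaSource.<String>builder()
        .setBootstrapServers("broker:9092")
        .setTopics("input-topic")
        .setGroupId("my-group")
        .setStartingOffsets(OffsetsInitializer.committedOffsets())
        .setValueOnlyDeserializer(new SimpleStringSchema())
        .build();

DataStream<String> stream = env
        .fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source")
        // A fresh uid so the new KafkaSource does not try to restore
        // state written by the old FlinkKafkaConsumer operator.
        .uid("kafka-source-v2");
```

Note that with a new uid you will usually need to restore with allowNonRestoredState enabled, since the old operator's state is no longer mapped to anything.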
Best,
Yang

On Tue, 9 May 2023 at 04:09, Shammon FY <zjur...@gmail.com> wrote:
>
> Hi Wei,
>
> From the error message, I guess the cause of the issue is that the events
> sent by the SplitEnumerator to the source exceed the default akka frame
> size. You can set the option 'akka.framesize' to raise the akka packet
> size, or try to decrease the event size.
>
> When you use 'FlinkKafkaConsumer' to read data from kafka, the source
> subtask in the TaskManager connects to kafka and reads data directly, but
> 'KafkaSource' does not work that way. You can refer to FLIP-27 [1] for more
> detailed information about 'KafkaSource'. Simply speaking, the
> 'SplitEnumerator' in the JobManager gets splits from kafka and sends them
> to the source subtasks, which then read the data.
>
> [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface
>
> Best,
> Shammon FY
>
> On Tue, May 9, 2023 at 2:37 AM Wei Hou via user <user@flink.apache.org> wrote:
>>
>> Hi Team,
>>
>> We hit an issue after we upgraded our job from Flink 1.12 to 1.15: there is
>> a consistent akka.remote.OversizedPayloadException after job restarts:
>>
>> Transient association error (association remains live)
>> akka.remote.OversizedPayloadException: Discarding oversized payload sent to
>> Actor[akka.tcp://flink@xxx/user/rpc/taskmanager_0#-311495648]: max allowed
>> size 10485760 bytes, actual size of encoded class
>> org.apache.flink.runtime.rpc.messages.RemoteRpcInvocation was 33670549 bytes.
>>
>> In the job, we changed the kafka consumer from FlinkKafkaConsumer to the
>> new KafkaSource, and we noticed a Stack Overflow question
>> (https://stackoverflow.com/questions/75363084/jobs-stuck-while-trying-to-restart-from-a-checkpoint)
>> describing the _metadata file size doubling after that change.
>>
>> We later checked the _metadata for our own job; it did increase a lot on
>> each restart (it was around 128 MB when we hit the akka error). I'd like
>> to know whether there is a known root cause for this problem and what we
>> can do to eliminate it.
>>
>> Best,
>> Wei
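P.S. For reference, the 'akka.framesize' workaround Shammon mentions is a cluster-level setting in flink-conf.yaml. The value below is only an illustrative example (roughly 50 MB, above the 33670549 bytes from the error message), not a recommendation:

```yaml
# flink-conf.yaml: raise the maximum RPC message size.
# The default is 10485760b (10 MB), which matches the limit in the error above.
akka.framesize: 52428800b
```

This treats the symptom only; if the checkpoint _metadata keeps doubling, the payload will eventually outgrow any frame size, so fixing the state growth (e.g. via the uid change) is the real remedy.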