Hi Wei,

From the error message, I'd guess the events sent by the SplitEnumerator to the source readers exceed Akka's default frame size. You can set the option 'akka.framesize' to raise the maximum frame size, or try to decrease the event size.
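For example, in flink-conf.yaml (the 50 MB value below is just an illustration, not a recommendation; the default is 10485760b, i.e. the 10 MB limit shown in your error message):

    akka.framesize: 52428800b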
When you use 'FlinkKafkaConsumer' to read data from Kafka, the source subtasks in the TaskManagers connect to Kafka and read data directly, but 'KafkaSource' does not work that way. You can refer to FLIP-27 [1] for more detail on 'KafkaSource'. Simply speaking, the 'SplitEnumerator' in the JobManager gets splits from Kafka and sends them to the source subtasks, which then read the data (see the sketch after the quoted message below).

[1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface

Best,
Shammon FY

On Tue, May 9, 2023 at 2:37 AM Wei Hou via user <user@flink.apache.org> wrote:

> Hi Team,
>
> We hit an issue after we upgraded our job from Flink 1.12 to 1.15: there is
> a consistent akka.remote.OversizedPayloadException after job restarts:
>
> Transient association error (association remains live)
> akka.remote.OversizedPayloadException: Discarding oversized payload sent to
> Actor[akka.tcp://flink@xxx/user/rpc/taskmanager_0#-311495648]: max
> allowed size 10485760 bytes, actual size of encoded class
> org.apache.flink.runtime.rpc.messages.RemoteRpcInvocation was 33670549
> bytes.
>
> In the job, we changed the Kafka consumer from FlinkKafkaConsumer to the
> new KafkaSource, and we noticed a Stack Overflow question (
> https://stackoverflow.com/questions/75363084/jobs-stuck-while-trying-to-restart-from-a-checkpoint
> ) about the _metadata file size doubling with each restart after that change.
>
> We later checked the _metadata for our own job; it did increase a lot with
> each restart (around 128 MB when we hit the Akka error). I'd like to know
> whether there is a known root cause for this problem and what we can do to
> eliminate it.
>
> Best,
> Wei
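For reference, here is a minimal sketch of wiring up the new KafkaSource with the Flink 1.15 API; the broker address, topic, and group id are placeholders, not values from your job:

    import org.apache.flink.api.common.eventtime.WatermarkStrategy;
    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    import org.apache.flink.connector.kafka.source.KafkaSource;
    import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class KafkaSourceExample {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env =
                    StreamExecutionEnvironment.getExecutionEnvironment();

            // The SplitEnumerator for this source runs in the JobManager and
            // assigns topic partitions (splits) to the source subtasks via RPC.
            KafkaSource<String> source = KafkaSource.<String>builder()
                    .setBootstrapServers("broker:9092")   // placeholder
                    .setTopics("input-topic")             // placeholder
                    .setGroupId("my-group")               // placeholder
                    .setStartingOffsets(OffsetsInitializer.earliest())
                    .setValueOnlyDeserializer(new SimpleStringSchema())
                    .build();

            env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source")
                    .print();

            env.execute("KafkaSource example");
        }
    }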