Hi Wei,

I hit a similar issue when I migrated from FlinkKafkaConsumer to
KafkaSource: the _metadata file inside the checkpoint kept growing. What
worked for me was rolling back to the old Flink version with the old
checkpoint/savepoint, then changing the uid of the Kafka source and sink
operators before redeploying on the new Flink version with the new uids.
The upgrade guideline covers this:
https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/kafka/#upgrading-to-the-latest-connector-version
("Change the assigned uid of your source/sink. This makes sure the new
source/sink doesn't read state from the old source/sink operators.")
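
For context on why the uid change helps: Flink maps restored state to
operators by their uid, so an operator with a fresh uid finds nothing in
the old snapshot and starts empty. A rough conceptual sketch (plain
Python, not the Flink API; the uids and state payloads are made up):

```python
# Conceptual sketch only: a savepoint indexes operator state by uid.
# The uids and payloads below are invented for illustration.
savepoint = {
    "kafka-source-v1": {"offsets": {"topic-0": 42}},  # old consumer's state
    "sink-v1": {"pending": []},
}

def restore_state(savepoint, uid):
    """Return the operator's prior state, or empty state if the uid is new."""
    return savepoint.get(uid, {})

# Reusing the old uid picks up the old operator's state:
assert restore_state(savepoint, "kafka-source-v1") == {"offsets": {"topic-0": 42}}

# A fresh uid finds nothing, so the new KafkaSource starts from scratch:
assert restore_state(savepoint, "kafka-source-v2") == {}
```

Note that restoring a savepoint that still contains state for operators
no longer in the job requires allowNonRestoredState (the -n flag),
otherwise the restore fails.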

Best,
Yang

On Tue, 9 May 2023 at 04:09, Shammon FY <zjur...@gmail.com> wrote:
>
> Hi Wei,
>
> From the error message, I guess the reason for the issue is that an event 
> sent by the SplitEnumerator to the source reader exceeds Akka's default 
> frame size. You can increase the 'akka.framesize' option to allow larger 
> messages, or try to decrease the event size.
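
For example, in flink-conf.yaml (the value below simply doubles the
10485760b default; pick a size that fits your job):

```yaml
akka.framesize: 20971520b
```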
>
> When you use 'FlinkKafkaConsumer' to read data from Kafka, the source 
> subtasks in the TaskManagers connect to Kafka and read data directly, but 
> 'KafkaSource' doesn't work that way. You can refer to FLIP-27 [1] for more 
> detailed information about 'KafkaSource'. Simply put, the 'SplitEnumerator' 
> in the JobManager fetches splits from Kafka and sends them to the source 
> subtasks, which then read the data.
>
> [1] 
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface
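
The failure mode Shammon describes can be pictured with a toy model
(hypothetical code, not Flink's actual RPC layer): the enumerator packs
its assignments into one RPC message, and Akka rejects any message
larger than the frame limit.

```python
import pickle

# Default akka.framesize in Flink: 10485760 bytes (10 MiB).
MAX_FRAME = 10 * 1024 * 1024

def send_assignment(splits, max_frame=MAX_FRAME):
    """Toy model: serialize a split assignment into a single RPC payload.

    Raises if the encoded payload exceeds the frame limit, mirroring
    akka.remote.OversizedPayloadException.
    """
    payload = pickle.dumps(splits)
    if len(payload) > max_frame:
        raise ValueError(
            f"oversized payload: max {max_frame} bytes, actual {len(payload)} bytes")
    return payload

# A normal assignment (a few topic-partition splits) fits easily:
send_assignment([("my-topic", partition, 0) for partition in range(64)])
```

In the error quoted below, the encoded RemoteRpcInvocation was 33670549
bytes, more than triple the 10 MiB default, so either the payload has to
shrink or the frame limit has to grow.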
>
> Best,
> Shammon FY
>
>
> On Tue, May 9, 2023 at 2:37 AM Wei Hou via user <user@flink.apache.org> wrote:
>>
>> Hi Team,
>>
>> We hit an issue after we upgrade our job from Flink 1.12 to 1.15,  there's a 
>> consistent akka.remote.OversizedPayloadException after job restarts:
>>
>> Transient association error (association remains live) 
>> akka.remote.OversizedPayloadException: Discarding oversized payload sent to 
>> Actor[akka.tcp://flink@xxx/user/rpc/taskmanager_0#-311495648]: max allowed 
>> size 10485760 bytes, actual size of encoded class 
>> org.apache.flink.runtime.rpc.messages.RemoteRpcInvocation was 33670549 bytes.
>>
>>
>> In the job, we changed the Kafka consumer from FlinkKafkaConsumer to the new 
>> KafkaSource, and we noticed a Stack Overflow question 
>> (https://stackoverflow.com/questions/75363084/jobs-stuck-while-trying-to-restart-from-a-checkpoint)
>> describing how the _metadata file size kept doubling after that change.
>>
>> We later checked the _metadata file for our own job, and it did grow 
>> significantly with each restart (it was around 128 MB when we hit the Akka 
>> error). I'd like to know whether there's a known root cause for this problem 
>> and what we can do to eliminate it.
>>
>>
>> Best,
>> Wei
