[ https://issues.apache.org/jira/browse/FLINK-32658?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Rui Fan updated FLINK-32658:
----------------------------
    Description: 
When ignore-unclaimed-state is false and the old state is removed, Flink should throw an exception. This is similar to removing a stateful operator.

This case occurs not only when the user removes state, but also when an operator is replaced. For example: upgrading FlinkKafkaConsumer to KafkaSource. None of the job logic changes, so the operator id doesn't change. The KafkaSource cannot resume from the state of the FlinkKafkaConsumer. However, the Flink job can still start, and the state is silently discarded. (The old state is not physically discarded: it is still stored in the state backend, but the new code never uses it.)

This also causes an additional problem: the KafkaSource snapshots two states, the new state of the KafkaSource and the union list state of the FlinkKafkaConsumer. Whenever the job resumes from a checkpoint, the union list state is inflated. Eventually the state size of the Kafka offsets exceeded 200 MB.

 !screenshot-1.png! 

  was:
When ignore-unclaimed-state is false and the old state is removed, flink should throw exception. It's similar with removing an stateful operator.

This case occurs not only when the user removes state, but also when the operator is replaced. For example: upgrade FlinkKafkaConsumer to KafkaSource. All logical are not changed, so the operator id isn't changed. The KafkaSource cannot resume from the state of FlinkKafkaConsumer. However, flink job can start, and the state is silently discarded.

It also brings an additional problem: the KafkaSource will snapshot 2 states, it includes the new state of KafkaSource, and the union list state of FlinkKafkaConsumer. Whenever a job resumes from checkpoint, the union List state is inflated. Eventually the state size of kafka offset exceeded 200MB.

 !screenshot-1.png! 

> State should not be silently removed when ignore-unclaimed-state is false
> -------------------------------------------------------------------------
>
>                 Key: FLINK-32658
>                 URL: https://issues.apache.org/jira/browse/FLINK-32658
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Checkpointing
>    Affects Versions: 1.18.0, 1.17.1
>            Reporter: Rui Fan
>            Assignee: Rui Fan
>            Priority: Major
>         Attachments: screenshot-1.png
>
>
> When ignore-unclaimed-state is false and the old state is removed, Flink
> should throw an exception. This is similar to removing a stateful operator.
> This case occurs not only when the user removes state, but also when an
> operator is replaced.
> For example: upgrading FlinkKafkaConsumer to KafkaSource. None of the job
> logic changes, so the operator id doesn't change. The KafkaSource cannot
> resume from the state of the FlinkKafkaConsumer. However, the Flink job can
> still start, and the state is silently discarded. (The old state is not
> physically discarded: it is still stored in the state backend, but the new
> code never uses it.)
> This also causes an additional problem: the KafkaSource snapshots two states,
> the new state of the KafkaSource and the union list state of the
> FlinkKafkaConsumer. Whenever the job resumes from a checkpoint, the union
> list state is inflated. Eventually the state size of the Kafka offsets
> exceeded 200 MB.
>  !screenshot-1.png!
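
To make the requested behavior concrete, here is a minimal sketch of the kind of check the description asks for. It is not Flink code: the class UnclaimedStateCheck, its methods, and the state names are hypothetical and only model the rule "restored state that the new operator code never registers should fail the restore unless ignore-unclaimed-state is true".

```java
import java.util.Set;
import java.util.TreeSet;

/** Hypothetical sketch, not part of Flink: models a per-state check for unclaimed state. */
final class UnclaimedStateCheck {

    /** Returns the restored state names that the new operator code never registers. */
    public static Set<String> unclaimedStates(Set<String> restored, Set<String> registered) {
        Set<String> unclaimed = new TreeSet<>(restored);
        unclaimed.removeAll(registered);
        return unclaimed;
    }

    /** Fails the restore if unclaimed state exists and the user did not opt in to dropping it. */
    public static void validate(
            Set<String> restored, Set<String> registered, boolean ignoreUnclaimedState) {
        Set<String> unclaimed = unclaimedStates(restored, registered);
        if (!unclaimed.isEmpty() && !ignoreUnclaimedState) {
            throw new IllegalStateException(
                    "Restored state is not claimed by the operator: " + unclaimed
                            + ". Set ignore-unclaimed-state to true to drop it.");
        }
    }

    public static void main(String[] args) {
        // Illustrative state names only (assumptions, not the connectors' real names).
        Set<String> restored = Set.of("old-consumer-offsets");
        Set<String> registered = Set.of("new-source-state");

        // Passes: the user explicitly allowed unclaimed state to be dropped.
        validate(restored, registered, true);

        try {
            // Rejected: the old state would otherwise be silently carried along.
            validate(restored, registered, false);
        } catch (IllegalStateException e) {
            System.out.println("Restore rejected: " + e.getMessage());
        }
    }
}
```

In the FlinkKafkaConsumer to KafkaSource case above, a check along these lines would stop the job at restore time instead of letting the old union list state ride along in every later checkpoint.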