Hi, if the processing logic is modified, the representation of the topology changes. Consequently, the UIDs, which are determined by the topological order, may change as well, and that can cause state recovery to fail. See [1] for details.
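To see why this breaks restore, here is a toy model of UIDs derived from topological position (this is purely illustrative and assumed for the example; it is not Flink's actual hashing algorithm, and the operator names are made up):

```python
import hashlib

def uid(index: int, name: str) -> str:
    """Toy UID: hash of an operator's topological index and name."""
    return hashlib.sha256(f"{index}:{name}".encode()).hexdigest()[:8]

# Original plan: one pipeline.
plan_v1 = ["kafka_source", "filter_errors", "s3_sink"]

# Updated plan: the planner has interleaved a second branch, shifting
# the topological positions of the surviving operators.
plan_v2 = ["kafka_source", "window_agg", "jdbc_sink",
           "filter_errors", "s3_sink"]

v1 = {name: uid(i, name) for i, name in enumerate(plan_v1)}
v2 = {name: uid(i, name) for i, name in enumerate(plan_v2)}

# "filter_errors" moved from index 1 to index 3, so its UID changed,
# and its savepoint state can no longer be mapped on restore, even
# though the operator's logic is unchanged.
```

The key point is that UID stability depends on the planner reproducing the same topology, which it does not guarantee once the statement set changes.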
Currently, the Table API does not provide a way to customize UIDs. You could create a feature request in JIRA [2] and then start a discussion on the dev mailing list.

[1] https://github.com/apache/flink/blob/92eef24d4cc531d6474252ef909fc6d431285dd9/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/ExecNodeBase.java#L243C38-L243C62
[2] https://issues.apache.org/jira/projects/FLINK/issues/

--

Best!
Xuyang

On 2024-05-08 06:13:29, "Talat Uyarer via user" <user@flink.apache.org> wrote:

Hi Keith,

When you add a new INSERT statement to your EXECUTE STATEMENT SET, you change your job graph into two independent graphs. Unfortunately, Flink doesn't currently provide a way to directly force specific UIDs for operators through configuration or SQL hints. This is primarily due to how Flink's internal planner optimizes execution plans.

Talat

On Tue, May 7, 2024 at 8:42 AM Keith Lee <leekeiabstract...@gmail.com> wrote:

Hello,

I'm running into issues restoring from a savepoint after changing an SQL statement:

[ERROR] Could not execute SQL statement. Reason:
java.lang.IllegalStateException: Failed to rollback to checkpoint/savepoint file:/tmp/flink-savepoints/savepoint-52c344-3bedb8204ff0. Cannot map checkpoint/savepoint state for operator cbc357ccb763df2852fee8c4fc7d55f2 to the new program, because the operator is not available in the new program. If you want to allow to skip this, you can set the --allowNonRestoredState option on the CLI.

EXECUTE STATEMENT SET was used, where an additional INSERT is added to the initial set with the first INSERT unmodified. I appreciate that under the hood, Flink does the planning and assigns random UIDs to operators. In this scenario I'd expect the restore to be fine, since the first statement remains unchanged. Are there any ways around this? Also, is it possible to force a UID using configuration or a SQL hint?
Initial SQL statement set:

EXECUTE STATEMENT SET
BEGIN
  INSERT INTO UserErrorExperienceS3Sink
    (user_id, user_session, interaction_type, interaction_target,
     interaction_tags, event_time)
  SELECT
    user_id, user_session, interaction_type, interaction_target,
    interaction_tags, event_time
  FROM UserBehaviourKafkaSource
  WHERE interaction_result LIKE '%ERROR%';
END

Updated SQL statement set:

EXECUTE STATEMENT SET
BEGIN
  INSERT INTO UserErrorExperienceS3Sink
    (user_id, user_session, interaction_type, interaction_target,
     interaction_tags, event_time)
  SELECT
    user_id, user_session, interaction_type, interaction_target,
    interaction_tags, event_time
  FROM UserBehaviourKafkaSource
  WHERE interaction_result LIKE '%ERROR%';

  INSERT INTO CampaignAggregationsJDBC
  SELECT
    CONCAT_WS('/', interaction_tags, interaction_result,
              DATE_FORMAT(window_start, 'YYYY-MM-DD HH:mm:ss.SSS'),
              DATE_FORMAT(window_end, 'YYYY-MM-DD HH:mm:ss.SSS')) AS id,
    interaction_tags AS campaign,
    interaction_result,
    COUNT(*) AS interaction_count,
    window_start,
    window_end
  FROM TABLE(
    TUMBLE(TABLE UserBehaviourKafkaSource, DESCRIPTOR(event_time), INTERVAL '10' SECONDS))
  GROUP BY window_start, window_end, interaction_tags, interaction_result;
END;

This was done on the Flink 1.18 SQL Client.

Much appreciated
Keith
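For reference, the SQL Client equivalent of the CLI's --allowNonRestoredState is a pair of configuration options. This is a sketch assuming Flink 1.18 (the savepoint path is the one from the error above; double-check the option names against your version's documentation), and note the caveat: any state that cannot be mapped, including state you wanted to keep, is silently dropped on restore:

```sql
-- Point the next job submission at the existing savepoint:
SET 'execution.savepoint.path' =
  'file:/tmp/flink-savepoints/savepoint-52c344-3bedb8204ff0';

-- Skip state whose operator no longer exists in the new plan
-- (equivalent to --allowNonRestoredState; unmapped state is discarded):
SET 'execution.savepoint.ignore-unclaimed-state' = 'true';
```

In this scenario that may not help much: if the planner also reassigned the UID of the unchanged first INSERT's operators, their state becomes unclaimed too and starts empty.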