Hi Xuyang,

Thank you for anticipating my questions and pointing me to the right resources.

I've requested a JIRA account. Once it is approved, I will create a feature request and then start a discussion on the dev mailing list. I am interested in contributing to this feature and would appreciate it if you could point me to any other relevant resources.

Keith

On Wed, May 8, 2024 at 2:55 AM Xuyang <xyzhong...@163.com> wrote:

> Hi, if the processing logic is modified, then the representation of the
> topology would change. Consequently, the UIDs that are determined by the
> topological order might change as well, which could potentially cause state
> recovery to fail. For further details, you can refer to [1].
>
> Currently, the Table API does not have the capability to customize UIDs.
> You might consider creating a feature request on JIRA [2], and then
> initiate a discussion on the dev mailing list.
>
> [1]
> https://github.com/apache/flink/blob/92eef24d4cc531d6474252ef909fc6d431285dd9/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/ExecNodeBase.java#L243C38-L243C62
>
> [2] https://issues.apache.org/jira/projects/FLINK/issues/
>
> --
> Best!
> Xuyang
>
> At 2024-05-08 06:13:29, "Talat Uyarer via user" <user@flink.apache.org> wrote:
>
> Hi Keith,
>
> When you add a new INSERT statement to your EXECUTE STATEMENT SET, your
> job graph changes into two independent graphs. Unfortunately, Flink doesn't
> currently provide a way to directly force specific UIDs for operators
> through configuration or SQL hints. This is primarily due to how Flink's
> internal planner optimizes execution plans.
>
> Talat
>
> On Tue, May 7, 2024 at 8:42 AM Keith Lee <leekeiabstract...@gmail.com>
> wrote:
>
>> Hello,
>>
>> I'm running into issues restoring from a savepoint after changing the SQL
>> statement.
>>
>> [ERROR] Could not execute SQL statement. Reason:
>>> java.lang.IllegalStateException: Failed to rollback to
>>> checkpoint/savepoint
>>> file:/tmp/flink-savepoints/savepoint-52c344-3bedb8204ff0.
>>> Cannot map checkpoint/savepoint state for operator
>>> cbc357ccb763df2852fee8c4fc7d55f2 to the new program, because the
>>> operator is not available in the new program. If you want to allow to
>>> skip this, you can set the --allowNonRestoredState option on the CLI.
>>
>> EXECUTE STATEMENT SET was used, where an additional INSERT was added to
>> the initial set with the first INSERT unmodified. I appreciate that under
>> the hood, Flink does the planning and assigns random UIDs to operators.
>> In this scenario, I'd expect the restore to be fine, as the first
>> statement remains unchanged. Are there any ways around this? Also, is it
>> possible to force UIDs using configuration or a SQL hint?
>>
>> Initial SQL statement set:
>>
>> EXECUTE STATEMENT SET BEGIN
>>>
>>> INSERT INTO UserErrorExperienceS3Sink (user_id, user_session,
>>>   interaction_type, interaction_target, interaction_tags, event_time)
>>> SELECT
>>>   user_id, user_session, interaction_type, interaction_target,
>>>   interaction_tags, event_time
>>> FROM UserBehaviourKafkaSource
>>> WHERE interaction_result Like '%ERROR%';
>>>
>>> END
>>
>> Updated SQL statement set:
>>
>> EXECUTE STATEMENT SET BEGIN
>>>
>>> INSERT INTO UserErrorExperienceS3Sink (user_id, user_session,
>>>   interaction_type, interaction_target, interaction_tags, event_time)
>>> SELECT
>>>   user_id, user_session, interaction_type, interaction_target,
>>>   interaction_tags, event_time
>>> FROM UserBehaviourKafkaSource
>>> WHERE interaction_result Like '%ERROR%';
>>>
>>> INSERT INTO CampaignAggregationsJDBC
>>> SELECT
>>>   CONCAT_WS('/', interaction_tags, interaction_result,
>>>     DATE_FORMAT(window_start, 'YYYY-MM-DD HH:mm:ss.SSS'),
>>>     DATE_FORMAT(window_end, 'YYYY-MM-DD HH:mm:ss.SSS')) AS id,
>>>   interaction_tags as campaign,
>>>   interaction_result,
>>>   COUNT(*) AS interaction_count,
>>>   window_start,
>>>   window_end
>>> FROM TABLE(
>>>   TUMBLE(TABLE UserBehaviourKafkaSource, DESCRIPTOR(event_time),
>>>     INTERVAL '10' SECONDS))
>>> GROUP BY
>>>   window_start, window_end, interaction_tags, interaction_result;
>>>
>>> END;
>>
>> This was done on Flink 1.18 SQL Client.
>>
>> Much appreciated
>> Keith
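
The error message quoted in this thread points to --allowNonRestoredState as a way to skip state that cannot be mapped. In the SQL Client, the corresponding settings are, to my knowledge, the `execution.savepoint.path` and `execution.savepoint.ignore-unclaimed-state` options. The following is a minimal sketch for Flink 1.18, assuming the savepoint path from the error message. It does not force UIDs: any state that cannot be mapped to an operator is dropped, so if the operators of the first INSERT also received new UIDs, that pipeline may start without its previous state (for example, its Kafka source offsets).

```sql
-- Sketch only: restore from the savepoint and skip any state that no longer
-- maps to an operator in the new job graph (that state is discarded).
SET 'execution.savepoint.path' = 'file:/tmp/flink-savepoints/savepoint-52c344-3bedb8204ff0';
SET 'execution.savepoint.ignore-unclaimed-state' = 'true';

-- Then submit the updated EXECUTE STATEMENT SET BEGIN ... END; as before.
```

It is worth checking after the restore which operators actually picked up state, since skipping unclaimed state silently hides mismatches rather than resolving them.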