Hi all,

I ran into an issue where an unmodified SQL statement set failed to restore from a savepoint after a change in 'table.exec.resource.default-parallelism'. This seems to be quite a big limitation: it prevents Flink SQL users from scaling freely.
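In case anyone else is blocked on this in the meantime: the restore can be forced by telling Flink to skip the state it cannot map, which is what the error message (quoted below) suggests via --allowNonRestoredState. From the SQL client, the equivalent should be 'execution.savepoint.ignore-unclaimed-state'. A minimal sketch; the savepoint path is a placeholder, and note that the unmapped operator's state is discarded, so this is a stop-gap rather than a fix:

    -- Stop-gap only: state for operators missing from the new job graph
    -- (here the separated-out 'Sink: end') is dropped, not migrated.
    SET 'execution.savepoint.path' = '/<SAVEPOINT_PATH>/';
    SET 'execution.savepoint.ignore-unclaimed-state' = 'true';
    -- then re-run the unmodified statement set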
I would appreciate it if anyone can confirm (or rule out) that this is a bug before I start scoping out the effort to fix it.
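If anyone wants to probe the cause, one experiment that might narrow it down is to take chaining out of the picture, since the job graphs below differ only in whether 'Sink: end' is chained. I have not verified that this makes the operator IDs line up across parallelisms, so treat it as a sketch:

    -- Hypothesis test, not a fix: with chaining disabled, every operator
    -- becomes its own job-graph vertex, so 'Sink: end' can no longer move
    -- in or out of a chain when the parallelism changes.
    SET 'pipeline.operator-chaining' = 'false';

If the job then restores cleanly across a parallelism change, that would point at the chaining decision rather than the planner.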
Best regards
Keith Lee

On Mon, May 13, 2024 at 11:37 AM Keith Lee (Jira) <j...@apache.org> wrote:

> Keith Lee created FLINK-35336:
> ---------------------------------
>
>              Summary: SQL failed to restore from savepoint after change in default-parallelism
>                  Key: FLINK-35336
>                  URL: https://issues.apache.org/jira/browse/FLINK-35336
>              Project: Flink
>           Issue Type: Bug
>           Components: Table SQL / Planner
>     Affects Versions: 1.18.1
>          Environment: Flink SQL Client, Flink 1.18.1 on MacOS
>             Reporter: Keith Lee
>
> After bumping 'table.exec.resource.default-parallelism' from 1 to 4, I am
> observing the following exception when restoring the job from a savepoint
> with an unmodified statement set:
>
> {quote}[ERROR] Could not execute SQL statement. Reason:
> java.lang.IllegalStateException: Failed to rollback to checkpoint/savepoint
> file:/tmp/flink-savepoints/savepoint-4392e6-575fa6b692ff. Cannot map
> checkpoint/savepoint state for operator 46ba9b22862c3bbe9373c6abee964b2a to
> the new program, because the operator is not available in the new program.
> If you want to allow to skip this, you can set the --allowNonRestoredState
> option on the CLI.{quote}
>
> When started without savepoints, the job graphs of the two jobs differ
> despite identical statements being run.
>
> There are two operators when default parallelism is 1:
>
> {quote}A: Source: UserBehaviourKafkaSource[68] -> (Calc[69] ->
> StreamRecordTimestampInserter[70] -> StreamingFileWriter -> Sink: end,
> Calc[71] -> LocalWindowAggregate[72])
> B: GlobalWindowAggregate[74] -> Calc[75] -> Sink: CampaignAggregationsJDBC[76]{quote}
>
> And three operators when default parallelism is 4:
>
> {quote}A: Source: UserBehaviourKafkaSource[86] -> (Calc[87] ->
> StreamRecordTimestampInserter[88] -> StreamingFileWriter, Calc[89] ->
> LocalWindowAggregate[90])
> B: Sink: end
> C: GlobalWindowAggregate[92] -> Calc[93] -> Sink: CampaignAggregationsJDBC[94]{quote}
>
> Notice that the operator 'Sink: end' is split out into its own vertex when
> parallelism is set to 4, which causes the job-graph incompatibility. EXPLAIN
> PLAN did not show any difference in the syntax tree, the physical plan or
> the execution plan.
>
> I have attempted various configurations in `table.optimizer.*`.
>
> Steps to reproduce:
>
> {quote}SET 'table.exec.resource.default-parallelism' = '1';
>
> EXECUTE STATEMENT SET
> BEGIN
>     INSERT INTO UserErrorExperienceS3Sink (user_id, user_session,
>         interaction_type, interaction_target, interaction_tags,
>         event_date, event_hour, event_time)
>     SELECT
>         user_id,
>         user_session,
>         interaction_type,
>         interaction_target,
>         interaction_tags,
>         DATE_FORMAT(event_time, 'yyyy-MM-dd'),
>         DATE_FORMAT(event_time, 'HH'),
>         event_time
>     FROM UserBehaviourKafkaSource
>     WHERE interaction_result LIKE '%ERROR%';
>
>     INSERT INTO CampaignAggregationsJDBC
>     SELECT
>         CONCAT_WS('/', interaction_tags, interaction_result,
>             DATE_FORMAT(window_start, 'YYYY-MM-DD HH:mm:ss.SSS'),
>             DATE_FORMAT(window_end, 'YYYY-MM-DD HH:mm:ss.SSS')) AS id,
>         interaction_tags AS campaign,
>         interaction_result,
>         COUNT(*) AS interaction_count,
>         window_start,
>         window_end
>     FROM TABLE(TUMBLE(TABLE UserBehaviourKafkaSource,
>         DESCRIPTOR(event_time), INTERVAL '10' SECONDS))
>     GROUP BY window_start, window_end, interaction_tags,
>         interaction_result;
> END;
>
> STOP JOB '<JOB_ID>' WITH SAVEPOINT;
>
> SET 'execution.savepoint.path' = '/<SAVEPOINT_PATH>/';
> SET 'table.exec.resource.default-parallelism' = '4';
>
> <Re-run the EXECUTE STATEMENT SET above>
> {quote}
>
> DDLs:
>
> {quote}-- S3 Sink
> CREATE TABLE UserErrorExperienceS3Sink (
>     user_id BIGINT,
>     user_session STRING,
>     interaction_type STRING,
>     interaction_target STRING,
>     interaction_tags STRING,
>     event_date STRING,
>     event_hour STRING,
>     event_time TIMESTAMP(3) WITHOUT TIME ZONE)
> PARTITIONED BY (event_date, event_hour)
> WITH (
>     'connector' = 'filesystem',
>     'path' = 's3://<S3BUCKET>/userErrorExperience/',
>     'format' = 'json');
>
> -- Kafka Source
> ADD JAR 'file:///Users/leekei/Downloads/flink-sql-connector-kafka-3.1.0-1.18.jar';
> CREATE TABLE UserBehaviourKafkaSource (
>     user_id BIGINT,
>     user_session STRING,
>     interaction_type STRING,
>     interaction_target STRING,
>     interaction_tags STRING,
>     interaction_result STRING,
>     event_time TIMESTAMP(3) WITHOUT TIME ZONE METADATA FROM 'timestamp',
>     WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND)
> WITH (
>     'connector' = 'kafka',
>     'topic' = 'user_behaviour',
>     'properties.bootstrap.servers' = 'localhost:9092',
>     'properties.group.id' = 'demoGroup',
>     'scan.startup.mode' = 'latest-offset',
>     'format' = 'csv');
>
> -- PostgreSQL Source/Sink
> ADD JAR 'file:///Users/leekei/Downloads/flink-connector-jdbc-3.1.2-1.18.jar';
> ADD JAR 'file:///Users/leekei/Downloads/postgresql-42.7.3.jar';
> CREATE TABLE CampaignAggregationsJDBC (
>     id STRING,
>     campaign STRING,
>     interaction_result STRING,
>     interaction_count BIGINT,
>     window_start TIMESTAMP(3) WITHOUT TIME ZONE,
>     window_end TIMESTAMP(3) WITHOUT TIME ZONE)
> WITH (
>     'connector' = 'jdbc',
>     'url' = 'jdbc:postgresql://localhost:5432/postgres',
>     'table-name' = 'campaign_aggregations');
> {quote}
>
>
> --
> This message was sent by Atlassian Jira
> (v8.20.10#820010)
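One more avenue that might sidestep the problem, given that EXPLAIN shows identical plans: compile the statement set to a persisted plan once, and execute that same plan after the parallelism change, so the planner never gets a chance to produce a different graph. I have not verified that an EXECUTE PLAN job restores from this savepoint, or that it picks up the new default parallelism at all, so the sketch below is untested; the plan path is a placeholder:

    -- Compile once and persist the physical plan alongside the job:
    SET 'table.exec.resource.default-parallelism' = '1';
    COMPILE PLAN 'file:///tmp/plans/campaign.json' FOR STATEMENT SET
    BEGIN
        INSERT INTO UserErrorExperienceS3Sink (user_id, user_session,
            interaction_type, interaction_target, interaction_tags,
            event_date, event_hour, event_time)
        SELECT user_id, user_session, interaction_type, interaction_target,
            interaction_tags, DATE_FORMAT(event_time, 'yyyy-MM-dd'),
            DATE_FORMAT(event_time, 'HH'), event_time
        FROM UserBehaviourKafkaSource
        WHERE interaction_result LIKE '%ERROR%';

        INSERT INTO CampaignAggregationsJDBC
        SELECT CONCAT_WS('/', interaction_tags, interaction_result,
                DATE_FORMAT(window_start, 'YYYY-MM-DD HH:mm:ss.SSS'),
                DATE_FORMAT(window_end, 'YYYY-MM-DD HH:mm:ss.SSS')) AS id,
            interaction_tags AS campaign, interaction_result,
            COUNT(*) AS interaction_count, window_start, window_end
        FROM TABLE(TUMBLE(TABLE UserBehaviourKafkaSource,
            DESCRIPTOR(event_time), INTERVAL '10' SECONDS))
        GROUP BY window_start, window_end, interaction_tags, interaction_result;
    END;

    -- After STOP JOB ... WITH SAVEPOINT: restore and run the same plan,
    -- changing only the default parallelism:
    SET 'execution.savepoint.path' = '/<SAVEPOINT_PATH>/';
    SET 'table.exec.resource.default-parallelism' = '4';
    EXECUTE PLAN 'file:///tmp/plans/campaign.json';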