Hi all,

I ran into an issue where unmodified SQL failed to restore from a savepoint
after a change in default parallelism. This seems to be quite a big
limitation, as it prevents Flink SQL users from scaling freely.

I would appreciate it if anyone can confirm or rule out that this is a bug
before I start scoping the effort to fix it.

Best regards
Keith Lee


On Mon, May 13, 2024 at 11:37 AM Keith Lee (Jira) <j...@apache.org> wrote:

> Keith Lee created FLINK-35336:
> ---------------------------------
>
>              Summary: SQL failed to restore from savepoint after change in
> default-parallelism
>                  Key: FLINK-35336
>                  URL: https://issues.apache.org/jira/browse/FLINK-35336
>              Project: Flink
>           Issue Type: Bug
>           Components: Table SQL / Planner
>     Affects Versions: 1.18.1
>          Environment: Flink SQL Client, Flink 1.18.1 on macOS
>             Reporter: Keith Lee
>
>
> After bumping 'table.exec.resource.default-parallelism' from 1 to 4, I am
> observing the following exception when restoring the job from a savepoint
> with an unmodified statement set.
>
> {quote}[ERROR] Could not execute SQL statement. Reason:
> java.lang.IllegalStateException: Failed to rollback to
> checkpoint/savepoint
> file:/tmp/flink-savepoints/savepoint-4392e6-575fa6b692ff. Cannot map
> checkpoint/savepoint state for operator 46ba9b22862c3bbe9373c6abee964b2a to
> the new program, because the operator is not available in the new program.
> If you want to allow to skip this, you can set the --allowNonRestoredState
> option on the CLI.{quote}
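>
> For reference, the SQL Client equivalent of the CLI's
> --allowNonRestoredState flag is a configuration option. A minimal sketch,
> assuming one is willing to drop the unmatched operator state instead of
> restoring it:
>
> {quote}-- Skips savepoint state that cannot be mapped to the new job graph
> SET 'execution.savepoint.ignore-unclaimed-state' = 'true';{quote}
>
> This only sidesteps the symptom: the unmapped state is discarded, so it is
> not a real fix for the graph incompatibility.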
>
> When started without a savepoint, the job graphs of the two runs differ
> despite identical statements being run.
>
> There are two operator chains when default parallelism is 1:
> {quote}A: Source: UserBehaviourKafkaSource[68] -> (Calc[69] ->
> StreamRecordTimestampInserter[70] -> StreamingFileWriter -> Sink: end,
> Calc[71] -> LocalWindowAggregate[72])
> B: GlobalWindowAggregate[74] -> Calc[75] -> Sink:
> CampaignAggregationsJDBC[76]{quote}
>
> There are three operator chains when default parallelism is 4:
>
> {quote}A: Source: UserBehaviourKafkaSource[86] -> (Calc[87] ->
> StreamRecordTimestampInserter[88] -> StreamingFileWriter, Calc[89] ->
> LocalWindowAggregate[90])
> B: Sink: end
> C: GlobalWindowAggregate[92] -> Calc[93] -> Sink:
> CampaignAggregationsJDBC[94]{quote}
>
> Notice that the operator 'Sink: end' is split out into its own chain when
> parallelism is set to 4, causing the incompatibility between the job
> graphs. EXPLAIN PLAN did not show any difference in the syntax tree,
> physical plan, or execution plan.
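>
> One hypothesis worth probing: the split seems related to the filesystem
> sink's writer/committer topology, whose shape can change with parallelism.
> A minimal probe sketch, assuming the filesystem connector's
> 'sink.parallelism' option (untested; pinning the sink's parallelism might
> keep the chain shape stable across default-parallelism changes):
>
> {quote}-- Hypothetical probe: pin the sink parallelism so the post-writer
> -- topology does not vary with table.exec.resource.default-parallelism
> ALTER TABLE UserErrorExperienceS3Sink SET ('sink.parallelism' = '1');{quote}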
>
>
> I have attempted various configurations in `table.optimizer.*` without
> success.
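>
> Since the incompatibility stems from a chaining change, operator chaining
> itself may also be worth probing. A minimal diagnostic sketch, assuming
> Flink's pipeline.operator-chaining option (note: disabling chaining also
> changes the job graph, so it would only help if set from the very first
> run, and it carries a performance cost):
>
> {quote}-- Diagnostic: disable operator chaining so the graph shape no
> -- longer depends on which operators get chained together
> SET 'pipeline.operator-chaining' = 'false';{quote}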
>
> Steps to reproduce:
>
> {quote}SET 'table.exec.resource.default-parallelism' = '1';
> EXECUTE STATEMENT SET BEGIN
>     INSERT INTO UserErrorExperienceS3Sink (user_id, user_session,
> interaction_type, interaction_target, interaction_tags, event_date,
> event_hour, event_time)
>     SELECT
>         user_id,
>         user_session,
>         interaction_type,
>         interaction_target,
>         interaction_tags,
>         DATE_FORMAT(event_time, 'yyyy-MM-dd'),
>         DATE_FORMAT(event_time, 'HH'),
>         event_time
>     FROM UserBehaviourKafkaSource
>     WHERE
>         interaction_result Like '%ERROR%';
>
>     INSERT INTO CampaignAggregationsJDBC
>     SELECT
>         CONCAT_WS('/', interaction_tags, interaction_result,
> DATE_FORMAT(window_start, 'yyyy-MM-dd HH:mm:ss.SSS'),
> DATE_FORMAT(window_end, 'yyyy-MM-dd HH:mm:ss.SSS')) AS id,
>         interaction_tags AS campaign,
>         interaction_result,
>         COUNT(*) AS interaction_count,
>         window_start,
>         window_end
>     FROM
>         TABLE(TUMBLE(TABLE UserBehaviourKafkaSource,
> DESCRIPTOR(event_time), INTERVAL '10' SECONDS))
>     GROUP BY window_start, window_end, interaction_tags,
> interaction_result;
> END;
>
> STOP JOB '<JOB_ID>' WITH SAVEPOINT;
> SET 'execution.savepoint.path' = '/<SAVEPOINT_PATH>/';
> SET 'table.exec.resource.default-parallelism' = '4';
>
> <Re-run DML at line 2>
> {quote}
>
> DDLs:
>
>
> {quote}-- S3 Sink
> CREATE TABLE UserErrorExperienceS3Sink (
>   user_id BIGINT,
>   user_session STRING,
>   interaction_type STRING,
>   interaction_target STRING,
>   interaction_tags STRING,
>   event_date STRING,
>   event_hour STRING,
>   event_time TIMESTAMP(3) WITHOUT TIME ZONE)
> PARTITIONED BY (event_date, event_hour)
> WITH (
>   'connector' = 'filesystem',
>   'path' = 's3://<S3BUCKET>/userErrorExperience/',
>   'format' = 'json');
>
> -- Kafka Source
> ADD JAR
> 'file:///Users/leekei/Downloads/flink-sql-connector-kafka-3.1.0-1.18.jar';
> CREATE TABLE UserBehaviourKafkaSource (
>   user_id BIGINT,
>   user_session STRING,
>   interaction_type STRING,
>   interaction_target STRING,
>   interaction_tags STRING,
>   interaction_result STRING,
>   event_time TIMESTAMP(3) WITHOUT TIME ZONE METADATA FROM 'timestamp',
>   WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND)
> WITH (
>   'connector' = 'kafka',
>   'topic' = 'user_behaviour',
>   'properties.bootstrap.servers' = 'localhost:9092',
>   'properties.group.id' = 'demoGroup',
>   'scan.startup.mode' = 'latest-offset',
>   'format' = 'csv');
>
> -- PostgreSQL Source/Sink
> ADD JAR
> 'file:///Users/leekei/Downloads/flink-connector-jdbc-3.1.2-1.18.jar';
> ADD JAR 'file:///Users/leekei/Downloads/postgresql-42.7.3.jar';
> CREATE TABLE CampaignAggregationsJDBC (
>   id STRING,
>   campaign STRING,
>   interaction_result STRING,
>   interaction_count BIGINT,
>   window_start TIMESTAMP(3) WITHOUT TIME ZONE,
>   window_end TIMESTAMP(3) WITHOUT TIME ZONE)
> WITH (
>   'connector' = 'jdbc',
>   'url' = 'jdbc:postgresql://localhost:5432/postgres',
>   'table-name' = 'campaign_aggregations');
> {quote}
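>
> For completeness, the JDBC sink expects a matching table on the PostgreSQL
> side. A minimal sketch of that table (column types are assumptions derived
> from the Flink schema above):
>
> {quote}-- Assumed PostgreSQL target table for the JDBC sink
> CREATE TABLE campaign_aggregations (
>   id VARCHAR PRIMARY KEY,
>   campaign VARCHAR,
>   interaction_result VARCHAR,
>   interaction_count BIGINT,
>   window_start TIMESTAMP(3),
>   window_end TIMESTAMP(3));{quote}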
>
>
>
> --
> This message was sent by Atlassian Jira
> (v8.20.10#820010)
>
