Nathan created FLINK-28622:
------------------------------

             Summary: Can't restore a flink job that uses Table API and Kafka connector with savepoint
                 Key: FLINK-28622
                 URL: https://issues.apache.org/jira/browse/FLINK-28622
             Project: Flink
          Issue Type: Bug
          Components: Table SQL / API
    Affects Versions: 1.15.0
            Reporter: Nathan


I canceled a Flink job with a savepoint, then tried to restore the job from that 
savepoint using the same jar file, but the restore failed with an error saying it 
cannot map the savepoint state. Since the jar file and the Flink version are 
unchanged, shouldn't the execution plan and the generated operator IDs be the same?
 
Related errors:
{code:java}
Caused by: java.util.concurrent.CompletionException: 
java.lang.IllegalStateException: Failed to rollback to checkpoint/savepoint 
file:/root/flink-savepoints/savepoint-5f285c-c2749410db07. Cannot map 
checkpoint/savepoint state for operator dd5fc1f28f42d777f818e2e8ea18c331 to the 
new program, because the operator is not available in the new program. If you 
want to allow to skip this, you can set the --allowNonRestoredState option on 
the CLI.

Caused by: java.lang.IllegalStateException: Failed to rollback to 
checkpoint/savepoint file:/root/flink-savepoints/savepoint-5f285c-c2749410db07. 
Cannot map checkpoint/savepoint state for operator 
dd5fc1f28f42d777f818e2e8ea18c331 to the new program, because the operator is 
not available in the new program. If you want to allow to skip this, you can 
set the --allowNonRestoredState option on the CLI. {code}
My code:
{code:java}
import java.time.ZoneId;

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public final class FlinkJob {

    public static void main(String[] args) {

        final String JOB_NAME = "FlinkJob";

        final EnvironmentSettings settings = EnvironmentSettings.inStreamingMode();
        final TableEnvironment tEnv = TableEnvironment.create(settings);
        tEnv.getConfig().set("pipeline.name", JOB_NAME);
        tEnv.getConfig().setLocalTimeZone(ZoneId.of("UTC"));

        // Kafka source table; the first three columns expose Kafka record metadata.
        tEnv.executeSql("CREATE TEMPORARY TABLE ApiLog (" +
                "  `_timestamp` TIMESTAMP(3) METADATA FROM 'timestamp' VIRTUAL," +
                "  `_partition` INT METADATA FROM 'partition' VIRTUAL," +
                "  `_offset` BIGINT METADATA FROM 'offset' VIRTUAL," +
                "  `Data` STRING," +
                "  `Action` STRING," +
                "  `ProduceDateTime` TIMESTAMP_LTZ(6)," +
                "  `OffSet` INT" +
                ") WITH (" +
                "  'connector' = 'kafka'," +
                "  'topic' = 'api.log'," +
                "  'properties.group.id' = 'flink'," +
                "  'properties.bootstrap.servers' = '<mykafkahost...>'," +
                "  'format' = 'json'," +
                "  'json.timestamp-format.standard' = 'ISO-8601'" +
                ")");

        // Sink table that prints every row to the task manager's standard output.
        tEnv.executeSql("CREATE TABLE print_table (" +
                " `_timestamp` TIMESTAMP(3)," +
                " `_partition` INT," +
                " `_offset` BIGINT," +
                " `Data` STRING," +
                " `Action` STRING," +
                " `ProduceDateTime` TIMESTAMP(6)," +
                " `OffSet` INT" +
                ") WITH ('connector' = 'print')");

        // Submit the streaming job that copies every Kafka record to the print sink.
        tEnv.executeSql("INSERT INTO print_table" +
                " SELECT * FROM ApiLog");

    }

} {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
