Matthias Pohl created FLINK-31143: ------------------------------------- Summary: Invalid request: offset doesn't match when restarting from a savepoint Key: FLINK-31143 URL: https://issues.apache.org/jira/browse/FLINK-31143 Project: Flink Issue Type: Bug Components: Table SQL / Runtime Affects Versions: 1.16.1, 1.15.3, 1.17.0 Reporter: Matthias Pohl
I tried to run the following case: {code:java} public static void main(String[] args) throws Exception { final String createTableQuery = "CREATE TABLE left_table (a int, c varchar) " + "WITH (" + " 'connector' = 'datagen', " + " 'rows-per-second' = '1', " + " 'fields.a.kind' = 'sequence', " + " 'fields.a.start' = '0', " + " 'fields.a.end' = '100000'" + ");"; final String selectQuery = "SELECT * FROM left_table;"; final Configuration initialConfig = new Configuration(); initialConfig.set(CoreOptions.DEFAULT_PARALLELISM, 1); final EnvironmentSettings initialSettings = EnvironmentSettings.newInstance() .inStreamingMode() .withConfiguration(initialConfig) .build(); final TableEnvironment initialTableEnv = TableEnvironment.create(initialSettings); initialTableEnv.executeSql(createTableQuery); final TableResult tableResult = initialTableEnv.sqlQuery(selectQuery).execute(); tableResult.await(); final String savepointPath; try (CloseableIterator<Row> tableResultIterator = tableResult.collect()) { // consume two results System.out.println(tableResultIterator.next()); System.out.println(tableResultIterator.next()); final JobClient jobClient = tableResult.getJobClient().orElseThrow(IllegalStateException::new); final File savepointDirectory = Files.createTempDir(); savepointPath = jobClient .stopWithSavepoint( true, savepointDirectory.getAbsolutePath(), SavepointFormatType.CANONICAL) .get(); } final SavepointRestoreSettings savepointRestoreSettings = SavepointRestoreSettings.forPath(savepointPath, true); final Configuration restartConfig = new Configuration(initialConfig); SavepointRestoreSettings.toConfiguration(savepointRestoreSettings, restartConfig); final EnvironmentSettings restartSettings = EnvironmentSettings.newInstance() .inStreamingMode() .withConfiguration(restartConfig) .build(); final TableEnvironment restartTableEnv = TableEnvironment.create(restartSettings); restartTableEnv.executeSql(createTableQuery); restartTableEnv.sqlQuery(selectQuery).execute().print(); } {code} Expected behavior: The job continues omitting the inital two records and starts printing results from 2 onwards. Observed behavior: No results are printed. The logs show that an invalid request was handled: {code} org.apache.flink.streaming.api.operators.collect.CollectSinkFunction [] - Invalid request. Received version = b497a74f-e85c-404b-8df3-1b4b1a0c2de1, offset = 0, while expected version = b497a74f-e85c-404b-8df3-1b4b1a0c2de1, offset = 1 {code} It looks like the right offset is not picked up from the savepoint (see [CollectSinkFunction:411|https://github.com/apache/flink/blob/5ae8cb0503449b07f76d0ab621c3e81734496b26/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/collect/CollectSinkFunction.java#L411]). -- This message was sent by Atlassian Jira (v8.20.10#820010)