[ https://issues.apache.org/jira/browse/FLINK-31143?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17692269#comment-17692269 ]
Weijie Guo edited comment on FLINK-31143 at 2/22/23 4:36 PM:
-------------------------------------------------------------
[~mapohl] Sorry, what I said before was not very clear. We resubmit a SQL job, which creates a new `JobClient`. It is probably not accurate to call this a restart of the `JobClient`. The key point is that the resubmission creates a new `CollectResultFetcher`. Since `CollectResultFetcher` runs on the client side, the interaction between `CollectDynamicSink` and the fetcher is similar to an interaction with an external system, so a stronger mechanism is needed to ensure end-to-end exactly-once semantics. In my opinion, at least with the current implementation, this setup cannot guarantee fault tolerance.
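The point about the new `CollectResultFetcher` can be illustrated with a deliberately simplified model. It assumes that the sink keeps a version and an offset as operator state that survives a savepoint, that the client-side fetcher keeps its offset only in memory, and that the sink accepts a request only when both values match exactly. `SinkModel`, `FetcherModel`, the exact-match rule and the acknowledged count of one are hypothetical names and choices for illustration, not Flink's `CollectSinkFunction` or `CollectResultFetcher`, and the real validation may be more lenient. The model also hands the sink's version to the new fetcher directly, to mirror the log line in the issue description where both versions are identical.

```java
import java.util.Optional;

// Hypothetical, heavily simplified model. SinkModel and FetcherModel are NOT Flink
// classes; they only mimic where each side of the collect protocol keeps its progress.
public class CollectRestartModel {

    // Stands in for the sink: its offset is treated as operator state,
    // so it is written to and restored from a savepoint.
    static final class SinkModel {
        final String version;
        long offset; // results acknowledged by the client so far

        SinkModel(String version, long offset) {
            this.version = version;
            this.offset = offset;
        }

        // Assumed rule: a request is valid only if version AND offset match exactly.
        Optional<String> validate(String requestVersion, long requestOffset) {
            if (version.equals(requestVersion) && offset == requestOffset) {
                return Optional.empty();
            }
            return Optional.of(String.format(
                    "Invalid request. Received version = %s, offset = %d, "
                            + "while expected version = %s, offset = %d",
                    requestVersion, requestOffset, version, offset));
        }
    }

    // Stands in for the client-side fetcher: its progress lives only in client
    // memory and is not part of any savepoint.
    static final class FetcherModel {
        final String version;
        long offset = 0; // a newly created fetcher always starts from 0

        FetcherModel(String version) {
            this.version = version;
        }

        void acknowledge(SinkModel sink, long count) {
            offset += count;
            sink.offset = offset;
        }
    }

    public static void main(String[] args) {
        String version = "b497a74f-e85c-404b-8df3-1b4b1a0c2de1";

        // First submission: the client acknowledges one result (illustrative count).
        SinkModel sink = new SinkModel(version, 0);
        new FetcherModel(version).acknowledge(sink, 1);

        // Stop with savepoint: only the sink's offset is persisted.
        long savedOffset = sink.offset;

        // Resubmission: the sink is restored, but the fetcher is brand new.
        SinkModel restoredSink = new SinkModel(version, savedOffset);
        FetcherModel newFetcher = new FetcherModel(version);

        restoredSink.validate(newFetcher.version, newFetcher.offset)
                .ifPresent(System.out::println);
    }
}
```

Running `main` prints the same message text as the log line in the issue description: the restored sink still expects the offset it had snapshotted, while the resubmitted client starts again from 0.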
> Invalid request: offset doesn't match when restarting from a savepoint
> ----------------------------------------------------------------------
>
> Key: FLINK-31143
> URL: https://issues.apache.org/jira/browse/FLINK-31143
> Project: Flink
> Issue Type: Bug
> Components: Table SQL / Runtime
> Affects Versions: 1.17.0, 1.15.3, 1.16.1
> Reporter: Matthias Pohl
> Priority: Critical
>
> I tried to run the following case:
> {code:java}
> import java.io.File;
> import java.nio.file.Files;
>
> import org.apache.flink.configuration.Configuration;
> import org.apache.flink.configuration.CoreOptions;
> import org.apache.flink.core.execution.JobClient;
> import org.apache.flink.core.execution.SavepointFormatType;
> import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
> import org.apache.flink.table.api.EnvironmentSettings;
> import org.apache.flink.table.api.TableEnvironment;
> import org.apache.flink.table.api.TableResult;
> import org.apache.flink.types.Row;
> import org.apache.flink.util.CloseableIterator;
>
> // the class name is illustrative; the issue only shows the main method
> public class SavepointRestartReproduction {
>
>     public static void main(String[] args) throws Exception {
>         final String createTableQuery =
>                 "CREATE TABLE left_table (a int, c varchar) "
>                         + "WITH ("
>                         + " 'connector' = 'datagen', "
>                         + " 'rows-per-second' = '1', "
>                         + " 'fields.a.kind' = 'sequence', "
>                         + " 'fields.a.start' = '0', "
>                         + " 'fields.a.end' = '100000'"
>                         + ");";
>         final String selectQuery = "SELECT * FROM left_table;";
>
>         final Configuration initialConfig = new Configuration();
>         initialConfig.set(CoreOptions.DEFAULT_PARALLELISM, 1);
>
>         final EnvironmentSettings initialSettings =
>                 EnvironmentSettings.newInstance()
>                         .inStreamingMode()
>                         .withConfiguration(initialConfig)
>                         .build();
>         final TableEnvironment initialTableEnv = TableEnvironment.create(initialSettings);
>
>         // create the job
>         initialTableEnv.executeSql(createTableQuery);
>         final TableResult tableResult = initialTableEnv.sqlQuery(selectQuery).execute();
>         tableResult.await();
>
>         final String savepointPath;
>         try (CloseableIterator<Row> tableResultIterator = tableResult.collect()) {
>             // consume two results (the iterator must be declared before it is used)
>             System.out.println(tableResultIterator.next());
>             System.out.println(tableResultIterator.next());
>
>             // stop the job with a savepoint
>             final JobClient jobClient =
>                     tableResult.getJobClient().orElseThrow(IllegalStateException::new);
>             final File savepointDirectory = Files.createTempDirectory("savepoints").toFile();
>             savepointPath =
>                     jobClient
>                             .stopWithSavepoint(
>                                     true,
>                                     savepointDirectory.getAbsolutePath(),
>                                     SavepointFormatType.CANONICAL)
>                             .get();
>         }
>
>         // restart the very same job from the savepoint
>         final SavepointRestoreSettings savepointRestoreSettings =
>                 SavepointRestoreSettings.forPath(savepointPath, true);
>         final Configuration restartConfig = new Configuration(initialConfig);
>         SavepointRestoreSettings.toConfiguration(savepointRestoreSettings, restartConfig);
>
>         final EnvironmentSettings restartSettings =
>                 EnvironmentSettings.newInstance()
>                         .inStreamingMode()
>                         .withConfiguration(restartConfig)
>                         .build();
>         final TableEnvironment restartTableEnv = TableEnvironment.create(restartSettings);
>         restartTableEnv.executeSql(createTableQuery);
>         restartTableEnv.sqlQuery(selectQuery).execute().print();
>     }
> }
> {code}
> h3. Expected behavior
> The job continues, skipping the initial two records, and starts printing results
> from 2 onwards.
> h3. Observed behavior
> No results are printed. The logs show that an invalid request was handled:
> {code:java}
> org.apache.flink.streaming.api.operators.collect.CollectSinkFunction [] - Invalid request. Received version = b497a74f-e85c-404b-8df3-1b4b1a0c2de1, offset = 0, while expected version = b497a74f-e85c-404b-8df3-1b4b1a0c2de1, offset = 1
> {code}
> It looks like the correct offset is not picked up from the savepoint (see
> [CollectSinkFunction:411|https://github.com/apache/flink/blob/5ae8cb0503449b07f76d0ab621c3e81734496b26/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/collect/CollectSinkFunction.java#L411]).

--
This message was sent by Atlassian Jira
(v8.20.10#820010)