[ https://issues.apache.org/jira/browse/FLINK-30514?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Flink Jira Bot updated FLINK-30514:
-----------------------------------
    Labels: auto-deprioritized-major pull-request-available  (was: pull-request-available stale-major)
    Priority: Minor  (was: Major)

This issue was labeled "stale-major" 7 days ago and has not received any updates so it is being deprioritized. If this ticket is actually Major, please raise the priority and ask a committer to assign you the issue or revive the public discussion.

> HybridSource savepoint recovery sequence
> ----------------------------------------
>
>                 Key: FLINK-30514
>                 URL: https://issues.apache.org/jira/browse/FLINK-30514
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / FileSystem
>    Affects Versions: 1.16.0, 1.15.2, 1.15.3
>            Reporter: Denis Golovachev
>            Priority: Minor
>              Labels: auto-deprioritized-major, pull-request-available
>
> {{org.apache.flink.connector.base.source.hybrid.HybridSourceReader}} accumulates splits during recovery in {{org.apache.flink.connector.base.source.hybrid.HybridSourceReader#restoredSplits}}.
> As a next step, it creates a reader and pushes all recovered splits to that reader in {{org.apache.flink.connector.base.source.hybrid.HybridSourceReader#setCurrentReader}}.
> The call sequence in {{setCurrentReader}} is as follows:
> - {{reader.start()}}
> - {{reader.addSplits()}}
> This does not work when {{FileSourceReader}} is the underlying reader.
> {{FileSourceReader#start()}} checks whether any splits are assigned and calls {{sendSplitRequest}} if there are none. The current {{HybridSourceReader}} recovery sequence calls {{start()}} before the restored splits are added, so at that moment the reader always has no splits and requests new ones.
> As a result, every time we recover, we immediately jump to the next splits.
> Let me show you some logs.
> In this experiment, the job should have resumed with the files in bucket 1000000, but it jumped to bucket 2000000.
> Job Manager
> {code:java}
> 2022-12-27 13:38:47.155 StaticFileSplitEnumerator - Assigned split to subtask 1 : FileSourceSplit: s3a://bucket/2000000/part-00001-00cb73ef-346b-4e1e-a86a-007223ddf275-c000.zstd.parquet [0, 97489087) hosts=[localhost] ID=0000000032 position=null
> 2022-12-27 13:38:47.156 StaticFileSplitEnumerator - Assigned split to subtask 9 : FileSourceSplit: s3a://bucket/2000000/part-00002-00cb73ef-346b-4e1e-a86a-007223ddf275-c000.zstd.parquet [0, 97342071) hosts=[localhost] ID=0000000033 position=null
> 2022-12-27 13:38:47.156 StaticFileSplitEnumerator - Assigned split to subtask 6 : FileSourceSplit: s3a://bucket/2000000/part-00000-00cb73ef-346b-4e1e-a86a-007223ddf275-c000.zstd.parquet [0, 97377047) hosts=[localhost] ID=0000000031 position=null
> 2022-12-27 13:38:47.157 StaticFileSplitEnumerator - Assigned split to subtask 5 : FileSourceSplit: s3a://bucket/2000000/part-00003-00cb73ef-346b-4e1e-a86a-007223ddf275-c000.zstd.parquet [0, 97406878) hosts=[localhost] ID=0000000034 position=null
> 2022-12-27 13:38:47.157 StaticFileSplitEnumerator - Assigned split to subtask 2 : FileSourceSplit: s3a://bucket/2000000/part-00009-00cb73ef-346b-4e1e-a86a-007223ddf275-c000.zstd.parquet [0, 97536205) hosts=[localhost] ID=0000000040 position=null
> 2022-12-27 13:38:47.157 StaticFileSplitEnumerator - Assigned split to subtask 4 : FileSourceSplit: s3a://bucket/2000000/part-00004-00cb73ef-346b-4e1e-a86a-007223ddf275-c000.zstd.parquet [0, 97420601) hosts=[localhost] ID=0000000035 position=null
> 2022-12-27 13:38:47.157 StaticFileSplitEnumerator - Assigned split to subtask 8 : FileSourceSplit: s3a://bucket/2000000/part-00005-00cb73ef-346b-4e1e-a86a-007223ddf275-c000.zstd.parquet [0, 97472256) hosts=[localhost] ID=0000000036 position=null
> 2022-12-27 13:38:47.157 StaticFileSplitEnumerator - Assigned split to subtask 3 : FileSourceSplit: s3a://bucket/2000000/part-00006-00cb73ef-346b-4e1e-a86a-007223ddf275-c000.zstd.parquet [0, 97495880) hosts=[localhost] ID=0000000037 position=null
> 2022-12-27 13:38:47.157 StaticFileSplitEnumerator - Assigned split to subtask 0 : FileSourceSplit: s3a://bucket/2000000/part-00007-00cb73ef-346b-4e1e-a86a-007223ddf275-c000.zstd.parquet [0, 97389425) hosts=[localhost] ID=0000000038 position=null
> 2022-12-27 13:38:47.158 StaticFileSplitEnumerator - Assigned split to subtask 7 : FileSourceSplit: s3a://bucket/2000000/part-00008-00cb73ef-346b-4e1e-a86a-007223ddf275-c000.zstd.parquet [0, 97428709) hosts=[localhost] ID=0000000039 position=null
> {code}
> Task Manager
> {code:java}
> 2246:2022-12-27 13:38:47.110 SourceReaderBase - Adding split(s) to reader: [FileSourceSplit: s3a://bucket/1000000/part-00007-db38c407-84c0-486f-9d71-c214f142b1c8-c000.zstd.parquet [0, 79887236) hosts=[localhost] ID=0000000018 position=CheckpointedPosition: offset=NO_OFFSET, recordsToSkip=226029]
> 2247:2022-12-27 13:38:47.110 SourceReaderBase - Adding split(s) to reader: [FileSourceSplit: s3a://bucket/1000000/part-00000-db38c407-84c0-486f-9d71-c214f142b1c8-c000.zstd.parquet [0, 79987191) hosts=[localhost] ID=0000000011 position=CheckpointedPosition: offset=NO_OFFSET, recordsToSkip=226030]
> 2248:2022-12-27 13:38:47.110 SourceReaderBase - Adding split(s) to reader: [FileSourceSplit: s3a://bucket/1000000/part-00009-db38c407-84c0-486f-9d71-c214f142b1c8-c000.zstd.parquet [0, 80247830) hosts=[localhost] ID=0000000020 position=CheckpointedPosition: offset=NO_OFFSET, recordsToSkip=226535]
> 2249:2022-12-27 13:38:47.110 SourceReaderBase - Adding split(s) to reader: [FileSourceSplit: s3a://bucket/1000000/part-00004-db38c407-84c0-486f-9d71-c214f142b1c8-c000.zstd.parquet [0, 80055663) hosts=[localhost] ID=0000000015 position=CheckpointedPosition: offset=NO_OFFSET, recordsToSkip=226712]
> 2250:2022-12-27 13:38:47.110 SourceReaderBase - Adding split(s) to reader: [FileSourceSplit: s3a://bucket/1000000/part-00005-db38c407-84c0-486f-9d71-c214f142b1c8-c000.zstd.parquet [0, 80022187) hosts=[localhost] ID=0000000016 position=CheckpointedPosition: offset=NO_OFFSET, recordsToSkip=226346]
> 2251:2022-12-27 13:38:47.110 SourceReaderBase - Adding split(s) to reader: [FileSourceSplit: s3a://bucket/1000000/part-00006-db38c407-84c0-486f-9d71-c214f142b1c8-c000.zstd.parquet [0, 80109242) hosts=[localhost] ID=0000000017 position=CheckpointedPosition: offset=NO_OFFSET, recordsToSkip=227284]
> 2252:2022-12-27 13:38:47.110 SourceReaderBase - Adding split(s) to reader: [FileSourceSplit: s3a://bucket/1000000/part-00001-db38c407-84c0-486f-9d71-c214f142b1c8-c000.zstd.parquet [0, 79980911) hosts=[localhost] ID=0000000012 position=CheckpointedPosition: offset=NO_OFFSET, recordsToSkip=226429]
> 2253:2022-12-27 13:38:47.110 SourceReaderBase - Adding split(s) to reader: [FileSourceSplit: s3a://bucket/1000000/part-00003-db38c407-84c0-486f-9d71-c214f142b1c8-c000.zstd.parquet [0, 79996693) hosts=[localhost] ID=0000000014 position=CheckpointedPosition: offset=NO_OFFSET, recordsToSkip=227154]
> 2254:2022-12-27 13:38:47.110 SourceReaderBase - Adding split(s) to reader: [FileSourceSplit: s3a://bucket/1000000/part-00002-db38c407-84c0-486f-9d71-c214f142b1c8-c000.zstd.parquet [0, 80040476) hosts=[localhost] ID=0000000013 position=CheckpointedPosition: offset=NO_OFFSET, recordsToSkip=225920]
> 2255:2022-12-27 13:38:47.110 SourceReaderBase - Adding split(s) to reader: [FileSourceSplit: s3a://bucket/1000000/part-00008-db38c407-84c0-486f-9d71-c214f142b1c8-c000.zstd.parquet [0, 79986997) hosts=[localhost] ID=0000000019 position=CheckpointedPosition: offset=NO_OFFSET, recordsToSkip=226278]
> 2265:2022-12-27 13:38:47.115 FileSourceSplitReader - Handling split change SplitAddition:[[FileSourceSplit: s3a://bucket/1000000/part-00006-db38c407-84c0-486f-9d71-c214f142b1c8-c000.zstd.parquet [0, 80109242) hosts=[localhost] ID=0000000017 position=CheckpointedPosition: offset=NO_OFFSET, recordsToSkip=227284]]
> 2266:2022-12-27 13:38:47.115 FileSourceSplitReader - Handling split change SplitAddition:[[FileSourceSplit: s3a://bucket/1000000/part-00004-db38c407-84c0-486f-9d71-c214f142b1c8-c000.zstd.parquet [0, 80055663) hosts=[localhost] ID=0000000015 position=CheckpointedPosition: offset=NO_OFFSET, recordsToSkip=226712]]
> 2267:2022-12-27 13:38:47.115 FileSourceSplitReader - Handling split change SplitAddition:[[FileSourceSplit: s3a://bucket/1000000/part-00005-db38c407-84c0-486f-9d71-c214f142b1c8-c000.zstd.parquet [0, 80022187) hosts=[localhost] ID=0000000016 position=CheckpointedPosition: offset=NO_OFFSET, recordsToSkip=226346]]
> 2268:2022-12-27 13:38:47.115 FileSourceSplitReader - Handling split change SplitAddition:[[FileSourceSplit: s3a://bucket/1000000/part-00003-db38c407-84c0-486f-9d71-c214f142b1c8-c000.zstd.parquet [0, 79996693) hosts=[localhost] ID=0000000014 position=CheckpointedPosition: offset=NO_OFFSET, recordsToSkip=227154]]
> 2269:2022-12-27 13:38:47.115 FileSourceSplitReader - Handling split change SplitAddition:[[FileSourceSplit: s3a://bucket/1000000/part-00009-db38c407-84c0-486f-9d71-c214f142b1c8-c000.zstd.parquet [0, 80247830) hosts=[localhost] ID=0000000020 position=CheckpointedPosition: offset=NO_OFFSET, recordsToSkip=226535]]
> 2270:2022-12-27 13:38:47.115 FileSourceSplitReader - Handling split change SplitAddition:[[FileSourceSplit: s3a://bucket/1000000/part-00002-db38c407-84c0-486f-9d71-c214f142b1c8-c000.zstd.parquet [0, 80040476) hosts=[localhost] ID=0000000013 position=CheckpointedPosition: offset=NO_OFFSET, recordsToSkip=225920]]
> 2271:2022-12-27 13:38:47.115 FileSourceSplitReader - Handling split change SplitAddition:[[FileSourceSplit: s3a://bucket/1000000/part-00007-db38c407-84c0-486f-9d71-c214f142b1c8-c000.zstd.parquet [0, 79887236) hosts=[localhost] ID=0000000018 position=CheckpointedPosition: offset=NO_OFFSET, recordsToSkip=226029]]
> 2272:2022-12-27 13:38:47.115 FileSourceSplitReader - Handling split change SplitAddition:[[FileSourceSplit: s3a://bucket/1000000/part-00001-db38c407-84c0-486f-9d71-c214f142b1c8-c000.zstd.parquet [0, 79980911) hosts=[localhost] ID=0000000012 position=CheckpointedPosition: offset=NO_OFFSET, recordsToSkip=226429]]
> 2273:2022-12-27 13:38:47.115 FileSourceSplitReader - Handling split change SplitAddition:[[FileSourceSplit: s3a://bucket/1000000/part-00008-db38c407-84c0-486f-9d71-c214f142b1c8-c000.zstd.parquet [0, 79986997) hosts=[localhost] ID=0000000019 position=CheckpointedPosition: offset=NO_OFFSET, recordsToSkip=226278]]
> 2275:2022-12-27 13:38:47.116 FileSourceSplitReader - Handling split change SplitAddition:[[FileSourceSplit: s3a://bucket/1000000/part-00000-db38c407-84c0-486f-9d71-c214f142b1c8-c000.zstd.parquet [0, 79987191) hosts=[localhost] ID=0000000011 position=CheckpointedPosition: offset=NO_OFFSET, recordsToSkip=226030]]
> 2281:2022-12-27 13:38:47.160 SourceReaderBase - Adding split(s) to reader: [FileSourceSplit: s3a://bucket/2000000/part-00009-00cb73ef-346b-4e1e-a86a-007223ddf275-c000.zstd.parquet [0, 97536205) hosts=[localhost] ID=0000000040 position=null]
> 2282:2022-12-27 13:38:47.160 SourceReaderBase - Adding split(s) to reader: [FileSourceSplit: s3a://bucket/2000000/part-00001-00cb73ef-346b-4e1e-a86a-007223ddf275-c000.zstd.parquet [0, 97489087) hosts=[localhost] ID=0000000032 position=null]
> 2283:2022-12-27 13:38:47.159 SourceReaderBase - Adding split(s) to reader: [FileSourceSplit: s3a://bucket/2000000/part-00002-00cb73ef-346b-4e1e-a86a-007223ddf275-c000.zstd.parquet [0, 97342071) hosts=[localhost] ID=0000000033 position=null]
> 2284:2022-12-27 13:38:47.160 SourceReaderBase - Adding split(s) to reader: [FileSourceSplit: s3a://bucket/2000000/part-00000-00cb73ef-346b-4e1e-a86a-007223ddf275-c000.zstd.parquet [0, 97377047) hosts=[localhost] ID=0000000031 position=null]
> 2285:2022-12-27 13:38:47.160 SourceReaderBase - Adding split(s) to reader: [FileSourceSplit: s3a://bucket/2000000/part-00003-00cb73ef-346b-4e1e-a86a-007223ddf275-c000.zstd.parquet [0, 97406878) hosts=[localhost] ID=0000000034 position=null]
> 2288:2022-12-27 13:38:47.161 SourceReaderBase - Adding split(s) to reader: [FileSourceSplit: s3a://bucket/2000000/part-00005-00cb73ef-346b-4e1e-a86a-007223ddf275-c000.zstd.parquet [0, 97472256) hosts=[localhost] ID=0000000036 position=null]
> 2289:2022-12-27 13:38:47.161 SourceReaderBase - Adding split(s) to reader: [FileSourceSplit: s3a://bucket/2000000/part-00004-00cb73ef-346b-4e1e-a86a-007223ddf275-c000.zstd.parquet [0, 97420601) hosts=[localhost] ID=0000000035 position=null]
> 2292:2022-12-27 13:38:47.162 SourceReaderBase - Adding split(s) to reader: [FileSourceSplit: s3a://bucket/2000000/part-00006-00cb73ef-346b-4e1e-a86a-007223ddf275-c000.zstd.parquet [0, 97495880) hosts=[localhost] ID=0000000037 position=null]
> 2293:2022-12-27 13:38:47.163 SourceReaderBase - Adding split(s) to reader: [FileSourceSplit: s3a://bucket/2000000/part-00007-00cb73ef-346b-4e1e-a86a-007223ddf275-c000.zstd.parquet [0, 97389425) hosts=[localhost] ID=0000000038 position=null]
> 2295:2022-12-27 13:38:47.163 SourceReaderBase - Adding split(s) to reader: [FileSourceSplit: s3a://bucket/2000000/part-00008-00cb73ef-346b-4e1e-a86a-007223ddf275-c000.zstd.parquet [0, 97428709) hosts=[localhost] ID=0000000039 position=null]
> {code}
> The same logs are available in a GitHub gist:
> [https://gist.github.com/WonderBeat/ddfdc852556997b09451d48766b54183]
> This can be fixed with a simple reordering in {{HybridSourceReader#setCurrentReader}}: calling {{reader.addSplits}} before {{reader.start}} seems logical. What do you think?
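The following is a minimal, self-contained sketch of the ordering problem. It is not Flink code. `ToySplitReader`, `startThenAdd` and `addThenStart` are hypothetical names. `start()` only models the check the issue describes for `FileSourceReader#start()`: send a split request when no splits are assigned. The model only counts these requests. In the logs, the unwanted request is consistent with the enumerator then assigning splits from bucket 2000000.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model of the FLINK-30514 recovery ordering (hypothetical classes, not Flink's API). */
public class RecoveryOrderDemo {

    /** Hypothetical stand-in for a split-based reader such as FileSourceReader. */
    static class ToySplitReader {
        final List<String> assignedSplits = new ArrayList<>();
        int splitRequests = 0;

        void start() {
            // Models the described FileSourceReader#start() check:
            // ask the enumerator for work only when no splits are assigned yet.
            if (assignedSplits.isEmpty()) {
                splitRequests++;
            }
        }

        void addSplits(List<String> splits) {
            assignedSplits.addAll(splits);
        }
    }

    /** Current order described in the issue: start(), then add the restored splits. */
    static ToySplitReader startThenAdd(List<String> restoredSplits) {
        ToySplitReader reader = new ToySplitReader();
        reader.start();
        reader.addSplits(restoredSplits);
        return reader;
    }

    /** Proposed order: add the restored splits, then start(). */
    static ToySplitReader addThenStart(List<String> restoredSplits) {
        ToySplitReader reader = new ToySplitReader();
        reader.addSplits(restoredSplits);
        reader.start();
        return reader;
    }

    public static void main(String[] args) {
        List<String> restored = List.of("1000000/part-00000", "1000000/part-00001");
        // Prints 1: a split request is sent although restored splits exist.
        System.out.println("start -> addSplits (restored): split requests = "
                + startThenAdd(restored).splitRequests);
        // Prints 0: the restored splits are seen before the check runs.
        System.out.println("addSplits -> start (restored): split requests = "
                + addThenStart(restored).splitRequests);
        // Prints 1: with nothing restored, the proposed order still requests work.
        System.out.println("addSplits -> start (fresh): split requests = "
                + addThenStart(List.of()).splitRequests);
    }
}
```

The last case matters for the proposal. Moving `addSplits` before `start` should only suppress the split request when restored splits exist. A fresh start with nothing restored should still ask the enumerator for work.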
-- This message was sent by Atlassian Jira (v8.20.10#820010)