Hi Thias,

Thanks a lot for your reply. It does help. We already have a custom File Enumerator implementation, so I'm working on a workaround that limits it to enumerating only the file splits we haven't processed yet.
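A minimal sketch of that filtering idea in plain Java, under stated assumptions. It does not use Flink's FileEnumerator API. `ProcessedFileFilter` is a hypothetical helper, paths are plain strings, and the set of already-processed paths is assumed to be loaded from somewhere outside the savepoint (for example an external store), because the thread establishes that the enumerator state does not survive the UID change. Wiring it into a real enumerator depends on the Flink version and is left out.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Set;
import java.util.function.Predicate;

// Hypothetical helper (not part of Flink): drops files that a previous run already processed.
// Paths are plain strings here; a real enumerator would deal with Flink's own path/split types.
public class ProcessedFileFilter implements Predicate<String> {

    private final Set<String> alreadyProcessed;

    public ProcessedFileFilter(Set<String> alreadyProcessed) {
        // Defensive copy so later changes to the caller's set don't affect filtering
        this.alreadyProcessed = Set.copyOf(alreadyProcessed);
    }

    @Override
    public boolean test(String path) {
        // Keep the file only if it was not processed before
        return !alreadyProcessed.contains(path);
    }

    // Applies the filter to a batch of discovered paths, preserving their order
    public List<String> filter(Collection<String> discovered) {
        List<String> result = new ArrayList<>();
        for (String path : discovered) {
            if (test(path)) {
                result.add(path);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // Example bucket and file names are made up for illustration
        ProcessedFileFilter filter =
                new ProcessedFileFilter(Set.of("s3://bucket/a.csv", "s3://bucket/b.csv"));
        List<String> pending =
                filter.filter(List.of("s3://bucket/a.csv", "s3://bucket/b.csv", "s3://bucket/c.csv"));
        System.out.println(pending); // prints [s3://bucket/c.csv]
    }
}
```

The filter is a plain `Predicate` so that it can be plugged into whatever enumeration step the custom enumerator already has; the hard part in practice is keeping the processed set durable, which this sketch deliberately assumes away.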
On Mon, 9 Sept 2024 at 13:55, Schwalbe Matthias <matthias.schwa...@viseca.ch> wrote:

> Hi Andreas,
>
> You made a correct observation:
>
> The State Processor API does not cover coordinator state in general (and
> enumerator state in particular), which sits in the JobManager.
>
> It only covers state held by the TaskManagers.
>
> I once made a PoC to extend the respective functionality, but it ended
> up as a big hack, … hence I didn’t continue …
>
> Hope that helps
>
> Thias
>
> *From:* Andreas Bube via user <user@flink.apache.org>
> *Sent:* Monday, September 9, 2024 10:41 AM
> *To:* user@flink.apache.org
> *Subject:* [External] State Processor API: Changing UID for FileSource
> seems to remove file enumerator state
>
> Hi Flink users,
>
> I’ve encountered an issue while trying to assign a UID to a File Source
> using the State Processor API. I’m unsure whether this is a bug or if I’m
> missing something. I’d appreciate any insights or suggestions.
>
> Scenario:
>
> - A job with a File Source has already processed several files.
> - The File Source is continuously monitoring an S3 bucket.
> - A savepoint has been created.
>
> Steps:
>
> 1. I changed the UID of the File Source using the State Processor API
>    with the following code:
>
>        SavepointWriter writer = SavepointWriter.fromExistingSavepoint(env, inputSavepointPath);
>        writer.changeOperatorIdentifier(
>                OperatorIdentifier.forUidHash("bc764cd8ddf7a0cff126f51c16239658"),
>                OperatorIdentifier.forUid("FILE_SOURCE")); // The new UID is also updated in the job
>        writer.write(outputSavepointPath);
>
> 2. I updated the File Source UID in the job to match the new UID
>    ("FILE_SOURCE").
> 3. I restored the job from the modified savepoint.
>
> Issue:
>
> When restoring the job, the File Source reprocesses files that it had
> previously processed.
>
> I was expecting the File Source to resume processing from where it left
> off; instead, it reprocessed old files. While debugging, I observed the
> following:
>
> - The state of the File Source’s file enumerator is stored in the
>   coordinator state
>   (org.apache.flink.runtime.checkpoint.OperatorState#coordinatorState).
> - The State Processor API doesn’t seem to assign a value to
>   OperatorState#coordinatorState when changing the UID of the File Source
>   operator. Specifically, see
>   org.apache.flink.state.api.output.OperatorSubtaskStateReducer#reduce.
>
> I’m not very familiar with Flink’s internals, so I’d appreciate any help
> on this issue. Is this the expected behavior? Is it a bug, or am I doing
> something wrong?
>
> Thanks in advance for your help!
>
> --
> *Andreas Bube*
> *ENGINEER*
> E: ab...@toogoodtogo.com
> Landskronagade 66, 2100 København Ø, Denmark
> W: toogoodtogo.com <https://toogoodtogo.com/da>
> Privacy policy <https://toogoodtogo.com/da/privacy-policy>
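A toy model of the failure mode discussed in this thread, in plain Java. All types and methods here are hypothetical stand-ins; they do not mirror Flink's OperatorState or OperatorSubtaskStateReducer. An operator snapshot is modelled as per-subtask (TaskManager) state plus optional coordinator (JobManager) state, which is where the thread locates the file enumerator's record of processed files. Rebuilding the snapshot under a new ID from subtask state alone leaves the coordinator part empty, so on restore the enumerator would start from scratch and rediscover every file. IDs are plain strings for readability; real operator IDs are hashes.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Toy model only: hypothetical classes that make the reported behaviour concrete.
public class UidRemapModel {

    // Per-operator snapshot: task-side state per subtask plus optional coordinator-side state
    record OperatorSnapshot(
            String operatorId,
            Map<Integer, String> subtaskStates,
            Optional<String> coordinatorState) {}

    // Rebuilds the snapshot under a new ID from subtask state only;
    // this mimics the behaviour reported in the thread: coordinator state is not carried over
    static OperatorSnapshot remapSubtasksOnly(OperatorSnapshot old, String newId) {
        return new OperatorSnapshot(newId, new HashMap<>(old.subtaskStates()), Optional.empty());
    }

    // What the original poster expected: both parts move to the new ID
    static OperatorSnapshot remapAll(OperatorSnapshot old, String newId) {
        return new OperatorSnapshot(newId, new HashMap<>(old.subtaskStates()), old.coordinatorState());
    }

    public static void main(String[] args) {
        OperatorSnapshot source = new OperatorSnapshot(
                "bc764cd8ddf7a0cff126f51c16239658",
                Map.of(0, "reader-0 splits", 1, "reader-1 splits"),
                Optional.of("enumerator: already processed files"));

        // Coordinator state is gone after a subtask-only remap
        System.out.println(remapSubtasksOnly(source, "FILE_SOURCE").coordinatorState()); // prints Optional.empty
        // Coordinator state survives when it is copied explicitly
        System.out.println(remapAll(source, "FILE_SOURCE").coordinatorState());
        // prints Optional[enumerator: already processed files]
    }
}
```

In this model the subtask states survive either way, which matches the observation that only the enumerator's progress is lost rather than the whole source state.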