Hi Thias,

Thanks a lot for your reply. It does help. We already have a custom File
Enumerator implementation, so I'm working on a workaround that limits it
to enumerating only the file splits we haven't processed yet.
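A minimal sketch of such a filtering enumerator follows. It is written in plain Java so it runs without Flink on the classpath. `Split`, `FileSplitLister` and `SkipProcessedLister` are hypothetical stand-ins for Flink's `FileSourceSplit` and `FileEnumerator`, not their real signatures. The sketch also assumes the set of already-processed paths is kept somewhere outside Flink state:

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Set;

// Hypothetical, simplified stand-in for Flink's FileSourceSplit (only the fields used here).
record Split(String path, long offset, long length) {}

// Hypothetical stand-in for Flink's FileEnumerator; NOT its real signature.
interface FileSplitLister {
    Collection<Split> enumerateSplits(List<String> paths);
}

// Decorator that drops every split whose file path is in an "already processed" set.
// Assumption: processedPaths is maintained outside Flink state (e.g. a store you control).
final class SkipProcessedLister implements FileSplitLister {
    private final FileSplitLister delegate;
    private final Set<String> processedPaths;

    SkipProcessedLister(FileSplitLister delegate, Set<String> processedPaths) {
        this.delegate = delegate;
        this.processedPaths = Set.copyOf(processedPaths); // defensive, immutable copy
    }

    @Override
    public Collection<Split> enumerateSplits(List<String> paths) {
        List<Split> fresh = new ArrayList<>();
        for (Split split : delegate.enumerateSplits(paths)) {
            // Keep only splits of files that have not been processed before.
            if (!processedPaths.contains(split.path())) {
                fresh.add(split);
            }
        }
        return fresh;
    }
}
```

The decorator shape lets the filter wrap whatever enumerator the job already uses. Matching on the path alone assumes a file is never modified after it has been processed. In a real job, the wrapper would presumably be plugged in through the FileSource builder's `setFileEnumerator(...)`.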

On Mon, 9 Sept 2024 at 13:55, Schwalbe Matthias <matthias.schwa...@viseca.ch>
wrote:

> Hi Andreas,
>
> You made a correct observation:
>
> The State Processor API does not cover coordinator state in general, and
> more specifically enumerator state, which lives in the JobManager.
>
> It only covers state held by the TaskManagers.
>
> I once built a PoC to extend this functionality, but it ended up as a big
> hack, so I didn't pursue it further.
>
> Hope that helps,
>
> Thias
>
> *From:* Andreas Bube via user <user@flink.apache.org>
> *Sent:* Monday, September 9, 2024 10:41 AM
> *To:* user@flink.apache.org
> *Subject:* [External] State Processor API: Changing UID for FileSource
> seems to remove file enumerator state
>
> Hi Flink users,
>
> I’ve encountered an issue while trying to assign a UID to a File Source
> using the State Processor API. I’m unsure whether this is a bug or if I’m
> missing something. I’d appreciate any insights or suggestions.
> Scenario:
>
>    - A job with a File Source has already processed several files.
>    - The File Source is continuously monitoring an S3 bucket.
>    - A savepoint has been created.
>
> Steps:
>
>    1. I changed the UID of the File Source using the State Processor API
>    with the following code:
>
>       SavepointWriter writer =
>           SavepointWriter.fromExistingSavepoint(env, inputSavepointPath);
>       writer.changeOperatorIdentifier(
>           OperatorIdentifier.forUidHash("bc764cd8ddf7a0cff126f51c16239658"),
>           OperatorIdentifier.forUid("FILE_SOURCE")); // The new UID is also updated in the job
>       writer.write(outputSavepointPath);
>
>    2. I updated the File Source UID in the job to match the new UID
>    ("FILE_SOURCE").
>    3. I restored the job from the modified savepoint.
>
> Issue:
>
> When restoring the job, the File Source reprocesses files that it had
> previously processed.
>
> I was expecting the File Source to resume processing from where it left
> off; instead, it reprocessed old files. While debugging, I observed the
> following:
>
>    - The state of the File Source’s file enumerator is stored in the
>    coordinator state
>    (org.apache.flink.runtime.checkpoint.OperatorState#coordinatorState).
>    - The State Processor API doesn’t seem to assign a value to
>    OperatorState#coordinatorState when changing the UID of the File Source
>    operator. Specifically, see
>    org.apache.flink.state.api.output.OperatorSubtaskStateReducer#reduce.
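The observation above can be illustrated with a simplified, self-contained model. `OperatorEntry` and `UidRewrite` are hypothetical and are not Flink's `OperatorState` or `OperatorSubtaskStateReducer`. The model only assumes, as described above, that an operator's savepoint entry holds per-subtask state plus a separate coordinator slot. It then shows that rebuilding the entry from the subtask states alone drops the enumerator state:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical, simplified model of one operator's entry in a savepoint.
// NOT Flink's OperatorState; it only separates the two kinds of state discussed above:
// subtaskStates ~ per-subtask (TaskManager-side) state,
// coordinatorState ~ e.g. serialized enumerator (JobManager-side) state.
record OperatorEntry(
        String operatorId,
        Map<Integer, byte[]> subtaskStates,
        Optional<byte[]> coordinatorState) {}

final class UidRewrite {
    private UidRewrite() {}

    // Models the observed behaviour: the entry is rebuilt from subtask state only,
    // so the coordinator (enumerator) state is silently dropped.
    static OperatorEntry rebuildFromSubtasksOnly(OperatorEntry old, String newId) {
        return new OperatorEntry(newId, new HashMap<>(old.subtaskStates()), Optional.empty());
    }

    // Models the expected behaviour: coordinator state moves along with subtask state.
    static OperatorEntry rebuildKeepingCoordinator(OperatorEntry old, String newId) {
        return new OperatorEntry(newId, new HashMap<>(old.subtaskStates()), old.coordinatorState());
    }
}
```

The first method corresponds to the reported symptom: after the rewrite, the source starts with an empty enumerator and re-discovers files it had already processed.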
>
> I’m not very familiar with Flink’s internals, so I’d appreciate any help
> on this issue. Is this the expected behavior? Is it a bug, or am I doing
> something wrong?
>
> Thanks in advance for your help!
>
> --
>
> *Andreas Bube*
>
> *ENGINEER*
>
> E: ab...@toogoodtogo.com
>
> Landskronagade 66, 2100 København Ø, Denmark
>
> W: toogoodtogo.com <https://toogoodtogo.com/da> | Privacy policy
> <https://toogoodtogo.com/da/privacy-policy>
>
> This message is intended only for the named recipient and may contain
> confidential or privileged information. As the confidentiality of email
> communication cannot be guaranteed, we do not accept any responsibility for
> the confidentiality and the intactness of this message. If you have
> received it in error, please advise the sender by return e-mail and delete
> this message and any attachments. Any unauthorised use or dissemination of
> this information is strictly prohibited.
>
