Thank you for the suggestions, guys!

@Austin Cawley-Edwards
Your idea is spot on! This approach would surely work. We could take a
savepoint of each of our apps, load it using the State Processor API,
create another savepoint that accounts for the offset deltas, and start
the app on the new cloud from this modified savepoint.
However, the solution will not be generic, and we have to do this for each
of our applications. This can be quite cumbersome as we have several
applications (around 25).
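
Whichever way we apply it, the core step is the same: shift each restored
partition offset by the delta between the old and the new topic. Here is a
minimal sketch of that step in plain Java, with no Flink or Kafka
dependencies. OffsetTranslator, the "topic-partition" key format, and the
assumption that the difference is a constant per-partition delta are all
hypothetical; we would still need to verify that the MirrorMaker 2 offsets
really differ by a fixed amount per partition.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical helper, not part of Flink or Kafka. */
final class OffsetTranslator {

    private OffsetTranslator() {}

    /**
     * Shifts each restored offset by the delta measured for its partition.
     * Keys are "topic-partition" strings such as "orders-0" (an assumed format).
     * A partition without a known delta is rejected rather than guessed.
     */
    static Map<String, Long> translate(Map<String, Long> restoredOffsets,
                                       Map<String, Long> deltas) {
        Map<String, Long> translated = new HashMap<>();
        for (Map.Entry<String, Long> entry : restoredOffsets.entrySet()) {
            Long delta = deltas.get(entry.getKey());
            if (delta == null) {
                throw new IllegalArgumentException(
                        "No offset delta known for partition " + entry.getKey());
            }
            long shifted = entry.getValue() + delta;
            if (shifted < 0) {
                throw new IllegalArgumentException(
                        "Translated offset is negative for partition " + entry.getKey());
            }
            translated.put(entry.getKey(), shifted);
        }
        return translated;
    }
}
```

Failing on a missing delta is deliberate: silently keeping the old offset
would make the consumer start from the wrong position on the new topic.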

We are thinking of overriding the FlinkKafkaConsumerBase to account for the
offset deltas during the start-up of any app. Do you think it is safe to do
that? Is there a better way of doing this?

@Schwalbe Matthias
Thank you for your suggestion. We do use exactly-once semantics, but our
apps can tolerate a few duplicates in rare cases like this one, where we
are migrating clouds. However, your suggestion is really helpful, and we
will use it in case some of the apps cannot tolerate duplicate data.
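
For the apps that cannot tolerate duplicates, this is how I understand the
deduplication step: remember each unique id for a rolling time range and
drop any event whose id was already seen within that range. Below is a
minimal sketch in plain Java. RollingDeduplicator and its method names are
hypothetical, and it assumes timestamps arrive in roughly increasing
order. In a real job this would presumably be a keyed operator with
expiring state rather than an in-memory map.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical stand-in for a deduplicating operator, not a Flink class. */
final class RollingDeduplicator {

    private final long windowMillis;
    // Maps event id to the timestamp at which it was first accepted.
    private final Map<String, Long> firstSeen = new HashMap<>();

    RollingDeduplicator(long windowMillis) {
        if (windowMillis <= 0) {
            throw new IllegalArgumentException("windowMillis must be positive");
        }
        this.windowMillis = windowMillis;
    }

    /**
     * Returns true if the event should be emitted, false if an event with
     * the same id was already accepted within the rolling time range.
     */
    boolean accept(String eventId, long timestampMillis) {
        // Forget ids first seen before the start of the rolling range.
        // This scans the whole map, which is fine for a sketch only.
        long cutoff = timestampMillis - windowMillis;
        firstSeen.values().removeIf(seenAt -> seenAt < cutoff);

        if (firstSeen.containsKey(eventId)) {
            return false; // duplicate within the range
        }
        firstSeen.put(eventId, timestampMillis);
        return true;
    }
}
```

With a range of 1000 ms, an id accepted at t=0 is rejected at t=500 and
accepted again at t=1500, because by then it has left the rolling range.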


On Wed, May 4, 2022 at 12:00 AM Schwalbe Matthias <
matthias.schwa...@viseca.ch> wrote:

> Hello Hemanga,
>
>
>
> MirrorMaker can cause havoc in many respects; for one, it does not have
> strict exactly-once semantics…
>
>
>
> The way I would tackle this problem (and have done in similar situations):
>
>
>
>    - For the source topics that need to have exactly-once semantics
>    and that are not intrinsically idempotent:
>    - Add one extra operator after the source that deduplicates events by
>    unique id for a rolling time range (on the source cloud provider)
>    - Take a savepoint after the rolling time-range has passed (at least
>    once completely)
>    - Move your job to the target cloud provider
>    - Reconfigure the respective source with a new Kafka consumer group.id,
>    - Change the uid() of the respective Kafka source,
>    - Configure start-by-timestamp for the respective source with a timestamp
>    that lies within the rolling time range (from above)
>    - Configure the job to ignore recovery for state that does not have a
>    corresponding operator in the job (the previous Kafka source uid()s)
>    - Start the job on new cloud provider, wait for it to pick up/back-fill
>    - Take a savepoint
>    - Remove deduplication operator if that causes too much
>    load/latency/whatever
>
>
>
> This scheme sounds more complicated than it really is … and has saved my
> sanity quite a number of times 😊
>
>
>
> Good luck, and I am ready to answer more questions on the details
>
>
>
> Thias
>
>
>
> *From:* Hemanga Borah <borah.hema...@gmail.com>
> *Sent:* Tuesday, May 3, 2022 3:12 AM
> *To:* user@flink.apache.org
> *Subject:* Migrating Flink apps across cloud with state
>
>
>
> Hello,
>  We are attempting to port our Flink applications from one cloud provider
> to another.
>
>  These Flink applications consume data from Kafka topics and output to
> various destinations (Kafka or databases). The applications have states
> stored in them. Some of these stored states are aggregations, for example,
> at times we store hours (or days) worth of data to aggregate over time.
> Some other applications have cached information for data enrichment, for
> example, we store data in Flink state for days so that we can join it
> with newly arrived data. The input topics hold a large amount of data,
> and it would be expensive to reprocess the data from the beginning of the
> topic.
>
>  As such, we want to retain the state of the application when we move to a
> different cloud provider so that we can retain the aggregations and cache,
> and do not have to start from the beginning of the input topics.
>
>  We are replicating the Kafka topics using MirrorMaker 2. This is our
> procedure:
>
>    - Replicate the input topics of each Flink application from source
>    cloud to destination cloud.
>    - Take a savepoint of the Flink application on the source cloud
>    provider.
>    - Start the Flink application on the destination cloud provider using
>    the savepoint from the source cloud provider.
>
>
> However, this does not work as we want because the offsets of the
> replicated topics on the new cloud provider differ from those on the
> source (because of the MirrorMaker implementation). The offsets of the new
> topic do not match the ones stored in the Flink savepoint; hence, Flink
> cannot map to the offsets of the new topic during startup.
>
> Has anyone tried to move clouds while retaining the Flink state?
>
> Thanks,
> Hemanga
