Have you tried MirrorMaker 2's consumer offset translation feature? I have not used it myself, but it sounds like exactly what you are looking for!

https://issues.apache.org/jira/browse/KAFKA-9076
https://kafka.apache.org/26/javadoc/org/apache/kafka/connect/mirror/Checkpoint.html
https://strimzi.io/blog/2020/03/30/introducing-mirrormaker2/

I tried to find some better docs to link for you, but that's the best I've got :) It looks like there is just the Java API.
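If it helps, here's roughly how I'd expect that API to be called (an untested sketch — the cluster alias, group id, and bootstrap servers below are placeholders for your own MM2 setup; RemoteClusterUtils lives in the connect-mirror-client artifact):

import java.time.Duration;
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.connect.mirror.RemoteClusterUtils;

public class TranslateOffsets {
    public static void main(String[] args) throws Exception {
        // Connection config for the *target* cluster, where MirrorMaker 2
        // maintains the checkpoints topic it uses for offset translation.
        Map<String, Object> props = new HashMap<>();
        props.put("bootstrap.servers", "target-cluster:9092"); // placeholder

        // "source" must match the source-cluster alias in your MM2 config.
        Map<TopicPartition, OffsetAndMetadata> translated =
                RemoteClusterUtils.translateOffsets(
                        props, "source", "my-flink-group", Duration.ofSeconds(30));

        // One way to use the result: commit the translated offsets to the
        // consumer group on the target cluster, then start the Flink job
        // configured to start from group offsets.
        Map<String, Object> consumerProps = new HashMap<>(props);
        consumerProps.put("group.id", "my-flink-group"); // placeholder
        consumerProps.put("key.deserializer",
                "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        consumerProps.put("value.deserializer",
                "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(consumerProps)) {
            consumer.commitSync(translated);
        }
    }
}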
On Wed, May 4, 2022 at 3:29 PM Hemanga Borah <borah.hema...@gmail.com> wrote:

> Thank you for the suggestions, guys!
>
> @Austin Cawley-Edwards
> Your idea is spot on! This approach would surely work. We could take a
> savepoint of each of our apps, load it using the State Processor API,
> create another savepoint accounting for the delta on the offsets, and
> start the app on the new cloud using this modified savepoint.
> However, this solution is not generic, and we would have to do it for
> each of our applications. That can be quite cumbersome, as we have
> several applications (around 25).
>
> We are thinking of overriding FlinkKafkaConsumerBase to account for the
> offset deltas during the start-up of any app. Do you think it is safe
> to do that? Is there a better way of doing this?
>
> @Schwalbe Matthias
> Thank you for your suggestion. We do use exactly-once semantics, but our
> apps can tolerate a few duplicates in rare cases like this one where we
> are migrating clouds. However, your suggestion is really helpful, and we
> will use it in case some of the apps cannot tolerate duplicate data.
>
> On Wed, May 4, 2022 at 12:00 AM Schwalbe Matthias <matthias.schwa...@viseca.ch> wrote:
>
>> Hello Hemanga,
>>
>> MirrorMaker can cause havoc in many respects; for one, it does not have
>> strict exactly-once semantics…
>>
>> The way I would tackle this problem (and have done in similar
>> situations):
>>
>> - For the source topics that need to have exactly-once semantics and
>>   that are not intrinsically idempotent:
>>   - Add one extra operator after the source that deduplicates events by
>>     unique id for a rolling time range (on the source cloud provider)
>>   - Take a savepoint after the rolling time range has passed (at least
>>     once completely)
>>   - Move your job to the target cloud provider
>>   - Reconfigure the resp. source with a new Kafka consumer group.id
>>   - Change the uid() of the resp. Kafka source
>>   - Configure start-by-timestamp for the resp. source, with a timestamp
>>     that lies within the rolling time range (of above)
>>   - Configure the job to ignore recovery for state that does not have a
>>     corresponding operator in the job (the previous Kafka source uid()s)
>>   - Start the job on the new cloud provider, wait for it to pick
>>     up/back-fill
>>   - Take a savepoint
>>   - Remove the deduplication operator if it causes too much
>>     load/latency/whatever
>>
>> This scheme sounds more complicated than it really is … and has saved my
>> sanity quite a number of times 😊
>>
>> Good luck, and ready to answer more details
>>
>> Thias
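(To make Matthias's deduplication step above concrete: a minimal, untested sketch of such an operator, assuming the stream is keyed by the event's unique id — the generic event type and the 24-hour window are placeholders:)

import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

/** Drops events whose key (unique id) was already seen within the TTL window. */
public class Deduplicator<T> extends KeyedProcessFunction<String, T, T> {

    private transient ValueState<Boolean> seen;

    @Override
    public void open(Configuration parameters) {
        ValueStateDescriptor<Boolean> descriptor =
                new ValueStateDescriptor<>("seen", Boolean.class);
        // The "rolling time range": an id's entry expires after 24 hours,
        // which bounds state size and defines the dedup window.
        descriptor.enableTimeToLive(
                StateTtlConfig.newBuilder(Time.hours(24))
                        .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
                        .cleanupFullSnapshot()
                        .build());
        seen = getRuntimeContext().getState(descriptor);
    }

    @Override
    public void processElement(T event, Context ctx, Collector<T> out) throws Exception {
        if (seen.value() == null) { // first occurrence of this id in the window
            seen.update(true);
            out.collect(event);
        }                           // else: duplicate within the window, drop it
    }
}

(Applied as something like stream.keyBy(e -> e.getId()).process(new Deduplicator<>()), where getId() stands in for however your events expose their unique id.)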
>> *From:* Hemanga Borah <borah.hema...@gmail.com>
>> *Sent:* Tuesday, May 3, 2022 3:12 AM
>> *To:* user@flink.apache.org
>> *Subject:* Migrating Flink apps across cloud with state
>>
>> Hello,
>> We are attempting to port our Flink applications from one cloud provider
>> to another.
>>
>> These Flink applications consume data from Kafka topics and output to
>> various destinations (Kafka or databases). The applications have state
>> stored in them. Some of this stored state consists of aggregations; for
>> example, at times we store hours (or days) worth of data to aggregate
>> over time. Other applications cache information for data enrichment; for
>> example, we store data in Flink state for days so that we can join it
>> with newly arrived data. The amount of data on the input topics is large,
>> and it would be expensive to reprocess it from the beginning of the topic.
>>
>> As such, we want to retain the state of the application when we move to
>> a different cloud provider, so that we can keep the aggregations and
>> cache and do not have to start from the beginning of the input topics.
>>
>> We are replicating the Kafka topics using MirrorMaker 2. This is our
>> procedure:
>>
>> - Replicate the input topics of each Flink application from the source
>>   cloud to the destination cloud.
>> - Take a savepoint of the Flink application on the source cloud provider.
>> - Start the Flink application on the destination cloud provider using
>>   the savepoint from the source cloud provider.
>>
>> However, this does not work as we want, because the offsets in the new
>> topics on the new cloud provider differ (due to the MirrorMaker
>> implementation). The offsets of the new topics do not match the ones
>> stored in the Flink savepoint, so Flink cannot map to the offsets of the
>> new topics during startup.
>>
>> Has anyone tried to move clouds while retaining Flink state?
>>
>> Thanks,
>> Hemanga
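One more thought, since a few of Matthias's steps map directly to source configuration: a rough, untested sketch of what the reconfigured Kafka source might look like on the target side (topic name, servers, timestamp, and uid are all placeholders):

import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

public class MigratedJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "target-cluster:9092"); // placeholder
        props.setProperty("group.id", "my-app-target");                // new group.id on the target

        FlinkKafkaConsumer<String> source =
                new FlinkKafkaConsumer<>("events", new SimpleStringSchema(), props);
        // Start from a timestamp inside the deduplication rolling time range,
        // instead of from the (non-matching) offsets stored in the savepoint.
        source.setStartFromTimestamp(1651536000000L); // placeholder epoch millis

        env.addSource(source)
                .uid("kafka-source-v2") // new uid, so the old source's state is not restored
                .print();

        env.execute("migrated-job");
    }
}

When restoring the savepoint, the "ignore state without a corresponding operator" step is the --allowNonRestoredState flag, e.g. flink run -s <savepointPath> --allowNonRestoredState ..., so the offset state of the old source uid() is dropped instead of failing the restore.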