Hi,

Sorry for my previous, slightly confusing response. Please take a look at the
response from Gordon.
Piotrek

> On 2 Mar 2020, at 12:05, Kaymak, Tobias <tobias.kay...@ricardo.ch> wrote:
>
> Hi,
>
> let me refine my question: My pipeline is generated from Beam, so the Flink
> pipeline is a translated Beam pipeline. When I update my Apache Beam
> pipeline code, working with a snapshot in Flink to stop the pipeline is not
> an option, as the snapshot will use the old representation of the Flink
> pipeline when resuming from that snapshot.
>
> That means I am looking for a way to drain the pipeline cleanly and to use
> the last committed offset in Kafka to resume processing after I start it
> again (launching it through Beam will regenerate the Flink pipeline, and it
> should resume at the offset where it left off, that is, the latest
> committed offset in Kafka).
>
> Can this be achieved with a cancel or stop of the Flink pipeline?
>
> Best,
> Tobias
>
> On Mon, Mar 2, 2020 at 11:09 AM Piotr Nowojski <pi...@ververica.com> wrote:
> Hi Tobi,
>
> No, FlinkKafkaConsumer is not using the committed Kafka offsets for
> recovery. The offsets to start from are stored in the checkpoint itself.
> Writing the offsets back to Kafka is an optional, purely cosmetic thing
> from Flink's perspective, so the job will start from the correct offsets.
>
> However, if you for whatever reason restart the job from a
> savepoint/checkpoint that's not the latest one, this will violate
> exactly-once guarantees - some records will simply be processed and
> committed twice in the sinks. Committing happens on checkpoint, so if you
> are recovering to some previous checkpoint, there is nothing Flink can do -
> some records were already committed before.
>
> Piotrek
>
>> On 2 Mar 2020, at 10:12, Kaymak, Tobias <tobias.kay...@ricardo.ch> wrote:
>>
>> Thank you Piotr!
>>
>> One last question - let's assume my source is a Kafka topic - if I stop
>> via the CLI with a savepoint in Flink 1.9, but do not use that savepoint
>> when restarting my job - the job would continue from the last offset that
>> has been committed in Kafka, and thus I would also not experience a loss
>> of data in my sink. Is that correct?
>>
>> Best,
>> Tobi
>>
>> On Fri, Feb 28, 2020 at 3:17 PM Piotr Nowojski <pi...@ververica.com> wrote:
>> Yes, that's correct. There shouldn't be any data loss. Stop with savepoint
>> is a solution to make sure that if you are stopping a job (either
>> permanently or temporarily), all of the results are published/committed
>> to external systems before you actually stop the job.
>>
>> If you just cancel/kill/crash a job, in some rare cases (if a checkpoint
>> was completing at the time the cluster was crashing), some records might
>> not be committed before the cancellation/kill/crash happened. Note that
>> this doesn't mean there is data loss - those records will simply be
>> published once you restore your job from a checkpoint. But if you want to
>> stop the job permanently, that restore might never happen, hence we need
>> stop with savepoint.
>>
>> Piotrek
>>
>>> On 28 Feb 2020, at 15:02, Kaymak, Tobias <tobias.kay...@ricardo.ch> wrote:
>>>
>>> Thank you! For understanding the matter: When I have a streaming
>>> pipeline (reading from Kafka, writing somewhere) and I click "cancel",
>>> and after that I restart the pipeline - I should not expect any data to
>>> be lost - is that correct?
>>>
>>> Best,
>>> Tobias
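
For illustration, here is a minimal sketch of the consumer configuration
behind Piotrek's explanation above: offsets are committed back to Kafka on
checkpoints, and a job started without a savepoint/checkpoint falls back to
the group's committed offsets. This assumes the Flink 1.9 Kafka connector
API; the topic, group, and bootstrap servers are placeholders:

    import java.util.Properties;

    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

    public class KafkaOffsetExample {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();
            env.enableCheckpointing(60_000); // checkpoint every minute

            Properties props = new Properties();
            props.setProperty("bootstrap.servers", "localhost:9092"); // placeholder
            props.setProperty("group.id", "my-group");                // placeholder

            FlinkKafkaConsumer<String> consumer =
                new FlinkKafkaConsumer<>(
                    "my-topic", new SimpleStringSchema(), props);

            // Cosmetic for Flink's own recovery (the offsets used for
            // recovery live in the checkpoint), but it is what lets a
            // *fresh* job resume where the old one left off:
            consumer.setCommitOffsetsOnCheckpoints(true);

            // When started without a savepoint/checkpoint, begin from the
            // group's committed offsets in Kafka:
            consumer.setStartFromGroupOffsets();

            env.addSource(consumer).print();
            env.execute("kafka-offset-example");
        }
    }

Run without -s, such a job starts from the committed group offsets; restored
from a checkpoint or savepoint, the offsets stored in the snapshot take
precedence.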
>>>
>>> On Fri, Feb 28, 2020 at 2:51 PM Piotr Nowojski <pi...@ververica.com> wrote:
>>> Thanks for confirming that, Yadong. I've created a ticket for it [1].
>>>
>>> Piotrek
>>>
>>> [1] https://issues.apache.org/jira/browse/FLINK-16340
>>>
>>>> On 28 Feb 2020, at 14:32, Yadong Xie <vthink...@gmail.com> wrote:
>>>>
>>>> Hi
>>>>
>>>> 1. the old stop button was removed in Flink 1.9.0 since, as far as I
>>>> know, it could not work properly
>>>> 2. now that we have the stop-with-savepoint feature, we could add it to
>>>> the web UI, but it may still need some work on the REST API to support
>>>> the new feature
>>>>
>>>> Best,
>>>> Yadong
>>>>
>>>> Piotr Nowojski <pi...@ververica.com> wrote on Fri, 28 Feb 2020 at 20:49:
>>>> Hi,
>>>>
>>>> I'm not sure. Maybe Yadong (CC) will know more, but to the best of my
>>>> knowledge and research:
>>>>
>>>> 1. In Flink 1.9 we switched from the old web UI to a new one; that
>>>> probably explains the difference you are seeing.
>>>> 2. The "Stop" button in the old web UI was not working properly - that
>>>> was not stop with savepoint, as stop with savepoint is a relatively new
>>>> feature.
>>>> 3. Now that we have stop with savepoint (it can be used from the CLI as
>>>> you wrote), we could probably expose this feature in the new UI as
>>>> well, unless it's already exposed somewhere? Yadong, do you know the
>>>> answer to that?
>>>>
>>>> Piotrek
>>>>
>>>>> On 27 Feb 2020, at 13:31, Kaymak, Tobias <tobias.kay...@ricardo.ch> wrote:
>>>>>
>>>>> Hello,
>>>>>
>>>>> before Flink 1.9 I was able to "Stop" a streaming pipeline - after
>>>>> clicking that button in the web interface it performed a clean
>>>>> shutdown. Now with Flink 1.9 I just see the option to cancel it.
>>>>>
>>>>> However, using the command line, flink stop -d
>>>>> 266c5b38cf9d8e61a398a0bef4a1b350 still does the trick. So the
>>>>> functionality is there.
>>>>>
>>>>> Has the button been removed on purpose?
>>>>>
>>>>> Best,
>>>>> Tobias
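
For reference, a sketch of the CLI flow discussed in this thread: stop with
savepoint (draining the pipeline), then either resume from that savepoint or
start fresh and rely on the committed group offsets. The savepoint directory
and jar name are placeholders; the job ID is the one from Tobias's mail:

    # Stop with savepoint: -p sets the savepoint target directory, -d drains
    # the pipeline (emits MAX_WATERMARK) before stopping.
    flink stop -d -p hdfs:///flink/savepoints 266c5b38cf9d8e61a398a0bef4a1b350

    # Resume from the savepoint path printed by the command above:
    flink run -s <savepoint-path> my-pipeline.jar

    # Or start fresh (no -s): the Kafka consumer then falls back to the last
    # committed group offsets, as discussed above.
    flink run my-pipeline.jar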