Hi,

Sorry for my previous, slightly confusing response. Please take a look at the 
response from Gordon.

Piotrek

> On 2 Mar 2020, at 12:05, Kaymak, Tobias <tobias.kay...@ricardo.ch> wrote:
> 
> Hi,
> 
> let me refine my question: My pipeline is generated from Beam, so the Flink 
> pipeline is a translated Beam pipeline. When I update my Apache Beam pipeline 
> code, working with a snapshot in Flink to stop the pipeline is not an option, 
> as the snapshot will use the old representation of the Flink pipeline 
> when resuming from that snapshot.
> 
> Meaning that I am looking for a way to drain the pipeline cleanly and to use 
> the last committed offset in Kafka to resume processing after I start it 
> again (launching it through Beam will regenerate the Flink pipeline, and it 
> should resume at the offset where it left off, that is, the latest committed 
> offset in Kafka).
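> 
> For context, the Kafka source in my Beam pipeline is set up roughly like this 
> (a simplified sketch, not the literal code; topic, servers and group id are 
> placeholders). The relevant part is commitOffsetsInFinalize(), which makes 
> Beam commit the consumed offsets back to Kafka once they are finalized:
> 
>     import java.util.Collections;
>     import org.apache.beam.sdk.Pipeline;
>     import org.apache.beam.sdk.io.kafka.KafkaIO;
>     import org.apache.kafka.common.serialization.StringDeserializer;
> 
>     // Fragment of the pipeline construction:
>     Pipeline pipeline = Pipeline.create();
>     pipeline.apply(
>         KafkaIO.<String, String>read()
>             .withBootstrapServers("kafka:9092")            // placeholder
>             .withTopic("events")                           // placeholder
>             .withKeyDeserializer(StringDeserializer.class)
>             .withValueDeserializer(StringDeserializer.class)
>             // group.id is needed so the committed offsets land in a consumer group
>             .withConsumerConfigUpdates(
>                 Collections.<String, Object>singletonMap("group.id", "my-group"))
>             .commitOffsetsInFinalize());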
> 
> Can this be achieved with a cancel or stop of the Flink pipeline?
> 
> Best,
> Tobias
> 
> On Mon, Mar 2, 2020 at 11:09 AM Piotr Nowojski <pi...@ververica.com> wrote:
> Hi Tobi,
> 
> No, FlinkKafkaConsumer does not use the offsets committed to Kafka for 
> recovery. The offsets to start from are stored in the checkpoint itself. 
> Committing the offsets back to Kafka is optional and purely cosmetic from 
> Flink’s perspective; the job will still start from the correct offsets.
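> 
> To illustrate the difference, here is a minimal, untested sketch of how the 
> native Flink consumer is typically configured (topic, servers and group id 
> below are placeholders, not taken from your setup):
> 
>     import java.util.Properties;
>     import org.apache.flink.api.common.serialization.SimpleStringSchema;
>     import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>     import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
> 
>     public class KafkaOffsetsSketch {
>       public static void main(String[] args) throws Exception {
>         StreamExecutionEnvironment env =
>             StreamExecutionEnvironment.getExecutionEnvironment();
>         // Offsets are snapshotted as part of every checkpoint:
>         env.enableCheckpointing(60_000);
> 
>         Properties props = new Properties();
>         props.setProperty("bootstrap.servers", "kafka:9092"); // placeholder
>         props.setProperty("group.id", "my-group");            // placeholder
> 
>         FlinkKafkaConsumer<String> consumer =
>             new FlinkKafkaConsumer<>("events", new SimpleStringSchema(), props);
>         // Only relevant when there is NO checkpoint/savepoint to restore from:
>         consumer.setStartFromGroupOffsets();
>         // Purely cosmetic for Flink itself - exposes progress to Kafka tooling:
>         consumer.setCommitOffsetsOnCheckpoints(true);
> 
>         env.addSource(consumer).print();
>         env.execute("kafka-offsets-sketch");
>       }
>     }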
> 
> However, if you, for whatever reason, restart the job from a 
> savepoint/checkpoint that’s not the latest one, this will violate 
> exactly-once guarantees: some records will be processed and committed to the 
> sinks twice. Committing happens on checkpoint, so if you are recovering to 
> some previous checkpoint, there is nothing Flink can do - some records were 
> already committed before.
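> 
> For illustration only, an exactly-once Kafka sink is typically wired up 
> roughly like this (an untested fragment to be placed inside a job's main(); 
> topic and properties are placeholders) - its Kafka transactions are committed 
> only when a checkpoint completes, which is why restoring an older checkpoint 
> leads to some records being committed again:
> 
>     import java.util.Properties;
>     import org.apache.flink.api.common.serialization.SimpleStringSchema;
>     import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
>     import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;
> 
>     Properties props = new Properties();
>     props.setProperty("bootstrap.servers", "kafka:9092");     // placeholder
>     // Should be lower than the broker's transaction.max.timeout.ms:
>     props.setProperty("transaction.timeout.ms", "900000");
> 
>     // EXACTLY_ONCE: records are written inside a Kafka transaction that is
>     // committed only when the corresponding Flink checkpoint completes.
>     FlinkKafkaProducer<String> sink = new FlinkKafkaProducer<>(
>         "output-topic",                                        // placeholder
>         new KeyedSerializationSchemaWrapper<>(new SimpleStringSchema()),
>         props,
>         FlinkKafkaProducer.Semantic.EXACTLY_ONCE);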
> 
> Piotrek
> 
>> On 2 Mar 2020, at 10:12, Kaymak, Tobias <tobias.kay...@ricardo.ch> wrote:
>> 
>> Thank you Piotr!
>> 
>> One last question - let's assume my source is a Kafka topic: if I stop via 
>> the CLI with a savepoint in Flink 1.9, but do not use that savepoint when 
>> restarting my job, the job would continue from the last offset that has been 
>> committed in Kafka, and thus I would also not experience any data loss in my 
>> sink. Is that correct?
>> 
>> Best,
>> Tobi
>> 
>> On Fri, Feb 28, 2020 at 3:17 PM Piotr Nowojski <pi...@ververica.com> wrote:
>> Yes, that’s correct. There shouldn’t be any data loss. Stop with savepoint 
>> is a way to make sure that, if you are stopping a job (either permanently 
>> or temporarily), all of the results are published/committed to external 
>> systems before you actually stop the job. 
>> 
>> If you just cancel/kill/crash a job, in some rare cases (if a checkpoint was 
>> completing at the time the cluster was crashing), some records might not be 
>> committed before the cancellation/kill/crash happened. Note that this does 
>> not mean data loss: those records will simply be published once you restore 
>> your job from a checkpoint. But if you want to stop the job permanently, 
>> that restore might never happen, hence the need for stop with savepoint.
>> 
>> Piotrek
>> 
>>> On 28 Feb 2020, at 15:02, Kaymak, Tobias <tobias.kay...@ricardo.ch> wrote:
>>> 
>>> Thank you! Just to make sure I understand the matter: when I have a 
>>> streaming pipeline (reading from Kafka, writing somewhere), click "cancel", 
>>> and after that restart the pipeline, I should not expect any data to be 
>>> lost - is that correct?
>>> 
>>> Best,
>>> Tobias 
>>> 
>>> On Fri, Feb 28, 2020 at 2:51 PM Piotr Nowojski <pi...@ververica.com> wrote:
>>> Thanks for confirming that, Yadong. I’ve created a ticket for it [1].
>>> 
>>> Piotrek
>>> 
>>> [1] https://issues.apache.org/jira/browse/FLINK-16340
>>> 
>>>> On 28 Feb 2020, at 14:32, Yadong Xie <vthink...@gmail.com> wrote:
>>>> 
>>>> Hi
>>>> 
>>>> 1. The old stop button was removed in Flink 1.9.0 since, as far as I know, 
>>>> it could not work properly.
>>>> 2. Now that we have the stop-with-savepoint feature, we could add it to 
>>>> the web UI, but it may still need some work on the REST API to support the 
>>>> new feature.
>>>> 
>>>> 
>>>> Best,
>>>> Yadong
>>>> 
>>>> 
>>>> On Fri, 28 Feb 2020 at 20:49, Piotr Nowojski <pi...@ververica.com> wrote:
>>>> Hi,
>>>> 
>>>> I’m not sure. Maybe Yadong (CC) will know more, but to the best of my 
>>>> knowledge and research:
>>>> 
>>>> 1. In Flink 1.9 we switched from the old web UI to a new one, which 
>>>> probably explains the difference you are seeing.
>>>> 2. The “Stop” button in the old web UI was not working properly - that was 
>>>> not stop with savepoint, as stop with savepoint is a relatively new 
>>>> feature.
>>>> 3. Now that we have stop with savepoint (it can be used from the CLI, as 
>>>> you wrote), we could probably expose this feature in the new UI as well, 
>>>> unless it’s already exposed somewhere? Yadong, do you know the answer to 
>>>> that?
>>>> 
>>>> Piotrek
>>>> 
>>>>> On 27 Feb 2020, at 13:31, Kaymak, Tobias <tobias.kay...@ricardo.ch> wrote:
>>>>> 
>>>>> Hello,
>>>>> 
>>>>> before Flink 1.9 I was able to "Stop" a streaming pipeline - after 
>>>>> clicking that button in the web interface it performed a clean shutdown. 
>>>>> Now with Flink 1.9 I only see the option to cancel it. 
>>>>> 
>>>>> However, running flink stop -d 266c5b38cf9d8e61a398a0bef4a1b350 on the 
>>>>> command line still does the trick, so the functionality is there. 
>>>>> 
>>>>> Has the button been removed on purpose?
>>>>> 
>>>>> Best,
>>>>> Tobias
>>>> 
>>> 
>>> 
>>> 
>>> -- 
>>> 
>>> Tobias Kaymak
>>> Data Engineer
>>> Data Intelligence
>>> 
>>> tobias.kay...@ricardo.ch
>>> www.ricardo.ch
>>> Theilerstrasse 1a, 6300 Zug
>>> 
>> 
> 
