Hi Piotrek,

I come back to you with a Jira ticket that I created and a proposal
the ticket : https://issues.apache.org/jira/browse/FLINK-7883
the proposal  :
https://github.com/aphilippot/flink/commit/9c58c95bb4b68ea337f7c583b7e039d86f3142a6

I'am open to any comments or suggestions

Antoine

Le mar. 10 oct. 2017 à 09:28, Piotr Nowojski <pi...@data-artisans.com> a
écrit :

> Hi,
>
> That’s good to hear :)
>
> I quickly went through the code and it seems reasonable. I think there
> might be need to think a little bit more about how this cancel checkpoint
> should be exposed to the operators and what should be default action -
> right now by default cancel flag is ignored, I would like to consider if
> throwing an UnsupportedOperation would be a better long therm solution.
>
> But at first glance I do not see any larger issues and it would great if
> you could make a pull request out of it.
>
> Piotrek
>
> On 9 Oct 2017, at 15:56, Antoine Philippot <antoine.philip...@teads.tv>
> wrote:
>
> Thanks for your advices Piotr.
>
> Firstly, yes, we are aware that even with clean shutdown we can end up
> with duplicated messages after a crash and it is acceptable as is it rare
> and unintentional unlike deploying new business code or up/down scale.
>
> I made a fork of the 1.2.1 version which we currently use and developed a
> simple POC based on the solution to pass a boolean stopSourceSavepoint from
> the job manager to the source when a cancel with savepoint is triggered.
> This is the altered code :
> https://github.com/aphilippot/flink/compare/release-1.2.1...aphilippot:flink_1_2_1_POC_savepoint
>
> We test it with our production workload and there are no duplicated
> messages any more while hundred of thousands were duplicated before.
>
> I planned to reapply/adapt this patch for the 1.3.2 release when we
> migrate to it and maybe later to the 1.4
>
> I'm open to suggestion or to help/develop this feature upstream if you
> want.
>
>
> Le lun. 2 oct. 2017 à 19:09, Piotr Nowojski <pi...@data-artisans.com> a
> écrit :
>
>> We are planning to work on this clean shut down after releasing Flink
>> 1.4. Implementing this properly would require some work, for example:
>> - adding some checkpoint options to add information about
>> “closing”/“shutting down” event
>> - add clean shutdown to source functions API
>> - implement handling of this clean shutdown in desired sources
>>
>> Those are not super complicated changes but also not trivial.
>>
>> One thing that you could do, is to implement some super hacky filter
>> function just after source operator, that you would manually trigger.
>> Normally it would pass all of the messages. Once triggered, it would wait
>> for next checkpoint to happen. It would assume that it is a save point, and
>> would start filtering out all of the subsequent messages. When this
>> checkpoint completes, you could manually shutdown your Flink application.
>> This could guarantee that there are no duplicated writes after a restart.
>> This might work for clean shutdown, but it would be a very hacky solution.
>>
>> Btw, keep in mind that even with clean shutdown you can end up with
>> duplicated messages after a crash and there is no way around this with
>> Kafka 0.9.
>>
>> Piotrek
>>
>> On Oct 2, 2017, at 5:30 PM, Antoine Philippot <antoine.philip...@teads.tv>
>> wrote:
>>
>> Thanks Piotr for your answer, we sadly can't use kafka 0.11 for now (and
>> until a while).
>>
>> We can not afford tens of thousands of duplicated messages for each
>> application upgrade, can I help by working on this feature ?
>> Do you have any hint or details on this part of that "todo list" ?
>>
>>
>> Le lun. 2 oct. 2017 à 16:50, Piotr Nowojski <pi...@data-artisans.com> a
>> écrit :
>>
>>> Hi,
>>>
>>> For failures recovery with Kafka 0.9 it is not possible to avoid
>>> duplicated messages. Using Flink 1.4 (unreleased yet) combined with Kafka
>>> 0.11 it will be possible to achieve exactly-once end to end semantic when
>>> writing to Kafka. However this still a work in progress:
>>>
>>> https://issues.apache.org/jira/browse/FLINK-6988
>>>
>>> However this is a superset of functionality that you are asking for.
>>> Exactly-once just for clean shutdowns is also on our “TODO” list (it
>>> would/could support Kafka 0.9), but it is not currently being actively
>>> developed.
>>>
>>> Piotr Nowojski
>>>
>>> On Oct 2, 2017, at 3:35 PM, Antoine Philippot <
>>> antoine.philip...@teads.tv> wrote:
>>>
>>> Hi,
>>>
>>> I'm working on a flink streaming app with a kafka09 to kafka09 use case
>>> which handles around 100k messages per seconds.
>>>
>>> To upgrade our application we used to run a flink cancel with savepoint
>>> command followed by a flink run with the previous saved savepoint and the
>>> new application fat jar as parameter. We notice that we can have more than
>>> 50k of duplicated messages in the kafka sink wich is not idempotent.
>>>
>>> This behaviour is actually problematic for this project and I try to
>>> find a solution / workaround to avoid these duplicated messages.
>>>
>>> The JobManager indicates clearly that the cancel call is triggered once
>>> the savepoint is finished, but during the savepoint execution, kafka source
>>> continue to poll new messages which will not be part of the savepoint and
>>> will be replayed on the next application start.
>>>
>>> I try to find a solution with the stop command line argument but the
>>> kafka source doesn't implement StoppableFunction (
>>> https://issues.apache.org/jira/browse/FLINK-3404) and the savepoint
>>> generation is not available with stop in contrary to cancel.
>>>
>>> Is there an other solution to not process duplicated messages for each
>>> application upgrade or rescaling ?
>>>
>>> If no, has someone planned to implement it? Otherwise, I can propose a
>>> pull request after some architecture advices.
>>>
>>> The final goal is to stop polling source and trigger a savepoint once
>>> polling stopped.
>>>
>>> Thanks
>>>
>>>
>>>
>>
>

Reply via email to