Thanks Fabian!

Yes, that is exactly what we are looking to achieve. I looked at the fine-grained 
recovery FLIP, but I’m not sure it will do the trick on its own. As Fabian 
mentioned, we haven’t been enabling checkpointing (reasons below). I understand 
it might not always be possible to actually take a checkpoint of an operator that 
is failing, but as long as the whole job graph is not restarted and only the 
failing operator is restarted, even if checkpointing is not enabled, I feel that 
will do the trick. It is “acceptable” to lose state on that failing operator. 
Further, if operators had a lifecycle hook for restart (similar to open / close), 
app developers could at least attempt to checkpoint state before the restart, 
provided a mechanism is offered to do so programmatically. Just some thoughts 
there… 

Back to our scenario - most of the high-volume datasets we are processing only 
require a few events to be grouped by key, and those events generally arrive 
within a few seconds (if not milliseconds). However, a small percentage of events 
arrive late, or endpoints simply can’t send all of a group’s events fast enough, 
so partial groups sit in operator memory until the remaining events arrive or a 
configured timeout is reached. We are talking about hundreds of thousands of 
endpoints (soon to be millions, actually) streaming data at high volume. Hence, 
we currently don’t even enable checkpointing and rely mostly on Kafka auto-commits 
when apps need to be restarted; we were hoping to avoid performance issues and 
resource constraints, and given the transient nature of the datasets, the benefits 
of not checkpointing seemed higher. However, a single operator failure causing the 
entire job graph to restart is causing data loss. It is worth pointing out that we 
have some leeway here: it is “okay” to have a little data loss (e.g. in the 
operator that is actually failing) or some duplicates (say one of the Kafka 
consumers crashed). What we are running into, though, is that one operator failing 
causes data loss in hundreds of operators running in parallel. That is the data 
loss we would really like to avoid. 
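To make this concrete, here is a rough sketch of the pattern (not our actual 
code; Event, EventGroup, EventSchema and TIMEOUT_MS are illustrative 
placeholders): checkpointing stays disabled, offsets are auto-committed by the 
Kafka client, and a ProcessFunction on a keyed stream buffers events per key 
until the group completes or a timeout timer fires.

import java.util.Properties;

import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;
import org.apache.flink.util.Collector;

// Event, EventGroup and EventSchema stand in for our own domain classes.
public class GroupingJobSketch {

    // Illustrative timeout after which a partial group is flushed.
    private static final long TIMEOUT_MS = 30_000L;

    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "broker:9092");
        props.setProperty("group.id", "grouping-app");
        // With no Flink checkpointing, the Kafka connector falls back to the
        // client's periodic auto-commit for offsets.
        props.setProperty("enable.auto.commit", "true");
        props.setProperty("auto.commit.interval.ms", "5000");

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Note: env.enableCheckpointing(...) is intentionally not called.

        DataStream<Event> events = env.addSource(
                new FlinkKafkaConsumer011<>("events", new EventSchema(), props));

        events
            .keyBy(e -> e.getGroupKey())
            .process(new ProcessFunction<Event, EventGroup>() {

                private transient ListState<Event> buffer;

                @Override
                public void open(Configuration parameters) {
                    buffer = getRuntimeContext().getListState(
                            new ListStateDescriptor<>("buffer", Event.class));
                }

                @Override
                public void processElement(Event e, Context ctx, Collector<EventGroup> out) throws Exception {
                    buffer.add(e);
                    // Simplified: register a processing-time timeout so a partial
                    // group is flushed even if the rest never arrives. (Real code
                    // would also emit and clear as soon as the group is complete.)
                    ctx.timerService().registerProcessingTimeTimer(
                            ctx.timerService().currentProcessingTime() + TIMEOUT_MS);
                }

                @Override
                public void onTimer(long timestamp, OnTimerContext ctx, Collector<EventGroup> out) throws Exception {
                    // Timeout reached: flush whatever has arrived and drop the buffer.
                    out.collect(EventGroup.of(buffer.get()));
                    buffer.clear();
                }
            })
            .print(); // placeholder sink

        env.execute("grouping-job-sketch");
    }
}

The point being: the ListState buffer is exactly the transient state we are 
prepared to lose for the operator that actually fails, but not for the hundreds 
of healthy parallel instances that get restarted along with it.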

Thanks, Ashish

> On Mar 15, 2018, at 3:41 AM, Fabian Hueske <fhue...@gmail.com> wrote:
> 
> If I understand fine-grained recovery correctly, one would still need to take 
> checkpoints.
> 
> Ashish would like to avoid checkpointing and accepts losing the state of the 
> failed task. 
> However, he would like to avoid losing more state than necessary due to 
> restarts of tasks that did not fail.
> 
> Best, Fabian
> 
> 2018-03-15 1:45 GMT+01:00 Aljoscha Krettek <aljos...@apache.org>:
> Hi,
> 
> Have you looked into fine-grained recovery? 
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-1+%3A+Fine+Grained+Recovery+from+Task+Failures
> 
> Stefan cc'ed might be able to give you some pointers about configuration.
> 
> Best,
> Aljoscha
> 
> 
>> On 6. Mar 2018, at 22:35, Ashish Pokharel <ashish...@yahoo.com> wrote:
>> 
>> Hi Gordon,
>> 
>> The issue really is that we are trying to avoid checkpointing, as the datasets 
>> are really heavy and all of the state is really transient in a few of our apps 
>> (flushed within a few seconds). The high volume/velocity and the transient 
>> nature of the state make those apps good candidates for just not having 
>> checkpoints. 
>> 
>> We do have offsets committed to Kafka AND we have “some” tolerance for gaps / 
>> duplicates. However, we do want to handle “graceful” restarts / shutdowns. For 
>> shutdown, we have been taking savepoints (which works great), but for restart, 
>> we just can’t find a way. 
>> 
>> Bottom line - we are trading off resiliency for resource utilization and 
>> performance but would like to harden apps for production deployments as much 
>> as we can.
>> 
>> Hope that makes sense.
>> 
>> Thanks, Ashish
>> 
>>> On Mar 6, 2018, at 10:19 PM, Tzu-Li Tai <tzuli...@gmail.com> wrote:
>>> 
>>> Hi Ashish,
>>> 
>>> Could you elaborate a bit more on why you think the restart of all operators
>>> leads to data loss?
>>> 
>>> When restart occurs, Flink will restart the job from the latest complete
>>> checkpoint.
>>> All operator states will be reloaded with state written in that checkpoint,
>>> and the position of the input stream will also be rewound.
>>> 
>>> I don't think there is a way to force a checkpoint before restarting occurs,
>>> but as I mentioned, that should not be required, because the last complete
>>> checkpoint will be used.
>>> Am I missing something in your particular setup?
>>> 
>>> Cheers,
>>> Gordon
>>> 
>>> 
>>> 
>>> --
>>> Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>> 
> 
> 
