Hi Arnaud,

Thanks for the further feedback!

For option 1: if even 40 min is not enough, it indicates the checkpoint simply needs more time to complete in your case. I have also seen catch-up scenarios where a single checkpoint took several hours to finish. If the current checkpoint expires because of the timeout, the next triggered checkpoint will probably fail with a timeout as well. So it seems better to wait for the current checkpoint to finish rather than let it expire, unless you cannot tolerate such a long wait for some reason, such as a failover during that window having to restore that much more data.

For option 2: the default network settings should make sense. Lower values might cause a performance regression, and higher values would increase the number of in-flight buffers and delay the checkpoint even more.

For option 3: if resources are limited, it will indeed not help on your side.

Sleeping for some time in the source, as you mentioned, is an option and might work in your case, although it does not seem a very graceful approach.
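A minimal sketch of such a sleep-based throttle, assuming a fixed target rate (the `SimpleThrottle` class and its names are hypothetical and Flink-independent; a real source would call `acquire()` before emitting each record):

```java
// Hypothetical sketch of the "bandwidth" idea: the source thread calls
// acquire() before each emit; once the permitted rate is exceeded, the call
// sleeps, giving downstream operators (and checkpoint barriers) room to breathe.
public class SimpleThrottle {
    private final long nanosPerRecord;
    private long nextFreeTime = System.nanoTime();

    public SimpleThrottle(double recordsPerSecond) {
        this.nanosPerRecord = (long) (1_000_000_000L / recordsPerSecond);
    }

    // Blocks (sleeps) just long enough to keep the emit rate at or below target.
    public synchronized void acquire() {
        long now = System.nanoTime();
        if (nextFreeTime > now) {
            try {
                Thread.sleep((nextFreeTime - now) / 1_000_000L);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        nextFreeTime = Math.max(nextFreeTime, now) + nanosPerRecord;
    }

    public static void main(String[] args) {
        SimpleThrottle throttle = new SimpleThrottle(5.0); // at most ~5 records/s
        for (int i = 0; i < 3; i++) {
            throttle.acquire();
            System.out.println("emit record " + i);
        }
    }
}
```

A fixed rate is the simplest variant; tying the rate to an observed back-pressure signal would need extra plumbing.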

I think there is no data skew causing back pressure in your case, because you use rebalance mode, as mentioned. Another option might be to use forward mode, which would be better than rebalance mode if it is possible in your case. In forward mode the source and downstream task are connected one-to-one, so the total in-flight buffers before the barrier are 2 + 2 + 8 for a single downstream task. In rebalance mode the connection is all-to-all, so the total in-flight buffers would be (a*2 + a*2 + 8) for a single downstream task, where `a` is the parallelism of the source vertex. Barrier alignment therefore takes more time in rebalance mode than in forward mode.
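The arithmetic above can be sketched as a tiny standalone calculation (the `InflightBuffers` class and method names are made up for illustration, not Flink API; the constants 2 and 8 are the buffer defaults discussed in this thread):

```java
// Illustration of the in-flight buffer counts before a barrier, per
// downstream task, using the default network buffer settings.
public class InflightBuffers {
    static final int BUFFERS_PER_CHANNEL = 2; // default per-channel exclusive buffers
    static final int FLOATING_BUFFERS_PER_GATE = 8; // default floating buffers per gate

    // Forward mode: one-to-one, a single input channel per downstream task.
    static int forwardBuffers() {
        return BUFFERS_PER_CHANNEL + BUFFERS_PER_CHANNEL + FLOATING_BUFFERS_PER_GATE;
    }

    // Rebalance mode: all-to-all, one input channel per source subtask.
    static int rebalanceBuffers(int sourceParallelism) {
        return sourceParallelism * BUFFERS_PER_CHANNEL
             + sourceParallelism * BUFFERS_PER_CHANNEL
             + FLOATING_BUFFERS_PER_GATE;
    }

    public static void main(String[] args) {
        System.out.println("forward: " + forwardBuffers());              // 2 + 2 + 8 = 12
        System.out.println("rebalance, a=10: " + rebalanceBuffers(10));  // 20 + 20 + 8 = 48
    }
}
```

So with a source parallelism of 10, each downstream task may have to drain 48 buffers instead of 12 before the barrier can be processed.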

Best,
Zhijiang
------------------------------------------------------------------
From: LINZ, Arnaud <al...@bouyguestelecom.fr>
Send Time: Friday, March 1, 2019 00:46
To: zhijiang <wangzhijiang...@aliyun.com>; user <user@flink.apache.org>
Subject: RE: Checkpoints and catch-up burst (heavy back pressure)


Update:
Option 1 does not work. It still fails at the end of the timeout, no matter its value.
Should I implement a “bandwidth” management system by using artificial Thread.sleep calls in the source, depending on the back pressure?
From: LINZ, Arnaud
Sent: Thursday, February 28, 2019 15:47
To: 'zhijiang' <wangzhijiang...@aliyun.com>; user <user@flink.apache.org>
Subject: RE: Checkpoints and catch-up burst (heavy back pressure)
Hi Zhijiang,
Thanks for your feedback.
I’ll try option 1; the timeout is 4 min for now, I’ll switch it to 40 min and will let you know. Setting it higher than 40 min does not make much sense, since after 40 min the pending output is already quite large.
Option 3 won’t work; I already take up too many resources, and as my source is more or less an HDFS directory listing, it will always be far faster than any mapper that reads the files and emits records based on their content, or any sink that stores the transformed data, unless I put “sleeps” in it (but is that really a good idea?)
Option 2: taskmanager.network.memory.buffers-per-channel and taskmanager.network.memory.buffers-per-gate are currently unset in my configuration (so at their defaults of 2 and 8), but for this streaming app I have very few exchanges between nodes (just a rebalance after the source that emits file names; everything else is local to the node). Should I adjust their values nonetheless? To higher or lower values?
Best,
Arnaud
From: zhijiang <wangzhijiang...@aliyun.com>
Sent: Thursday, February 28, 2019 10:58
To: user <user@flink.apache.org>; LINZ, Arnaud <al...@bouyguestelecom.fr>
Subject: Re: Checkpoints and catch-up burst (heavy back pressure)
Hi Arnaud,
I think there are two key points. First, under high back pressure the checkpoint barrier might be emitted from the source with some delay, because of the synchronization on the source lock.
Second, the barrier has to queue behind the in-flight data buffers, so the downstream task must process all the buffers ahead of the barrier before it can trigger its checkpoint, and this can take some time under back pressure.
There are three ways to work around this:
1. Increase the checkpoint timeout to avoid expiring checkpoints too early.
2. Decrease the network buffer settings to reduce the number of in-flight buffers ahead of the barrier; see the config options "taskmanager.network.memory.buffers-per-channel" and "taskmanager.network.memory.buffers-per-gate".
3. Adjust the parallelism, e.g. increase it for the sink vertex, so the source data is processed faster and back pressure is avoided to some extent.
You could check which way suits your scenario and give it a try.
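For reference, option 2 maps onto a configuration fragment along these lines (a sketch only: the values shown are the defaults, and exact key names can vary between Flink versions):

```yaml
# flink-conf.yaml sketch for option 2 -- values are the defaults, shown for
# orientation; lowering them shrinks the in-flight data ahead of a barrier.
taskmanager.network.memory.buffers-per-channel: 2
taskmanager.network.memory.buffers-per-gate: 8
```

The checkpoint timeout of option 1 is usually set in the job itself, e.g. via `env.getCheckpointConfig().setCheckpointTimeout(...)` in the DataStream API.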
Best,
Zhijiang
------------------------------------------------------------------
From: LINZ, Arnaud <al...@bouyguestelecom.fr>
Send Time: Thursday, February 28, 2019 17:28
To: user <user@flink.apache.org>
Subject: Checkpoints and catch-up burst (heavy back pressure)
Hello,
I have a simple streaming app that gets data from a source and stores it to HDFS using a sink similar to the bucketing file sink. Checkpointing mode is “exactly once”.
Everything is fine during a “normal” run, as the sink is faster than the source; but when we stop the application for a while and then restart it, we get a catch-up burst to process all the messages emitted in the meantime.
During this burst, the source is faster than the sink, and all checkpoints fail (time out) until the source has fully caught up. This is annoying because the sink does not “commit” the data before a successful checkpoint is made, so the app releases all the “catch-up” data as one atomic block, which can be huge if the streaming app was stopped for a while, adding unwanted stress to the Hadoop cluster and to all the downstream Hive jobs that consume the data in micro-batches.
How should I handle this situation? Is there something special to do to get checkpoints through even under heavy load?
The problem does not seem to be new, but I was unable to find any practical 
solution in the documentation.
Best regards,
Arnaud


 The integrity of this message cannot be guaranteed on the Internet. The 
company that sent this message cannot therefore be held liable for its content 
nor attachments. Any unauthorized use or dissemination is prohibited. If you 
are not the intended recipient of this message, then please delete it and 
notify the sender.
