On Fri, 1 Mar 2019 at 13:05, LINZ, Arnaud <al...@bouyguestelecom.fr> wrote:
> Hi,
>
> I think I should go into more detail to explain my use case.
>
> I have one non-parallel source (parallelism = 1) that lists binary files in
> an HDFS directory. The dataset emitted by the source is a set of file
> names, not file content. These file names are rebalanced and sent to
> workers (parallelism = 15) that run a flatmapper which opens the file,
> reads it, decodes it, and sends records (forward mode) to the sinks (with a
> few 1-to-1 mappings in between). The flatmap operation is therefore
> time-consuming, as the files are more than 200 MB each; the flatmapper
> emits millions of records to the sink for one source record (file name).
>
> The rebalancing, occurring at the file-name level, does not use much I/O,
> and I cannot use one-to-one mode at that point if I want some parallelism,
> since I have only one source.
>
> I did not put file decoding directly in the sources because I have no good
> way to distribute files to sources without a controller (the input directory
> is unique, and file names are random and cannot be "attributed" to one
> particular source instance easily).

Crazy idea: if you know the task number and the number of tasks, you can hash
the file name using a shared algorithm (e.g. md5 or sha1 or crc32) and then
just check whether the hash modulo the number of tasks equals the task number.
That would let you run the file listing in parallel without sharing state,
which would allow file decoding directly in the sources (a rough sketch
follows below).

> Alternatively, I could have used a dispatcher daemon, separate from the
> streaming app, that distributes files to various directories, each directory
> being associated with one Flink source instance, and put the file reading and
> decoding directly in the source. But that seemed more complex to code and
> operate than the file-name source. Would it have been better from the
> checkpointing perspective?
>
> About the ungraceful source sleep(): is there a way, programmatically, to
> know the "load" of the app, or to determine whether checkpointing is taking
> too much time, so that I only throttle when it is actually needed?
>
> Thanks,
> Arnaud
>
> From: zhijiang <wangzhijiang...@aliyun.com>
> Sent: Friday, March 1, 2019 04:59
> To: user <user@flink.apache.org>; LINZ, Arnaud <al...@bouyguestelecom.fr>
> Subject: Re: Checkpoints and catch-up burst (heavy back pressure)
>
> Hi Arnaud,
>
> Thanks for the further feedback!
>
> For option 1: 40 min still not working indicates that it might take even more
> time to finish a checkpoint in your case. I have also seen scenarios where
> catching up on data took several hours to finish one checkpoint. If the
> current checkpoint expires because of the timeout, the next triggered
> checkpoint might still fail on timeout. So it seems better to wait for the
> current checkpoint to finish rather than expire it, unless we cannot bear
> this long time for some reason, such as a failover during this window having
> to restore and reprocess more data.
>
> For option 2: the default network settings should make sense. Lower values
> might cause performance regression, and higher values would increase the
> in-flight buffers and delay checkpoints even more.
>
> For option 3: if resources are limited, it will still not work on your side.
>
> Sleeping for some time in the source, as you mentioned, is an option and
> might work in your case, although it does not seem like a graceful way.
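A minimal sketch of the file-name hashing idea mentioned earlier in this
message, assuming a RichParallelSourceFunction; listFiles() is a hypothetical
placeholder for the HDFS directory listing, and hashCode() stands in for
md5/sha1/crc32 for brevity. It only illustrates the hash-modulo selection, not
checkpointing or continuous directory monitoring:

    import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;

    // Sketch only: each parallel source instance claims the files whose hash
    // falls on its own subtask index, so no coordination or shared state is needed.
    public class HashPartitionedFileNameSource extends RichParallelSourceFunction<String> {

        private volatile boolean running = true;

        @Override
        public void run(SourceContext<String> ctx) throws Exception {
            int numTasks = getRuntimeContext().getNumberOfParallelSubtasks();
            int taskIndex = getRuntimeContext().getIndexOfThisSubtask();
            for (String fileName : listFiles()) {                          // hypothetical HDFS listing helper
                if (!running) {
                    return;
                }
                int bucket = Math.floorMod(fileName.hashCode(), numTasks); // md5/sha1/crc32 would also work
                if (bucket == taskIndex) {
                    synchronized (ctx.getCheckpointLock()) {               // emit under the checkpoint lock
                        ctx.collect(fileName);
                    }
                }
            }
        }

        @Override
        public void cancel() {
            running = false;
        }

        private java.util.List<String> listFiles() {
            // Placeholder: a real implementation would list the HDFS input directory.
            return java.util.Collections.emptyList();
        }
    }

Each instance scans the same listing but only emits its own share, so decoding
could then live in the source itself.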
> I think there is no data skew in your case causing the backpressure, because
> you use the rebalance mode as mentioned. Another option might be to use
> forward mode, which would be better than rebalance mode if possible in your
> case. Because the source and the downstream task are one-to-one in forward
> mode, the total in-flight buffers before the barrier are 2+2+8 for one single
> downstream task. In rebalance mode, the total in-flight buffers would be
> (a*2 + a*2 + 8) for one single downstream task (`a` is the parallelism of the
> source vertex), because it is an all-to-all connection. Barrier alignment
> therefore takes more time in rebalance mode than in forward mode.
>
> Best,
> Zhijiang
>
> ------------------------------------------------------------------
> From: LINZ, Arnaud <al...@bouyguestelecom.fr>
> Send Time: March 1, 2019 (Friday) 00:46
> To: zhijiang <wangzhijiang...@aliyun.com>; user <user@flink.apache.org>
> Subject: RE: Checkpoints and catch-up burst (heavy back pressure)
>
> Update: Option 1 does not work. It still fails at the end of the timeout, no
> matter its value.
>
> Should I implement a "bandwidth" management system by using an artificial
> Thread.sleep in the source depending on the back pressure?
>
> From: LINZ, Arnaud
> Sent: Thursday, February 28, 2019 15:47
> To: 'zhijiang' <wangzhijiang...@aliyun.com>; user <user@flink.apache.org>
> Subject: RE: Checkpoints and catch-up burst (heavy back pressure)
>
> Hi Zhijiang,
>
> Thanks for your feedback.
>
> - I'll try option 1; the timeout is 4 min for now, I'll switch it to 40 min
>   and will let you know. Setting it higher than 40 min does not make much
>   sense, since after 40 min the pending output is already quite large.
> - Option 3 won't work; I already take too many resources, and as my source is
>   more or less an HDFS directory listing, it will always be far faster than
>   any mapper that reads a file and emits records based on its content, or any
>   sink that stores the transformed data, unless I put "sleeps" in it (but is
>   that really a good idea?).
> - Option 2: taskmanager.network.memory.buffers-per-channel and
>   taskmanager.network.memory.buffers-per-gate are currently unset in my
>   configuration (so at their defaults of 2 and 8), but for this streaming app
>   I have very few exchanges between nodes (just a rebalance after the source
>   that emits file names; everything else is local to the node). Should I
>   adjust their values nonetheless? To higher or lower values?
>
> Best,
> Arnaud
>
> From: zhijiang <wangzhijiang...@aliyun.com>
> Sent: Thursday, February 28, 2019 10:58
> To: user <user@flink.apache.org>; LINZ, Arnaud <al...@bouyguestelecom.fr>
> Subject: Re: Checkpoints and catch-up burst (heavy back pressure)
>
> Hi Arnaud,
>
> I think there are two key points. First, the checkpoint barrier might be
> emitted with a delay from the source under high backpressure, because of the
> synchronization on the checkpoint lock. Second, the barrier has to be queued
> behind the in-flight data buffers, so the downstream task has to process all
> the buffers ahead of the barrier before it can trigger the checkpoint, and
> this takes some time under back pressure.
>
> There are three ways to work around this:
>
> 1. Increase the checkpoint timeout to avoid expiring checkpoints too soon.
> 2. Decrease the network buffer settings to reduce the amount of in-flight
>    buffers ahead of the barrier; you can check the config options
>    "taskmanager.network.memory.buffers-per-channel" and
>    "taskmanager.network.memory.buffers-per-gate" (see the sketch after this
>    list).
> 3. Adjust the parallelism, for instance by increasing it for the sink vertex
>    so that the source data is processed faster, to avoid backpressure to some
>    extent.
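For reference, a flink-conf.yaml sketch for option 2, using the option names
and defaults quoted in this thread (2 and 8); the values shown are just those
defaults, lowering them is what reduces the in-flight data queued ahead of the
barrier:

    # defaults as quoted above; smaller values mean fewer in-flight buffers
    # per channel / per gate, hence faster barrier alignment under backpressure
    taskmanager.network.memory.buffers-per-channel: 2
    taskmanager.network.memory.buffers-per-gate: 8

As noted earlier in the thread, values below the defaults may cost throughput,
so this mainly helps when checkpoint latency matters more than catch-up speed.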
> You could check which way is suitable for your scenario and give it a try.
>
> Best,
> Zhijiang
>
> ------------------------------------------------------------------
> From: LINZ, Arnaud <al...@bouyguestelecom.fr>
> Send Time: February 28, 2019 (Thursday) 17:28
> To: user <user@flink.apache.org>
> Subject: Checkpoints and catch-up burst (heavy back pressure)
>
> Hello,
>
> I have a simple streaming app that gets data from a source and stores it in
> HDFS using a sink similar to the bucketing file sink. The checkpointing mode
> is "exactly once".
>
> Everything is fine in "normal" operation, as the sink is faster than the
> source; but when we stop the application for a while and then restart it, we
> have a catch-up burst to process all the messages emitted in the meantime.
>
> During this burst, the source is faster than the sink, and all checkpoints
> fail (time out) until the source has completely caught up. This is annoying
> because the sink does not "commit" the data before a successful checkpoint is
> made, so the app releases all the "catch-up" data as one atomic block, which
> can be huge if the streaming app was stopped for a while, adding unwanted
> stress to the Hadoop cluster and to all the downstream Hive jobs that consume
> the data provided in micro-batches.
>
> How should I handle this situation? Is there something special to do to get
> checkpoints through even under heavy load?
>
> The problem does not seem to be new, but I was unable to find any practical
> solution in the documentation.
>
> Best regards,
> Arnaud
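As a footnote to option 1 discussed above, a minimal sketch of raising the
checkpoint timeout programmatically with the DataStream API; the interval is a
placeholder and the 40-minute timeout is simply the value tried in the thread,
not a recommendation:

    import org.apache.flink.streaming.api.CheckpointingMode;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class CheckpointTimeoutExample {
        public static void main(String[] args) {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            // Enable exactly-once checkpointing (the 10-minute interval is a placeholder).
            env.enableCheckpointing(10 * 60 * 1000L, CheckpointingMode.EXACTLY_ONCE);

            // Option 1: allow a much longer timeout so checkpoints are not
            // expired while the job is catching up (40 min, as tried above).
            env.getCheckpointConfig().setCheckpointTimeout(40 * 60 * 1000L);

            // Let a slow checkpoint finish before the next one is triggered.
            env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
        }
    }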