Hi Owen, Thanks for the quick response. No, I haven’t seen the previous blog post, yes it clears the things out a bit.
> To clarify, the code is attempting to simulate a straggler node due to high > load, which therefore processes data at a slower rate - not a failing node. > Some degree of this is a feature of multi-tenant Hadoop. In your benchmark you are manually slowing down just one TaskManager, so you are testing for the failing/slow machine case, where either: 1. the machine is slow on it’s own because it’s smaller than the others, 2. it’s overloaded by some service independent of the Flink 3. it's a failing node. Out of those three options, first two are not supported by Flink, in a sense that Flink assumes more or less equal machines in the cluster. The third is, as I wrote in the previous response, pretty uncommon scenario (until you reach really huge scale). How often one of your machine fails in a way that it is 6.6 times slower than the others? I agree Flink doesn’t handle this automatically at the moment (currently you would be expected to manually shut down the machine). Nevertheless there are some plans how to address this (speculative execution and load based balancing channel selection), but with no definite schedule. Also if the issue is "multi-tenant Hadoop.”, I would first try to better assign resources in the cluster, using for example CGroups via yarn/lxc/docker, or virtual machines. Cheers, Piotrek > On 10 Oct 2019, at 16:02, Owen Rees-Hayward <owe...@googlemail.com> wrote: > > Hi Piotr, > > Thanks for getting back to me and for the info. I try to describe the > motivation around the scenarios in the original post in the series - see the > 'Backpressure - why you might care' section on > http://owenrh.me.uk/blog/2019/09/30/ <http://owenrh.me.uk/blog/2019/09/30/>. > Maybe it could have been clearer. > > As you note, this will not affect every Flink job. However, one persons niche > is another persons day job. I definitely agree that keyed network exchanges, > which is going to the majority of analytics queries, are in a different > problem space. However, this is not an uncommon scenario in ingest pipelines. > > I'd be interested to know whether you saw the section in the post I referred > to above and whether this clears anything up? To clarify, the code is > attempting to simulate a straggler node due to high load, which therefore > processes data at a slower rate - not a failing node. Some degree of this is > a feature of multi-tenant Hadoop. > > Cheers, Owen > > On Thu, 10 Oct 2019 at 10:27, Piotr Nowojski <pi...@ververica.com > <mailto:pi...@ververica.com>> wrote: > Hi, > > I’m not entirely sure what you are testing. I have looked at your code (only > the constant straggler scenario) and please correct me if’m wrong, in your > job you are basically measuring throughput of > `Thread.sleep(straggler.waitMillis)`. > > In the first RichMap task (`subTaskId == 0`), per every record you do the > sleep(50ms), so after filling in all of the network buffers your whole job > will be bottlenecked by this throughput cap of 20 records / second. Every so > often when this struggling task will be able to process and free up some > buffer from the backlog. This briefly unblocks other three tasks (which are > capped at 133 records / second). Apart from those short stints, those other > tasks can not process constant 133 records / seconds, because records are > evenly distributed by the source between all of those tasks. Which is I think > clearly visible on the charts and every system would behave in exactly the > same way. > > But what scenario are you really trying to simulate? > > A data skew when one task is 6.65 (133 / 20 ) times more > overloaded/processing heavier records than the others? Yes, this is expected > behaviour, but your benchmark is testing this in a bit convoluted way. > > A failing machine which has 6.65 times less performance? With keyed network > exchanges there is again very little that you can do (except of the > speculative execution). Without keyed network exchanges, OK, I agree. In this > case, randomly/evenly distributing the records is not the optimal shuffling > strategy and there is some room for the improvement in Flink (we could > distribute records not randomly but to the less busy machines). However this > is a pretty much niche feature (failing machine + non keyed exchanges) and > you are not saying anywhere that this is what you are testing for. > > Piotrek > >> On 8 Oct 2019, at 18:10, Owen Rees-Hayward <owe...@googlemail.com >> <mailto:owe...@googlemail.com>> wrote: >> >> Hi, >> >> I am having a few issues with the Flink (v1.8.1) backpressure default >> settings, which lead to poor throughput in a comparison I am doing between >> Storm, Spark and Flink. >> >> I have a setup that simulates a progressively worse straggling task that >> Storm and Spark cope with the relatively well. Flink not so much. Code can >> be found here - https://github.com/owenrh/flink-variance >> <https://github.com/owenrh/flink-variance>. >> >> See this throughput chart for the an idea of how badly - >> https://owenrh.me.uk/assets/images/blog/smackdown/flink-constant-straggler.png >> >> <https://owenrh.me.uk/assets/images/blog/smackdown/flink-constant-straggler.png> >> >> I do not have any production experience with Flink, but I have had a look at >> the Flink docs and there is nothing in there that jumps out at me to explain >> or address this. I presume I am missing something, as I cannot believe Flink >> is this weak in the face of stragglers. It must be configuration right? >> >> Would appreciate any help on this. I've got a draft blog post that I will >> publish in a day or two, and don't want to criticise the Flink backpressure >> implementation for what seems most likely some default configuration issue. >> >> Thanks in advance, Owen >> >> -- >> Owen Rees-Hayward >> 07912 876046 >> twitter.com/owen4d <http://twitter.com/owen4d> > > > -- > Owen Rees-Hayward > 07912 876046 > twitter.com/owen4d <http://twitter.com/owen4d>