I added a new Ticket: https://issues.apache.org/jira/browse/FLINK-3336
This will implement the data shipping pattern that you mentioned in your initial mail. I also have the pull request almost ready.

> On 04 Feb 2016, at 16:25, Gwenhael Pasquiers <[email protected]> wrote:
>
> Okay;
>
> Then I guess that the best we can do is to disable chaining (we really want
> one thread per operator since they are doing long operations) and have the
> same parallelism for sinks as for the mapping: that way each map will have its
> own sink and there will be no exchanges between Flink instances.
>
> From: [email protected] [mailto:[email protected]] On Behalf Of Stephan Ewen
> Sent: Thursday, 4 February 2016 15:13
> To: [email protected]
> Subject: Re: Distribution of sinks among the nodes
>
> To your other question, there are two things in Flink:
>
> (1) Chaining: tasks are folded together into one task, run by one thread.
>
> (2) Resource groups: tasks stay separate and have separate threads, but share
> a slot (which means they share memory resources). See the link in my previous
> mail for an explanation concerning those.
>
> Greetings,
> Stephan
>
>
> On Thu, Feb 4, 2016 at 3:10 PM, Stephan Ewen <[email protected]> wrote:
>
> Hi Gwen!
>
> You actually need not 24 slots, but only as many as the highest parallelism
> is (16). Slots do not hold individual tasks, but "pipelines".
>
> Here is an illustration of how that works:
> https://ci.apache.org/projects/flink/flink-docs-release-0.10/setup/config.html#configuring-taskmanager-processing-slots
>
> You can control whether a task may share a slot with the previous task via
> the function "startNewResourceGroup()" in the streaming API. Sharing slots
> makes a few things easier to reason about: in particular, when adding
> operators to a program, you need not immediately add new machines.
>
>
> How to solve your program case
> --------------------------------------------
>
> We can actually make a pretty simple addition to Flink that will cause the
> tasks to be locally connected, which in turn will cause the scheduler to
> distribute them as you intend.
>
> Rather than letting the 4 sources rebalance across all 16 mappers, each one
> should redistribute to 4 local mappers, and these 4 mappers should send data
> to one local sink each.
>
> We'll try and add that today and ping you once it is in.
>
> The following would be sample code to use this:
>
> env.setParallelism(4);
>
> env
>     .addSource(kafkaSource)
>     .partitionFan()
>     .map(mapper).setParallelism(16)
>     .partitionFan()
>     .addSink(kafkaSink);
>
>
> A bit of background on why the mechanism is the way it is right now
> ----------------------------------------------------------------------------------------------
>
> You can think of a slot as a slice of resources: in particular, an amount of
> memory from the memory manager, but also memory in the network stack.
>
> What we want to do quite soon is to make streaming programs more elastic.
> Consider for example the case that you have 16 slots on 4 machines, a machine
> fails, and you have no spare resources. In that case Flink should recognize
> that no spare resource can be acquired, and scale the job in. Since you have
> only 12 slots left, the parallelism of the mappers is reduced to 12, and the
> source task that was on the failed machine is moved to a slot on another
> machine.
>
> It is important that the guaranteed resources for each task do not change
> when scaling in, to keep behavior predictable. In this case, each slot will
> still host at most 1 source, 1 mapper, and 1 sink, as did some of the slots
> before.
> That is also the reason why the slots are per TaskManager, and not global:
> to associate them with a constant set of resources (mainly memory).
>
> Greetings,
> Stephan
>
>
> On Thu, Feb 4, 2016 at 9:54 AM, Gwenhael Pasquiers <[email protected]> wrote:
>
> Don't we need to set the number of slots to 24 (4 sources + 16 mappers + 4
> sinks)?
>
> Or is there a way to set the number of slots per TaskManager instead of
> globally, so that they are at least equally dispatched among the nodes?
>
> As for the sink deployment: that's not good news; I mean we will have a
> non-negligible overhead: all the data generated by 3 of the 4 nodes will be
> sent to another node instead of being sent to the "local" sink. Network I/O
> has a price.
>
> Do you have some sort of "topology" feature coming in the roadmap? Maybe a
> listener on the JobManager / env that would be triggered, asking us on which
> node we would prefer each task to be deployed. That way you keep the standard
> behavior, don't have to write a complicated, generically optimized algorithm,
> and let the user make their own choices. Should I create a JIRA?
>
> For the time being we could start the application 4 times, once per node,
> but that's not pretty at all :)
>
> B.R.
>
> From: Till Rohrmann [mailto:[email protected]]
> Sent: Wednesday, 3 February 2016 17:58
> To: [email protected]
> Subject: Re: Distribution of sinks among the nodes
>
> Hi Gwenhäel,
>
> if you set the number of slots for each TaskManager to 4, then all of your
> mappers will be evenly spread out. The sources should also be evenly spread
> out. However, for the sinks, since they depend on all mappers, it will most
> likely be random where they are deployed. So you might end up with 4 sink
> tasks on one machine.
>
> Cheers,
> Till
>
>
> On Wed, Feb 3, 2016 at 4:31 PM, Gwenhael Pasquiers <[email protected]> wrote:
>
> It is one type of mapper with a parallelism of 16.
> It's the same for the sinks and sources (parallelism of 4).
>
> The settings are:
>
> env.setParallelism(4)
> mapper.setParallelism(env.getParallelism() * 4)
>
> We mean to have X mapper tasks per source / sink.
>
> The mapper is doing some heavy computation, and we have only 4 Kafka
> partitions. That's why we need more mappers than sources / sinks.
>
>
> -----Original Message-----
> From: Aljoscha Krettek [mailto:[email protected]]
> Sent: Wednesday, 3 February 2016 16:26
> To: [email protected]
> Subject: Re: Distribution of sinks among the nodes
>
> Hi Gwenhäel,
>
> when you say 16 maps, are we talking about one mapper with parallelism 16, or
> 16 unique map operators?
>
> Regards,
> Aljoscha
>
> > On 03 Feb 2016, at 15:48, Gwenhael Pasquiers <[email protected]> wrote:
> >
> > Hi,
> >
> > We are trying to deploy an application with the following "architecture":
> >
> > 4 Kafka sources => 16 maps => 4 Kafka sinks, on 4 nodes, with 24 slots (we
> > disabled operator chaining).
> >
> > So we'd like, on each node:
> >
> > 1x source => 4x map => 1x sink
> >
> > That way there are no exchanges between different instances of Flink, and
> > performance would be optimal.
> >
> > But we get (according to the Flink GUI and the Host column when looking at
> > the details of each task):
> >
> > Node 1 : 1 source => 2 maps
> > Node 2 : 1 source => 1 map
> > Node 3 : 1 source => 1 map
> > Node 4 : 1 source => 12 maps => 4 sinks
> >
> > (I think no comments are needed :))
> >
> > The Web UI says that there are 24 slots and they are all used, but they
> > don't seem evenly dispatched...
> >
> > How could we make Flink deploy the tasks the way we want?
> >
> > B.R.
> >
> > Gwen'
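
For reference, a minimal sketch of the workaround Gwenhael describes above (chaining disabled, sinks at the same parallelism as the mappers, so that each mapper forwards to its "own" sink). kafkaSource and kafkaSink are placeholders for the actual Kafka connector instances, and the identity MapFunction stands in for the heavy computation; this is untested and not tied to a particular Flink version:

    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class SinkDistributionSketch {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            // One thread per operator: the mappers do long-running work,
            // so they should not be chained into a single task.
            env.disableOperatorChaining();

            // 4 Kafka partitions -> default parallelism 4 for the sources.
            env.setParallelism(4);

            env.addSource(kafkaSource)                        // placeholder source, parallelism 4
               .map(new MapFunction<String, String>() {       // stand-in for the heavy computation
                   @Override
                   public String map(String value) throws Exception {
                       return value;
                   }
               }).setParallelism(16)
               // Same parallelism as the mapper: the map -> sink edge then uses the
               // default forward partitioner, so each mapper feeds one sink subtask.
               .addSink(kafkaSink).setParallelism(16);        // placeholder sink

            env.execute("sink-distribution-sketch");
        }
    }

Note that this only removes the shuffle on the map -> sink edge; the source (4) -> map (16) edge still rebalances over the network, which is the part the proposed partitionFan() addition (FLINK-3336) is meant to make local.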

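Similarly, the per-TaskManager slot setting Till refers to goes into flink-conf.yaml on each machine (see the configuration page Stephan linked above). The key is the standard one; the value below is specific to the 4-node setup discussed in this thread:

    # flink-conf.yaml on each of the 4 TaskManagers:
    # 4 slots per TaskManager -> 16 slots in total, matching the
    # highest parallelism in the job (the 16 mappers).
    taskmanager.numberOfTaskSlots: 4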