I added a new Ticket: https://issues.apache.org/jira/browse/FLINK-3336

This will implement the data shipping pattern that you mentioned in your 
initial mail. I also have the Pull request almost ready.

> On 04 Feb 2016, at 16:25, Gwenhael Pasquiers 
> <[email protected]> wrote:
> 
> Okay ;
>  
> Then I guess that the best we can do is to disable chaining (we really want 
> one thread per operator since they are doing long operations) and have the 
> same parallelism for sinks as mapping : that way each map will have it’s own 
> sink and there will be no exchanges between flink instances.
>  
> From: [email protected] [mailto:[email protected]] On Behalf Of 
> Stephan Ewen
> Sent: jeudi 4 février 2016 15:13
> To: [email protected]
> Subject: Re: Distribution of sinks among the nodes
>  
> To your other question, there are two things in Flink:
>  
> (1) Chaining. Tasks are folded together into one task, run by one thread.
>  
> (2) Resource groups: Tasks stay separate, have separate threads, but share a 
> slot (which means share memory resources). See the link in my previous mail 
> for an explanation concerning those.
>  
> Greetings,
> Stephan
>  
>  
> On Thu, Feb 4, 2016 at 3:10 PM, Stephan Ewen <[email protected]> wrote:
> Hi Gwen!
>  
> You actually need not 24 slots, but only as many as the highest parallelism 
> is (16). Slots do not hold individual tasks, but "pipelines". 
>  
> Here is an illustration how that works.
> https://ci.apache.org/projects/flink/flink-docs-release-0.10/setup/config.html#configuring-taskmanager-processing-slots
>  
> You can control whether a task can share the slot with the previous task with 
> the function "startNewResourceGroup()" in the streaming API. Sharing lots 
> makes a few things easier to reason about, especially when adding operators 
> to a program, you need not immediately add new machines.
>  
>  
> How to solve your program case
> --------------------------------------------
>  
> We can actually make a pretty simple addition to Flink that will cause the 
> tasks to be locally connected, which in turn will cause the scheduler to 
> distribute them like you intend.
> Rather than let the 4 sources rebalance across all 16 mappers, each one 
> should redistribute to 4 local mappers, and these 4 mappers should send data 
> to one local sink each.
>  
> We'll try and add that today and ping you once it is in.
>  
> The following would be sample code to use this:
>  
> env.setParallelism(4);
>  
> env
>     .addSource(kafkaSource)
>     .partitionFan()
>     .map(mapper).setParallelism(16);
>     .partitionFan()
>     .addSink(kafkaSink);
>  
>  
>  
> A bit of background why the mechanism is the way that it is right now
> ----------------------------------------------------------------------------------------------
>  
> You can think of a slot as a slice of resources. In particular, an amount of 
> memory from the memory manager, but also memory in the network stack.
>  
> What we want to do quite soon is to make streaming programs more elastic. 
> Consider for example the case that you have 16 slots on 4 machines, a machine 
> fails, and you have no spare resources. In that case Flink should recognize 
> that no spare resource can be acquired, and scale the job in. Since you have 
> only 12 slots left, the parallelism of the mappers is reduced to 12, and the 
> source task that was on the failed machine is moved to a slot on another 
> machine.
>  
> It is important that the guaranteed resources for each task do not change 
> when scaling in, to keep behavior predictable. In this case, each slot will 
> still at most host 1 source, 1 mapper, and 1 sink, as did some of the slots 
> before. That is also the reason why the slots are per TaskManager, and not 
> global, to associate them with a constant set of resources (mainly memory).
>  
>  
> Greetings,
> Stephan
>  
>  
>  
> On Thu, Feb 4, 2016 at 9:54 AM, Gwenhael Pasquiers 
> <[email protected]> wrote:
> Don’t we need to set the number of slots to 24 (4 sources + 16 mappers + 4 
> sinks) ?
>  
> Or is there a way not to set the number of slots per TaskManager instead of 
> globally so that they are at least equally dispatched among the nodes ?
>  
> As for the sink deployment : that’s not good news ; I mean we will have a 
> non-negligible overhead : all the data generated by 3 of the 4 nodes will be 
> sent to a third node instead of being sent to the “local” sink. Network I/O 
> have a price.
>  
> Do you have some sort of “topology” feature coming in the roadmap ? Maybe a 
> listener on the JobManager / env that would be trigerred, asking usk on which 
> node we would prefer each node to be deployed. That way you keep the standard 
> behavior, don’t have to make a complicated generic-optimized algorithm, and 
> let the user make it’s choices. Should I create a JIRA ?
>  
> For the time being we could start the application 4 time : one time per node, 
> put that’s not pretty at all J
>  
> B.R.
>  
> From: Till Rohrmann [mailto:[email protected]] 
> Sent: mercredi 3 février 2016 17:58
> 
> To: [email protected]
> Subject: Re: Distribution of sinks among the nodes
>  
> Hi Gwenhäel,
> 
> if you set the number of slots for each TaskManager to 4, then all of your 
> mapper will be evenly spread out. The sources should also be evenly spread 
> out. However, for the sinks since they depend on all mappers, it will be most 
> likely random where they are deployed. So you might end up with 4 sink tasks 
> on one machine.
> 
> Cheers,
> Till
> 
> ​
>  
> On Wed, Feb 3, 2016 at 4:31 PM, Gwenhael Pasquiers 
> <[email protected]> wrote:
> It is one type of mapper with a parallelism of 16
> It's the same for the sinks and sources (parallelism of 4)
> 
> The settings are
> Env.setParallelism(4)
> Mapper.setPrallelism(env.getParallelism() * 4)
> 
> We mean to have X mapper tasks per source / sink
> 
> The mapper is doing some heavy computation and we have only 4 kafka 
> partitions. That's why we need more mappers than sources / sinks
> 
> 
> -----Original Message-----
> From: Aljoscha Krettek [mailto:[email protected]]
> Sent: mercredi 3 février 2016 16:26
> To: [email protected]
> Subject: Re: Distribution of sinks among the nodes
> 
> Hi Gwenhäel,
> when you say 16 maps, are we talking about one mapper with parallelism 16 or 
> 16 unique map operators?
> 
> Regards,
> Aljoscha
> > On 03 Feb 2016, at 15:48, Gwenhael Pasquiers 
> > <[email protected]> wrote:
> >
> > Hi,
> >
> > We try to deploy an application with the following “architecture” :
> >
> > 4 kafka sources => 16 maps => 4 kafka sinks, on 4 nodes, with 24 slots (we 
> > disabled operator chaining).
> >
> > So we’d like on each node :
> > 1x source => 4x map => 1x sink
> >
> > That way there are no exchanges between different instances of flink and 
> > performances would be optimal.
> >
> > But we get (according to the flink GUI and the Host column when looking at 
> > the details of each task) :
> >
> > Node 1 : 1 source =>  2 map
> > Node 2 : 1 source =>  1 map
> > Node 3 : 1 source =>  1 map
> > Node 4 : 1 source =>  12 maps => 4 sinks
> >
> > (I think no comments are needed J)
> >
> > The the Web UI says that there are 24 slots and they are all used but they 
> > don’t seem evenly dispatched …
> >
> > How could we make Flink deploy the tasks the way we want ?
> >
> > B.R.
> >
> > Gwen’
> 

Reply via email to