Thanks, one more thing to expect from the next version!

-----Original Message-----
From: Aljoscha Krettek [mailto:aljos...@apache.org]
Sent: Monday 8 February 2016 13:18
To: user@flink.apache.org
Subject: Re: Distribution of sinks among the nodes

Hi,
I just merged the new feature, so once this makes it into the 1.0-SNAPSHOT builds you should be able to use:

  env.setParallelism(4);

  env
    .addSource(kafkaSource)
    .rescale()
    .map(mapper).setParallelism(16)
    .rescale()
    .addSink(kafkaSink);

to get your desired behavior. For this to work, the parallelism should be set to 16, with 4 nodes. Then each node will have one source, 4 mappers and 1 sink. The source will only be connected to the 4 mappers, while the 4 mappers will be the only ones connected to the sink.

Cheers,
Aljoscha
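[A minimal, self-contained version of Aljoscha's snippet above; a sketch assuming a 1.0-SNAPSHOT build with FLINK-3336 merged. The Kafka 0.8 connector classes, broker address, topic names and the mapper body are illustrative assumptions, not from the original mail.]

  import org.apache.flink.api.common.functions.MapFunction;
  import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
  import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08;
  import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer08;
  import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

  import java.util.Properties;

  public class RescalePipeline {
    public static void main(String[] args) throws Exception {
      StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
      env.setParallelism(4); // default parallelism: 4 sources, 4 sinks

      Properties props = new Properties();
      props.setProperty("bootstrap.servers", "broker:9092"); // placeholder
      props.setProperty("zookeeper.connect", "zk:2181");     // placeholder
      props.setProperty("group.id", "rescale-demo");         // placeholder

      env.addSource(new FlinkKafkaConsumer08<>("in-topic", new SimpleStringSchema(), props))
          .rescale() // each source feeds only its 4 "local" mappers
          .map(new MapFunction<String, String>() {
            @Override
            public String map(String value) {
              return value.toUpperCase(); // stand-in for the heavy computation
            }
          }).setParallelism(16)
          .rescale() // each group of 4 mappers feeds 1 "local" sink
          .addSink(new FlinkKafkaProducer08<>("broker:9092", "out-topic", new SimpleStringSchema()));

      env.execute("rescale example");
    }
  }

With 16 slots available (4 per TaskManager), this should place one source, four mappers and one sink per node, with only node-local channels between them.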
> On 04 Feb 2016, at 18:29, Aljoscha Krettek <aljos...@apache.org> wrote:
>
> I added a new ticket: https://issues.apache.org/jira/browse/FLINK-3336
>
> This will implement the data shipping pattern that you mentioned in your initial mail. I also have the pull request almost ready.
>
>> On 04 Feb 2016, at 16:25, Gwenhael Pasquiers <gwenhael.pasqui...@ericsson.com> wrote:
>>
>> Okay;
>>
>> Then I guess that the best we can do is to disable chaining (we really want one thread per operator since they are doing long operations) and have the same parallelism for sinks as for mappers: that way each map will have its own sink and there will be no exchanges between Flink instances.
>>
>> From: ewenstep...@gmail.com [mailto:ewenstep...@gmail.com] On Behalf Of Stephan Ewen
>> Sent: Thursday 4 February 2016 15:13
>> To: user@flink.apache.org
>> Subject: Re: Distribution of sinks among the nodes
>>
>> To your other question, there are two things in Flink:
>>
>> (1) Chaining. Tasks are folded together into one task, run by one thread.
>>
>> (2) Resource groups: Tasks stay separate, have separate threads, but share a slot (which means they share memory resources). See the link in my previous mail for an explanation concerning those.
>>
>> Greetings,
>> Stephan
>>
>>
>> On Thu, Feb 4, 2016 at 3:10 PM, Stephan Ewen <se...@apache.org> wrote:
>> Hi Gwen!
>>
>> You actually need not 24 slots, but only as many as the highest parallelism (16). Slots do not hold individual tasks, but "pipelines".
>>
>> Here is an illustration of how that works:
>> https://ci.apache.org/projects/flink/flink-docs-release-0.10/setup/config.html#configuring-taskmanager-processing-slots
>>
>> You can control whether a task can share the slot with the previous task with the function "startNewResourceGroup()" in the streaming API. Sharing slots makes a few things easier to reason about; especially when adding operators to a program, you need not immediately add new machines.
>>
>>
>> How to solve your program case
>> --------------------------------------------
>>
>> We can actually make a pretty simple addition to Flink that will cause the tasks to be locally connected, which in turn will cause the scheduler to distribute them like you intend.
>> Rather than letting the 4 sources rebalance across all 16 mappers, each one should redistribute to 4 local mappers, and these 4 mappers should send data to one local sink each.
>>
>> We'll try and add that today and ping you once it is in.
>>
>> The following would be sample code to use this:
>>
>>   env.setParallelism(4);
>>
>>   env
>>     .addSource(kafkaSource)
>>     .partitionFan()
>>     .map(mapper).setParallelism(16)
>>     .partitionFan()
>>     .addSink(kafkaSink);
>>
>>
>> A bit of background on why the mechanism is the way it is right now
>> -----------------------------------------------------------------------------
>>
>> You can think of a slot as a slice of resources. In particular, an amount of memory from the memory manager, but also memory in the network stack.
>>
>> What we want to do quite soon is to make streaming programs more elastic. Consider for example the case that you have 16 slots on 4 machines, a machine fails, and you have no spare resources. In that case Flink should recognize that no spare resource can be acquired, and scale the job in. Since you have only 12 slots left, the parallelism of the mappers is reduced to 12, and the source task that was on the failed machine is moved to a slot on another machine.
>>
>> It is important that the guaranteed resources for each task do not change when scaling in, to keep behavior predictable. In this case, each slot will still host at most 1 source, 1 mapper, and 1 sink, as did some of the slots before. That is also the reason why the slots are per TaskManager, and not global: to associate them with a constant set of resources (mainly memory).
>>
>>
>> Greetings,
>> Stephan
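[A short sketch contrasting the two mechanisms Stephan distinguishes, assuming the 1.0-era DataStream API: disableChaining() controls (1), while slot sharing groups control (2). slotSharingGroup(...) is used here in place of the startNewResourceGroup() call mentioned above; that substitution is an assumption about the exact API revision, so treat it as illustrative.]

  import org.apache.flink.api.common.functions.MapFunction;
  import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

  public class ChainingVsSlotSharing {
    public static void main(String[] args) throws Exception {
      StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

      env.generateSequence(0, 1_000_000)
          // (1) chaining: give this mapper its own thread; it may still
          //     share a slot (and thus memory resources) with the source
          .map(new MapFunction<Long, Long>() {
            @Override
            public Long map(Long v) { return v * 2; }
          }).disableChaining()
          // (2) resource groups / slot sharing: additionally move the rest
          //     of the pipeline into its own slot group, so it no longer
          //     shares slot resources with the operators above
          .map(new MapFunction<Long, Long>() {
            @Override
            public Long map(Long v) { return v + 1; }
          }).slotSharingGroup("heavy")
          .print();

      env.execute("chaining vs slot sharing");
    }
  }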
>> On Thu, Feb 4, 2016 at 9:54 AM, Gwenhael Pasquiers <gwenhael.pasqui...@ericsson.com> wrote:
>> Don’t we need to set the number of slots to 24 (4 sources + 16 mappers + 4 sinks)?
>>
>> Or is there a way to set the number of slots per TaskManager instead of globally, so that they are at least equally dispatched among the nodes?
>>
>> As for the sink deployment: that’s not good news; I mean we will have a non-negligible overhead: all the data generated by 3 of the 4 nodes will be sent to a third node instead of being sent to the “local” sink. Network I/O has a price.
>>
>> Do you have some sort of “topology” feature coming in the roadmap? Maybe a listener on the JobManager / env that would be triggered, asking us on which node we would prefer each task to be deployed. That way you keep the standard behavior, don’t have to write a complicated generic optimization algorithm, and let the user make his own choices. Should I create a JIRA?
>>
>> For the time being we could start the application 4 times: one time per node, but that’s not pretty at all :)
>>
>> B.R.
>>
>> From: Till Rohrmann [mailto:trohrm...@apache.org]
>> Sent: Wednesday 3 February 2016 17:58
>> To: user@flink.apache.org
>> Subject: Re: Distribution of sinks among the nodes
>>
>> Hi Gwenhäel,
>>
>> if you set the number of slots for each TaskManager to 4, then all of your mappers will be evenly spread out. The sources should also be evenly spread out. However, for the sinks, since they depend on all mappers, it will most likely be random where they are deployed. So you might end up with 4 sink tasks on one machine.
>>
>> Cheers,
>> Till
>>
>>
>> On Wed, Feb 3, 2016 at 4:31 PM, Gwenhael Pasquiers <gwenhael.pasqui...@ericsson.com> wrote:
>> It is one type of mapper with a parallelism of 16. It’s the same for the sinks and sources (parallelism of 4).
>>
>> The settings are:
>>   env.setParallelism(4);
>>   mapper.setParallelism(env.getParallelism() * 4);
>>
>> We mean to have X mapper tasks per source / sink.
>>
>> The mapper is doing some heavy computation and we have only 4 Kafka partitions. That’s why we need more mappers than sources / sinks.
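[Gwenhael's settings, expanded into a sketch; kafkaSource, mapper and kafkaSink are placeholders standing for the actual Kafka connectors and the heavy map function, and disableOperatorChaining() reflects the "one thread per operator" requirement mentioned earlier in the thread.]

  StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  env.setParallelism(4);          // 4 Kafka partitions -> 4 sources and 4 sinks
  env.disableOperatorChaining();  // long-running operators: one thread each

  env.addSource(kafkaSource)                                 // parallelism 4 (inherited)
      .map(mapper).setParallelism(env.getParallelism() * 4)  // 16 mappers
      .addSink(kafkaSink);                                   // parallelism 4 (inherited)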
>> -----Original Message-----
>> From: Aljoscha Krettek [mailto:aljos...@apache.org]
>> Sent: Wednesday 3 February 2016 16:26
>> To: user@flink.apache.org
>> Subject: Re: Distribution of sinks among the nodes
>>
>> Hi Gwenhäel,
>> when you say 16 maps, are we talking about one mapper with parallelism 16 or 16 unique map operators?
>>
>> Regards,
>> Aljoscha
>>> On 03 Feb 2016, at 15:48, Gwenhael Pasquiers <gwenhael.pasqui...@ericsson.com> wrote:
>>>
>>> Hi,
>>>
>>> We are trying to deploy an application with the following “architecture”:
>>>
>>> 4 Kafka sources => 16 maps => 4 Kafka sinks, on 4 nodes, with 24 slots (we disabled operator chaining).
>>>
>>> So we’d like, on each node:
>>> 1x source => 4x map => 1x sink
>>>
>>> That way there are no exchanges between different instances of Flink and performance would be optimal.
>>>
>>> But we get (according to the Flink GUI and the Host column when looking at the details of each task):
>>>
>>> Node 1: 1 source => 2 maps
>>> Node 2: 1 source => 1 map
>>> Node 3: 1 source => 1 map
>>> Node 4: 1 source => 12 maps => 4 sinks
>>>
>>> (I think no comments are needed :))
>>>
>>> The Web UI says that there are 24 slots and they are all used, but they don’t seem evenly dispatched…
>>>
>>> How could we make Flink deploy the tasks the way we want?
>>>
>>> B.R.
>>>
>>> Gwen’
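[Following Till's suggestion above, the corresponding deployment setting would be, in flink-conf.yaml on each of the 4 nodes:]

  taskmanager.numberOfTaskSlots: 4

[This gives 4 x 4 = 16 slots in total. As Stephan explains above, that is all that is needed: slots hold pipelines, so the required count equals the highest operator parallelism (16), not the sum 4 + 16 + 4 = 24.]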