Hi Stephan,

That was my original understanding, until I realized that I was not using a parallel socket source. I had a custom source that extended SourceFunction, which always runs with parallelism = 1. I looked through the API, found the ParallelSourceFunction interface, and implemented that, and voilà: now all 3 nodes in the cluster are actually receiving traffic on socket connections.
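For reference, here is a rough sketch of how a source like this gets wired into a streaming job (the class itself is further down); the host, port, and parallelism values are placeholders rather than my actual settings:

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ParallelSocketJob {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // With a ParallelSourceFunction, each parallel subtask opens its own
        // socket connection, so every task manager can receive traffic.
        DataStream<String> lines = env
                .addSource(new ParallelSocketSource("data-host", 9999)) // placeholder host/port
                .setParallelism(3);

        lines.print();
        env.execute("Parallel socket source example");
    }
}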
Now that I’m running it successfully end to end, I’m trying to improve the performance. Can you take a look at the attached screenshot and tell me if the distribution of work amongst the pipelines is normal? I feel like some pipelines are a lot lazier than others, even though the cluster nodes are exactly the same.

By the way, here’s the class I wrote. It would be useful to have this available in the Flink distro:

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.Socket;

import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ParallelSocketSource implements ParallelSourceFunction<String> {

    private static final long serialVersionUID = -271094428915640892L;
    private static final Logger LOG = LoggerFactory.getLogger(ParallelSocketSource.class);

    private volatile boolean running = true;
    private String host;
    private int port;

    public ParallelSocketSource(String host, int port) {
        this.host = host;
        this.port = port;
    }

    @Override
    public void run(SourceContext<String> ctx) throws Exception {
        // Each parallel subtask opens its own connection and emits lines as records.
        try (Socket socket = new Socket(host, port);
             BufferedReader reader = new BufferedReader(new InputStreamReader(socket.getInputStream()))) {
            String line = null;
            while (running && ((line = reader.readLine()) != null)) {
                ctx.collect(line);
            }
        } catch (IOException ex) {
            LOG.error("error reading from socket", ex);
        }
    }

    @Override
    public void cancel() {
        running = false;
    }
}

Regards,
Ali

On 2015-12-08, 3:35 PM, "Stephan Ewen" <se...@apache.org> wrote:

>Hi Ali!
>
>In the case you have, the sequence of source-map-filter ... forms a
>pipeline.
>
>You mentioned that you set the parallelism to 16, so there should be 16
>pipelines. These pipelines should be completely independent.
>
>Looking at the way the scheduler is implemented, independent pipelines
>should be spread across machines. But when you execute that in parallel,
>you say all 16 pipelines end up on the same machine?
>
>Can you share with us the rough code of your program? Or a screenshot from
>the runtime dashboard that shows the program graph?
>
>If your cluster is basically for that one job only, you could try and set
>the number of slots to 4 for each machine. Then you have 16 slots in total
>and each node would run one of the 16 pipelines.
>
>Greetings,
>Stephan
>
>On Wed, Dec 2, 2015 at 4:06 PM, Kashmar, Ali <ali.kash...@emc.com> wrote:
>
>> There is no shuffle operation in my flow. Mine actually looks like this:
>>
>> Source: Custom Source -> Flat Map -> (Filter -> Flat Map -> Map -> Map ->
>> Map, Filter)
>>
>> Maybe it’s treating this whole flow as one pipeline and assigning it to a
>> slot. What I really wanted was to have the custom source I built to have
>> running instances on all nodes. I’m not really sure if that’s the right
>> approach, but if we could add this as a feature that’d be great, since
>> having more than one node running the same pipeline guarantees the
>> pipeline is never offline.
>>
>> -Ali
>>
>> On 2015-12-02, 4:39 AM, "Till Rohrmann" <trohrm...@apache.org> wrote:
>>
>> >If I'm not mistaken, then the scheduler already has a preference to spread
>> >independent pipelines out across the cluster. At least it uses a queue of
>> >instances from which it pops the first element if it allocates a new slot.
>> >This instance is then appended to the queue again, if it has some
>> >resources (slots) left.
>> >
>> >I would assume that you have a shuffle operation involved in your job such
>> >that it makes sense for the scheduler to deploy all pipelines to the same
>> >machine.
>> >
>> >Cheers,
>> >Till
>> >
>> >On Dec 1, 2015 4:01 PM, "Stephan Ewen" <se...@apache.org> wrote:
>> >
>> >> Slots are like "resource groups" which execute entire pipelines. They
>> >> frequently have more than one operator.
>> >>
>> >> What you can try as a workaround is decrease the number of slots per
>> >> machine to cause the operators to be spread across more machines.
>> >>
>> >> If this is a crucial issue for your use case, it should be simple to add a
>> >> "preference to spread out" to the scheduler...
>> >>
>> >> On Tue, Dec 1, 2015 at 3:26 PM, Kashmar, Ali <ali.kash...@emc.com> wrote:
>> >>
>> >> > Is there a way to make a task cluster-parallelizable? I.e. make sure the
>> >> > parallel instances of the task are distributed across the cluster. When I
>> >> > run my Flink job with a parallelism of 16, all the parallel tasks are
>> >> > assigned to the first task manager.
>> >> >
>> >> > - Ali
>> >> >
>> >> > On 2015-11-30, 2:18 PM, "Ufuk Celebi" <u...@apache.org> wrote:
>> >> >
>> >> > >> On 30 Nov 2015, at 17:47, Kashmar, Ali <ali.kash...@emc.com> wrote:
>> >> > >> Do the parallel instances of each task get distributed across the
>> >> > >> cluster or is it possible that they all run on the same node?
>> >> > >
>> >> > >Yes, slots are requested from all nodes of the cluster. But keep in mind
>> >> > >that multiple tasks (forming a local pipeline) can be scheduled to the
>> >> > >same slot (1 slot can hold many tasks).
>> >> > >
>> >> > >Have you seen this?
>> >> > >
>> >> > >https://ci.apache.org/projects/flink/flink-docs-release-0.10/internals/job_scheduling.html
>> >> > >
>> >> > >> If they can all run on the same node, what happens when that node
>> >> > >> crashes? Does the job manager recreate them using the remaining open
>> >> > >> slots?
>> >> > >
>> >> > >What happens: The job manager tries to restart the program with the same
>> >> > >parallelism. Thus if you have enough free slots available in your
>> >> > >cluster, this works smoothly (so yes, the remaining/available slots are
>> >> > >used).
>> >> > >
>> >> > >With a YARN cluster the task manager containers are restarted
>> >> > >automatically. In standalone mode, you have to take care of this yourself.
>> >> > >
>> >> > >Does this help?
>> >> > >
>> >> > >Ufuk
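For anyone reading the archive later: the slot count Stephan refers to is controlled by taskmanager.numberOfTaskSlots in conf/flink-conf.yaml. A minimal sketch, assuming a standalone cluster with one task manager per machine (Stephan's 16-slot example assumes 4 machines x 4 slots):

# conf/flink-conf.yaml on each task manager (standalone mode)
# 4 slots per machine; with 4 machines that gives 16 slots in total,
# so each of the 16 independent pipelines can land in its own slot.
taskmanager.numberOfTaskSlots: 4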