Hi Stephan,

Here’s a link to the screenshot I tried to attach earlier:
https://drive.google.com/open?id=0B0_jTR8-IvUcMEdjWGFmYXJYS28

It looks to me like the distribution is fairly skewed across the nodes,
even though they’re executing the same pipeline.

Thanks,
Ali

On 2015-12-09, 12:36 PM, "Stephan Ewen" <se...@apache.org> wrote:

>Hi!
>
>The parallel socket source looks good.
>I think you forgot to attach the screenshot, or the mailing list dropped
>the attachment...
>
>Not sure if I can diagnose that without more details. The sources all do
>the same. Assuming that the server distributes the data evenly across all
>connected sockets, and that the network bandwidth ends up being divided
>in a fair way, all pipelines should be similarly "eager".
>
>Greetings,
>Stephan
>
>
>On Wed, Dec 9, 2015 at 5:22 PM, Kashmar, Ali <ali.kash...@emc.com> wrote:
>
>> Hi Stephan,
>>
>> That was my original understanding, until I realized that I was not
>> using a parallel socket source. I had a custom source that extended
>> SourceFunction, which always runs with parallelism = 1. I looked through
>> the API and found the ParallelSourceFunction interface, so I implemented
>> that and, voila, now all 3 nodes in the cluster are actually receiving
>> traffic on socket connections.
>>
>> Now that I’m running it successfully end to end, I’m trying to improve
>> the performance. Can you take a look at the attached screenshot and tell
>> me if the distribution of work amongst the pipelines is normal? I feel
>> like some pipelines are a lot lazier than others, even though the
>> cluster nodes are exactly the same.
>>
>> By the way, here’s the class I wrote. It would be useful to have this
>> available in the Flink distro:
>>
>> import java.io.BufferedReader;
>> import java.io.IOException;
>> import java.io.InputStreamReader;
>> import java.net.Socket;
>>
>> import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;
>> import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext;
>> import org.slf4j.Logger;
>> import org.slf4j.LoggerFactory;
>>
>> public class ParallelSocketSource implements ParallelSourceFunction<String> {
>>
>>     private static final long serialVersionUID = -271094428915640892L;
>>     private static final Logger LOG = LoggerFactory.getLogger(ParallelSocketSource.class);
>>
>>     private volatile boolean running = true;
>>     private String host;
>>     private int port;
>>
>>     public ParallelSocketSource(String host, int port) {
>>         this.host = host;
>>         this.port = port;
>>     }
>>
>>     @Override
>>     public void run(SourceContext<String> ctx) throws Exception {
>>         try (Socket socket = new Socket(host, port);
>>              BufferedReader reader = new BufferedReader(new InputStreamReader(socket.getInputStream()))) {
>>             String line = null;
>>             while (running && ((line = reader.readLine()) != null)) {
>>                 ctx.collect(line);
>>             }
>>         } catch (IOException ex) {
>>             LOG.error("error reading from socket", ex);
>>         }
>>     }
>>
>>     @Override
>>     public void cancel() {
>>         running = false;
>>     }
>> }
>>
>> Regards,
>> Ali
>>
>>
>> On 2015-12-08, 3:35 PM, "Stephan Ewen" <se...@apache.org> wrote:
>>
>> >Hi Ali!
>> >
>> >In the case you have, the sequence of source-map-filter ... forms a
>> >pipeline.
>> >
>> >You mentioned that you set the parallelism to 16, so there should be 16
>> >pipelines. These pipelines should be completely independent.
>> >
>> >Looking at the way the scheduler is implemented, independent pipelines
>> >should be spread across machines. But when you execute that in
>> >parallel, you say all 16 pipelines end up on the same machine?
>> >
>> >Can you share with us the rough code of your program? Or a screenshot
>> >from the runtime dashboard that shows the program graph?
>> >
>> >
>> >If your cluster is basically for that one job only, you could try and
>> >set the number of slots to 4 for each machine. Then you have 16 slots
>> >in total and each slot would run one of the 16 pipelines.
>> >
>> >
>> >Greetings,
>> >Stephan
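For reference, here is a minimal sketch of how the ParallelSocketSource above
might be wired into a job. It is only an illustration: the driver class name,
host, and port are placeholders, not taken from the thread.

    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class ParallelSocketJob {

        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            // With parallelism 16, the source runs 16 subtasks, each opening
            // its own socket connection to the server.
            env.setParallelism(16);
            DataStream<String> lines = env.addSource(new ParallelSocketSource("collector-host", 9999));
            lines.print();
            env.execute("Parallel socket source test");
        }
    }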
>> >
>> >On Wed, Dec 2, 2015 at 4:06 PM, Kashmar, Ali <ali.kash...@emc.com> wrote:
>> >
>> >> There is no shuffle operation in my flow. Mine actually looks like this:
>> >>
>> >> Source: Custom Source -> Flat Map -> (Filter -> Flat Map -> Map -> Map ->
>> >> Map, Filter)
>> >>
>> >> Maybe it’s treating this whole flow as one pipeline and assigning it to
>> >> a slot. What I really wanted was for the custom source I built to have
>> >> running instances on all nodes. I’m not really sure if that’s the right
>> >> approach, but if we could add this as a feature that’d be great, since
>> >> having more than one node running the same pipeline guarantees the
>> >> pipeline is never offline.
>> >>
>> >> -Ali
>> >>
>> >> On 2015-12-02, 4:39 AM, "Till Rohrmann" <trohrm...@apache.org> wrote:
>> >>
>> >> >If I'm not mistaken, the scheduler already has a preference to spread
>> >> >independent pipelines out across the cluster. At least it uses a queue
>> >> >of instances from which it pops the first element when it allocates a
>> >> >new slot. This instance is then appended to the queue again if it still
>> >> >has some resources (slots) left.
>> >> >
>> >> >I would assume that you have a shuffle operation involved in your job,
>> >> >such that it makes sense for the scheduler to deploy all pipelines to
>> >> >the same machine.
>> >> >
>> >> >Cheers,
>> >> >Till
>> >> >On Dec 1, 2015 4:01 PM, "Stephan Ewen" <se...@apache.org> wrote:
>> >> >
>> >> >> Slots are like "resource groups" which execute entire pipelines. They
>> >> >> frequently have more than one operator.
>> >> >>
>> >> >> What you can try as a workaround is to decrease the number of slots
>> >> >> per machine to cause the operators to be spread across more machines.
>> >> >>
>> >> >> If this is a crucial issue for your use case, it should be simple to
>> >> >> add a "preference to spread out" to the scheduler...
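As an aside, a minimal sketch of the slot workaround suggested above, assuming
a standalone setup with 4 task managers (4 slots x 4 machines gives the 16
slots mentioned); the key is the standard Flink setting, the value is the
suggestion from the thread:

    # conf/flink-conf.yaml on every task manager
    taskmanager.numberOfTaskSlots: 4

With 16 slots in total, a job submitted with parallelism 16 fills every slot,
so each machine ends up running 4 of the 16 independent pipelines.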
>> >> >>
>> >> >> On Tue, Dec 1, 2015 at 3:26 PM, Kashmar, Ali <ali.kash...@emc.com> wrote:
>> >> >>
>> >> >> > Is there a way to make a task cluster-parallelizable, i.e., to make
>> >> >> > sure the parallel instances of the task are distributed across the
>> >> >> > cluster? When I run my Flink job with a parallelism of 16, all the
>> >> >> > parallel tasks are assigned to the first task manager.
>> >> >> >
>> >> >> > - Ali
>> >> >> >
>> >> >> > On 2015-11-30, 2:18 PM, "Ufuk Celebi" <u...@apache.org> wrote:
>> >> >> >
>> >> >> > >
>> >> >> > >> On 30 Nov 2015, at 17:47, Kashmar, Ali <ali.kash...@emc.com> wrote:
>> >> >> > >> Do the parallel instances of each task get distributed across the
>> >> >> > >> cluster or is it possible that they all run on the same node?
>> >> >> > >
>> >> >> > >Yes, slots are requested from all nodes of the cluster. But keep in
>> >> >> > >mind that multiple tasks (forming a local pipeline) can be scheduled
>> >> >> > >to the same slot (1 slot can hold many tasks).
>> >> >> > >
>> >> >> > >Have you seen this?
>> >> >> > >
>> >> >> > >https://ci.apache.org/projects/flink/flink-docs-release-0.10/internals/job_scheduling.html
>> >> >> > >
>> >> >> > >> If they can all run on the same node, what happens when that node
>> >> >> > >> crashes? Does the job manager recreate them using the remaining
>> >> >> > >> open slots?
>> >> >> > >
>> >> >> > >What happens: The job manager tries to restart the program with the
>> >> >> > >same parallelism. Thus, if you have enough free slots available in
>> >> >> > >your cluster, this works smoothly (so yes, the remaining/available
>> >> >> > >slots are used).
>> >> >> > >
>> >> >> > >With a YARN cluster the task manager containers are restarted
>> >> >> > >automatically. In standalone mode, you have to take care of this
>> >> >> > >yourself.
>> >> >> > >
>> >> >> > >Does this help?
>> >> >> > >
>> >> >> > >Ufuk
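A hedged sketch of the client-side settings that pair with this recovery
behaviour, assuming the 0.10-era streaming API (the checkpoint interval and
retry count below are made-up values, not from the thread):

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    // Periodic checkpoints let the restarted pipelines resume from a consistent state.
    env.enableCheckpointing(5000);
    // Give the job manager a retry budget so it can redeploy the job into the remaining slots.
    env.setNumberOfExecutionRetries(3);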