Hi Ali! It seems the Google Doc has restricted access; it tells me I have no permission to view it...
Stephan

On Wed, Dec 9, 2015 at 8:49 PM, Kashmar, Ali <ali.kash...@emc.com> wrote:

> Hi Stephan,
>
> Here’s a link to the screenshot I tried to attach earlier:
>
> https://drive.google.com/open?id=0B0_jTR8-IvUcMEdjWGFmYXJYS28
>
> It looks to me like the distribution is fairly skewed across the nodes,
> even though they’re executing the same pipeline.
>
> Thanks,
> Ali
>
> On 2015-12-09, 12:36 PM, "Stephan Ewen" <se...@apache.org> wrote:
>
>> Hi!
>>
>> The parallel socket source looks good.
>> I think you forgot to attach the screenshot, or the mailing list dropped
>> the attachment...
>>
>> Not sure if I can diagnose that without more details. The sources all do
>> the same. Assuming that the server distributes the data evenly across
>> all connected sockets, and that the network bandwidth ends up being
>> divided in a fair way, all pipelines should be similarly "eager".
>>
>> Greetings,
>> Stephan
>>
>> On Wed, Dec 9, 2015 at 5:22 PM, Kashmar, Ali <ali.kash...@emc.com> wrote:
>>
>>> Hi Stephan,
>>>
>>> That was my original understanding, until I realized that I was not
>>> using a parallel socket source. I had a custom source that extended
>>> SourceFunction, which always runs with parallelism = 1. I looked
>>> through the API and found the ParallelSourceFunction interface, so I
>>> implemented that and voila, now all 3 nodes in the cluster are actually
>>> receiving traffic on socket connections.
>>>
>>> Now that I’m running it successfully end to end, I’m trying to improve
>>> the performance. Can you take a look at the attached screenshot and
>>> tell me if the distribution of work amongst the pipelines is normal? I
>>> feel like some pipelines are a lot lazier than others, even though the
>>> cluster nodes are exactly the same.
>>>
>>> By the way, here’s the class I wrote. It would be useful to have this
>>> available in the Flink distro:
>>>
>>> import java.io.BufferedReader;
>>> import java.io.IOException;
>>> import java.io.InputStreamReader;
>>> import java.net.Socket;
>>>
>>> import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;
>>> import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext;
>>> import org.slf4j.Logger;
>>> import org.slf4j.LoggerFactory;
>>>
>>> public class ParallelSocketSource implements ParallelSourceFunction<String> {
>>>
>>>     private static final long serialVersionUID = -271094428915640892L;
>>>     private static final Logger LOG =
>>>             LoggerFactory.getLogger(ParallelSocketSource.class);
>>>
>>>     private volatile boolean running = true;
>>>     private String host;
>>>     private int port;
>>>
>>>     public ParallelSocketSource(String host, int port) {
>>>         this.host = host;
>>>         this.port = port;
>>>     }
>>>
>>>     @Override
>>>     public void run(SourceContext<String> ctx) throws Exception {
>>>         // Each parallel instance opens its own connection and emits
>>>         // every line it reads from the socket.
>>>         try (Socket socket = new Socket(host, port);
>>>                 BufferedReader reader = new BufferedReader(
>>>                         new InputStreamReader(socket.getInputStream()))) {
>>>             String line = null;
>>>             while (running && ((line = reader.readLine()) != null)) {
>>>                 ctx.collect(line);
>>>             }
>>>         } catch (IOException ex) {
>>>             LOG.error("error reading from socket", ex);
>>>         }
>>>     }
>>>
>>>     @Override
>>>     public void cancel() {
>>>         running = false;
>>>     }
>>> }
>>>
>>> Regards,
>>> Ali
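(For anyone reading this in the archive: here is a minimal, hypothetical sketch of how such a source might be wired into a job, assuming the 0.10-era DataStream API. ParallelSocketJob, the host/port values, and the parallelism are placeholders, not details from this thread.)

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ParallelSocketJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();

        // Each parallel subtask of the source opens its own socket
        // connection to the server.
        DataStream<String> lines = env
                .addSource(new ParallelSocketSource("localhost", 9999))
                .setParallelism(4); // placeholder: e.g. one instance per node

        lines.print();
        env.execute("parallel socket source example");
    }
}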
>>>
>>> On 2015-12-08, 3:35 PM, "Stephan Ewen" <se...@apache.org> wrote:
>>>
>>>> Hi Ali!
>>>>
>>>> In the case you have, the sequence of source-map-filter ... forms a
>>>> pipeline.
>>>>
>>>> You mentioned that you set the parallelism to 16, so there should be
>>>> 16 pipelines. These pipelines should be completely independent.
>>>>
>>>> Looking at the way the scheduler is implemented, independent pipelines
>>>> should be spread across machines. But when you execute that in
>>>> parallel, you say all 16 pipelines end up on the same machine?
>>>>
>>>> Can you share with us the rough code of your program? Or a screenshot
>>>> from the runtime dashboard that shows the program graph?
>>>>
>>>> If your cluster is basically for that one job only, you could try and
>>>> set the number of slots to 4 for each machine. Then you have 16 slots
>>>> in total and each node would run one of the 16 pipelines.
>>>>
>>>> Greetings,
>>>> Stephan
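(The per-machine slot count Stephan refers to is set in each task manager's flink-conf.yaml. A sketch for a four-node standalone cluster, mirroring the suggestion above:)

# flink-conf.yaml on each of the four task managers:
# 4 slots per machine x 4 machines = 16 slots in total, so each of the
# 16 independent pipelines can get its own slot somewhere in the cluster.
taskmanager.numberOfTaskSlots: 4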
>>>>
>>>> On Wed, Dec 2, 2015 at 4:06 PM, Kashmar, Ali <ali.kash...@emc.com> wrote:
>>>>
>>>>> There is no shuffle operation in my flow. Mine actually looks like
>>>>> this:
>>>>>
>>>>> Source: Custom Source -> Flat Map -> (Filter -> Flat Map -> Map ->
>>>>> Map -> Map, Filter)
>>>>>
>>>>> Maybe it’s treating this whole flow as one pipeline and assigning it
>>>>> to a slot. What I really wanted was for the custom source I built to
>>>>> have running instances on all nodes. I’m not really sure if that’s
>>>>> the right approach, but if we could add this as a feature that’d be
>>>>> great, since having more than one node running the same pipeline
>>>>> guarantees the pipeline is never offline.
>>>>>
>>>>> -Ali
>>>>>
>>>>> On 2015-12-02, 4:39 AM, "Till Rohrmann" <trohrm...@apache.org> wrote:
>>>>>
>>>>>> If I'm not mistaken, the scheduler already has a preference to
>>>>>> spread independent pipelines out across the cluster. At least it
>>>>>> uses a queue of instances from which it pops the first element when
>>>>>> it allocates a new slot. This instance is then appended to the
>>>>>> queue again if it has some resources (slots) left.
>>>>>>
>>>>>> I would assume that you have a shuffle operation involved in your
>>>>>> job, such that it makes sense for the scheduler to deploy all
>>>>>> pipelines to the same machine.
>>>>>>
>>>>>> Cheers,
>>>>>> Till
>>>>>>
>>>>>> On Dec 1, 2015 4:01 PM, "Stephan Ewen" <se...@apache.org> wrote:
>>>>>>
>>>>>>> Slots are like "resource groups" which execute entire pipelines.
>>>>>>> They frequently have more than one operator.
>>>>>>>
>>>>>>> What you can try as a workaround is to decrease the number of
>>>>>>> slots per machine to cause the operators to be spread across more
>>>>>>> machines.
>>>>>>>
>>>>>>> If this is a crucial issue for your use case, it should be simple
>>>>>>> to add a "preference to spread out" to the scheduler...
>>>>>>>
>>>>>>> On Tue, Dec 1, 2015 at 3:26 PM, Kashmar, Ali <ali.kash...@emc.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Is there a way to make a task cluster-parallelizable? I.e., make
>>>>>>>> sure the parallel instances of the task are distributed across
>>>>>>>> the cluster. When I run my Flink job with a parallelism of 16,
>>>>>>>> all the parallel tasks are assigned to the first task manager.
>>>>>>>>
>>>>>>>> - Ali
>>>>>>>>
>>>>>>>> On 2015-11-30, 2:18 PM, "Ufuk Celebi" <u...@apache.org> wrote:
>>>>>>>>
>>>>>>>>> On 30 Nov 2015, at 17:47, Kashmar, Ali <ali.kash...@emc.com>
>>>>>>>>> wrote:
>>>>>>>>>> Do the parallel instances of each task get distributed across
>>>>>>>>>> the cluster or is it possible that they all run on the same
>>>>>>>>>> node?
>>>>>>>>>
>>>>>>>>> Yes, slots are requested from all nodes of the cluster. But keep
>>>>>>>>> in mind that multiple tasks (forming a local pipeline) can be
>>>>>>>>> scheduled to the same slot (1 slot can hold many tasks).
>>>>>>>>>
>>>>>>>>> Have you seen this?
>>>>>>>>>
>>>>>>>>> https://ci.apache.org/projects/flink/flink-docs-release-0.10/internals/job_scheduling.html
>>>>>>>>>
>>>>>>>>>> If they can all run on the same node, what happens when that
>>>>>>>>>> node crashes? Does the job manager recreate them using the
>>>>>>>>>> remaining open slots?
>>>>>>>>>
>>>>>>>>> What happens: the job manager tries to restart the program with
>>>>>>>>> the same parallelism. Thus, if you have enough free slots
>>>>>>>>> available in your cluster, this works smoothly (so yes, the
>>>>>>>>> remaining/available slots are used).
>>>>>>>>>
>>>>>>>>> With a YARN cluster the task manager containers are restarted
>>>>>>>>> automatically. In standalone mode, you have to take care of this
>>>>>>>>> yourself.
>>>>>>>>>
>>>>>>>>> Does this help?
>>>>>>>>>
>>>>>>>>> Ufuk
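(On the restart behavior Ufuk describes: a minimal sketch of how a job opts into automatic restarts with the 0.10-era API. RestartExample is a hypothetical driver class, and the retry count of 3 is an arbitrary example, not a value from this thread.)

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class RestartExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();

        // Let the job manager restart the job (at the same parallelism)
        // up to 3 times after a failure, provided enough free slots remain.
        env.setNumberOfExecutionRetries(3);

        // ... define sources/transformations and call env.execute() ...
    }
}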