Hi Ali!

Seems like the Google Doc has restricted access; it tells me I have no
permission to view it...

Stephan


On Wed, Dec 9, 2015 at 8:49 PM, Kashmar, Ali <ali.kash...@emc.com> wrote:

> Hi Stephan,
>
> Here’s a link to the screenshot I tried to attach earlier:
>
> https://drive.google.com/open?id=0B0_jTR8-IvUcMEdjWGFmYXJYS28
>
> It looks to me like the distribution is fairly skewed across the nodes,
> even though they’re executing the same pipeline.
>
> Thanks,
> Ali
>
>
> On 2015-12-09, 12:36 PM, "Stephan Ewen" <se...@apache.org> wrote:
>
> >Hi!
> >
> >The parallel socket source looks good.
> >I think you forgot to attach the screenshot, or the mailing list dropped
> >the attachment...
> >
> >Not sure if I can diagnose that without more details. The sources all do
> >the same. Assuming that the server distributes the data evenly across all
> >connected sockets, and that the network bandwidth ends up being divided in
> >a fair way, all pipelines should be similarly "eager".
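Stephan's assumption above, that the feeding server deals data evenly across its connected sockets, can be sketched in isolation. The class below is hypothetical (plain JDK, with in-memory lists standing in for per-connection output streams) and only illustrates the round-robin dealing policy such a server would need:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the round-robin policy a feeding server would
// need so that all connected sockets receive a similar share of lines.
// In-memory lists stand in for the per-connection output streams.
public class RoundRobinDistributor {

    private final List<List<String>> clients = new ArrayList<>();
    private int next = 0;

    public RoundRobinDistributor(int numClients) {
        for (int i = 0; i < numClients; i++) {
            clients.add(new ArrayList<String>());
        }
    }

    // Deal one line to the next client, cycling through them in order.
    public void send(String line) {
        clients.get(next).add(line);
        next = (next + 1) % clients.size();
    }

    public List<List<String>> clients() {
        return clients;
    }

    public static void main(String[] args) {
        RoundRobinDistributor d = new RoundRobinDistributor(3);
        for (int i = 0; i < 9; i++) {
            d.send("line-" + i);
        }
        for (List<String> c : d.clients()) {
            System.out.println(c.size()); // prints 3 three times
        }
    }
}
```

If each pipeline drains its socket at a similar rate, a policy like this keeps the sources similarly busy; a skew like the one in the screenshot would then point at the network or at slow consumers rather than at the source itself.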
> >
> >Greetings,
> >Stephan
> >
> >
> >On Wed, Dec 9, 2015 at 5:22 PM, Kashmar, Ali <ali.kash...@emc.com> wrote:
> >
> >> Hi Stephan,
> >>
> >> That was my original understanding, until I realized that I was not
> >> using a parallel socket source. I had a custom source that extended
> >> SourceFunction which always runs with parallelism = 1. I looked through
> >> the API and found the ParallelSourceFunction interface so I implemented
> >> that and voila, now all 3 nodes in the cluster are actually receiving
> >> traffic on socket connections.
> >>
> >> Now that I’m running it successfully end to end, I’m trying to improve
> >> the performance. Can you take a look at the attached screenshot and tell
> >> me if the distribution of work amongst the pipelines is normal? I feel
> >> like some pipelines are a lot lazier than others, even though the
> >> cluster nodes are exactly the same.
> >>
> >> By the way, here’s the class I wrote. It would be useful to have this
> >> available in the Flink distro:
> >>
> >> public class ParallelSocketSource implements ParallelSourceFunction<String> {
> >>
> >>     private static final long serialVersionUID = -271094428915640892L;
> >>     private static final Logger LOG =
> >>         LoggerFactory.getLogger(ParallelSocketSource.class);
> >>
> >>     private volatile boolean running = true;
> >>     private final String host;
> >>     private final int port;
> >>
> >>     public ParallelSocketSource(String host, int port) {
> >>         this.host = host;
> >>         this.port = port;
> >>     }
> >>
> >>     @Override
> >>     public void run(SourceContext<String> ctx) throws Exception {
> >>         try (Socket socket = new Socket(host, port);
> >>              BufferedReader reader = new BufferedReader(
> >>                  new InputStreamReader(socket.getInputStream()))) {
> >>             String line;
> >>             while (running && ((line = reader.readLine()) != null)) {
> >>                 ctx.collect(line);
> >>             }
> >>         } catch (IOException ex) {
> >>             LOG.error("error reading from socket", ex);
> >>         }
> >>     }
> >>
> >>     @Override
> >>     public void cancel() {
> >>         running = false;
> >>     }
> >> }
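A quick way to sanity-check that read loop outside of Flink: the hypothetical plain-JDK sketch below drives the same readLine() loop against an in-process server (port 0 asks the OS for any free port). In a job, the source itself would be attached with env.addSource(new ParallelSocketSource(host, port)).

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.ArrayList;
import java.util.List;

// Hypothetical harness: exercises the same readLine() loop the source
// uses, against an in-process server, collecting lines into a list
// where the real source would call ctx.collect(line).
public class SocketReadLoopDemo {

    public static List<String> readAll(String host, int port) throws IOException {
        List<String> lines = new ArrayList<>();
        try (Socket socket = new Socket(host, port);
             BufferedReader reader = new BufferedReader(
                 new InputStreamReader(socket.getInputStream()))) {
            String line;
            while ((line = reader.readLine()) != null) {
                lines.add(line); // in the real source: ctx.collect(line)
            }
        }
        return lines;
    }

    public static void main(String[] args) throws Exception {
        ServerSocket server = new ServerSocket(0); // port 0 = any free port
        Thread writer = new Thread(() -> {
            try (Socket client = server.accept();
                 PrintWriter out = new PrintWriter(client.getOutputStream(), true)) {
                out.println("a");
                out.println("b");
            } catch (IOException ignored) {
            }
        });
        writer.start();
        List<String> got = readAll("localhost", server.getLocalPort());
        System.out.println(got); // [a, b]
        writer.join();
        server.close();
    }
}
```

The loop ends when the server closes the connection (readLine() returns null), which matches the cancel() semantics: setting running = false stops collection at the next line boundary.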
> >>
> >> Regards,
> >> Ali
> >>
> >>
> >> On 2015-12-08, 3:35 PM, "Stephan Ewen" <se...@apache.org> wrote:
> >>
> >> >Hi Ali!
> >> >
> >> >In the case you have, the sequence of source-map-filter ... forms a
> >> >pipeline.
> >> >
> >> >You mentioned that you set the parallelism to 16, so there should be 16
> >> >pipelines. These pipelines should be completely independent.
> >> >
> >> >Looking at the way the scheduler is implemented, independent pipelines
> >> >should be spread across machines. But when you execute that in
> >> >parallel, you say all 16 pipelines end up on the same machine?
> >> >
> >> >Can you share with us the rough code of your program? Or a screenshot
> >> >from the runtime dashboard that shows the program graph?
> >> >
> >> >
> >> >If your cluster is basically for that one job only, you could try and
> >> >set the number of slots to 4 for each machine. Then you have 16 slots
> >> >in total and each node would run one of the 16 pipelines.
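For a standalone setup, that suggestion is a one-line change in flink-conf.yaml on each machine (key name as in the Flink 0.10 configuration docs); shown here assuming the 4-node cluster described in this thread:

```
# flink-conf.yaml on each of the 4 task managers:
# 4 slots x 4 machines = 16 slots in total, one per pipeline at parallelism 16
taskmanager.numberOfTaskSlots: 4
```

The task managers need a restart to pick up the new slot count.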
> >> >
> >> >
> >> >Greetings,
> >> >Stephan
> >> >
> >> >
> >> >On Wed, Dec 2, 2015 at 4:06 PM, Kashmar, Ali <ali.kash...@emc.com>
> >>wrote:
> >> >
> >> >> There is no shuffle operation in my flow. Mine actually looks like
> >> >> this:
> >> >>
> >> >> Source: Custom Source -> Flat Map
> >> >>   -> (Filter -> Flat Map -> Map -> Map -> Map, Filter)
> >> >>
> >> >>
> >> >> Maybe it’s treating this whole flow as one pipeline and assigning it
> >> >> to a slot. What I really wanted was to have the custom source I built
> >> >> run instances on all nodes. I’m not really sure if that’s the right
> >> >> approach, but if we could add this as a feature that’d be great,
> >> >> since having more than one node running the same pipeline guarantees
> >> >> the pipeline is never offline.
> >> >>
> >> >> -Ali
> >> >>
> >> >> On 2015-12-02, 4:39 AM, "Till Rohrmann" <trohrm...@apache.org>
> wrote:
> >> >>
> >> >> >If I'm not mistaken, the scheduler already has a preference to
> >> >> >spread independent pipelines out across the cluster. At least it
> >> >> >uses a queue of instances from which it pops the first element when
> >> >> >it allocates a new slot. This instance is then appended to the queue
> >> >> >again if it has some resources (slots) left.
> >> >> >
> >> >> >I would assume that you have a shuffle operation involved in your
> >> >> >job such that it makes sense for the scheduler to deploy all
> >> >> >pipelines to the same machine.
> >> >> >
> >> >> >Cheers,
> >> >> >Till
> >> >> >On Dec 1, 2015 4:01 PM, "Stephan Ewen" <se...@apache.org> wrote:
> >> >> >
> >> >> >> Slots are like "resource groups" which execute entire pipelines.
> >> >> >> They frequently have more than one operator.
> >> >> >>
> >> >> >> What you can try as a workaround is to decrease the number of
> >> >> >> slots per machine to cause the operators to be spread across more
> >> >> >> machines.
> >> >> >>
> >> >> >> If this is a crucial issue for your use case, it should be simple
> >> >> >> to add a "preference to spread out" to the scheduler...
> >> >> >>
> >> >> >> On Tue, Dec 1, 2015 at 3:26 PM, Kashmar, Ali <ali.kash...@emc.com>
> >> >> >> wrote:
> >> >> >>
> >> >> >> > Is there a way to make a task cluster-parallelizable? I.e. make
> >> >> >> > sure the parallel instances of the task are distributed across
> >> >> >> > the cluster. When I run my Flink job with a parallelism of 16,
> >> >> >> > all the parallel tasks are assigned to the first task manager.
> >> >> >> >
> >> >> >> > - Ali
> >> >> >> >
> >> >> >> > On 2015-11-30, 2:18 PM, "Ufuk Celebi" <u...@apache.org> wrote:
> >> >> >> >
> >> >> >> > >
> >> >> >> > >> On 30 Nov 2015, at 17:47, Kashmar, Ali <ali.kash...@emc.com>
> >> >> >> > >> wrote:
> >> >> >> > >> Do the parallel instances of each task get distributed across
> >> >> >> > >> the cluster or is it possible that they all run on the same
> >> >> >> > >> node?
> >> >> >> > >
> >> >> >> > >Yes, slots are requested from all nodes of the cluster. But
> >> >> >> > >keep in mind that multiple tasks (forming a local pipeline) can
> >> >> >> > >be scheduled to the same slot (1 slot can hold many tasks).
> >> >> >> > >
> >> >> >> > >Have you seen this?
> >> >> >> > >
> >> >> >> > >https://ci.apache.org/projects/flink/flink-docs-release-0.10/internals/job_scheduling.html
> >> >> >> > >
> >> >> >> > >> If they can all run on the same node, what happens when that
> >> >> >> > >> node crashes? Does the job manager recreate them using the
> >> >> >> > >> remaining open slots?
> >> >> >> > >
> >> >> >> > >What happens: The job manager tries to restart the program with
> >> >> >> > >the same parallelism. Thus if you have enough free slots
> >> >> >> > >available in your cluster, this works smoothly (so yes, the
> >> >> >> > >remaining/available slots are used).
> >> >> >> > >
> >> >> >> > >With a YARN cluster the task manager containers are restarted
> >> >> >> > >automatically. In standalone mode, you have to take care of
> >> >> >> > >this yourself.
> >> >> >> > >
> >> >> >> > >
> >> >> >> > >Does this help?
> >> >> >> > >
> >> >> >> > >– Ufuk
> >> >> >> > >
> >> >> >> >
> >> >> >> >
> >> >> >>
> >> >>
> >> >>
> >>
>
>
