Hi Stephan,

Here’s a link to the screenshot I tried to attach earlier:

https://drive.google.com/open?id=0B0_jTR8-IvUcMEdjWGFmYXJYS28

It looks to me like the distribution is fairly skewed across the nodes,
even though they’re executing the same pipeline.

Thanks,
Ali


On 2015-12-09, 12:36 PM, "Stephan Ewen" <se...@apache.org> wrote:

>Hi!
>
>The parallel socket source looks good.
>I think you forgot to attach the screenshot, or the mailing list dropped
>the attachment...
>
>Not sure if I can diagnose that without more details. The sources all do
>the same. Assuming that the server distributes the data evenly across all
>connected sockets, and that the network bandwidth ends up being divided in
>a fair way, all pipelines should run be similarly "eager".
>
>Greetings,
>Stephan
>
>
>On Wed, Dec 9, 2015 at 5:22 PM, Kashmar, Ali <ali.kash...@emc.com> wrote:
>
>> Hi Stephan,
>>
>> That was my original understanding, until I realized that I was not
>>using
>> a parallel socket source. I had a custom source that extended
>> SourceFunction which always runs with parallelism = 1. I looked through
>> the API and found the ParallelSourceFunction interface so I implemented
>> that and voila, now all 3 nodes in the cluster are actually receiving
>> traffic on socket connections.
>>
>> Now that I’m running it successfully end to end, I’m trying to improve
>>the
>> performance. Can you take a look at the attached screen shot and tell me
>> if the distribution of work amongst the pipelines is normal? I feel like
>> some pipelines are lot lazier than others, even though the cluster nodes
>> are exactly the same.
>>
>> By the way, here’s the class I wrote. It would be useful to have this
>> available in Flink distro:
>>
>> public class ParallelSocketSource implements
>> ParallelSourceFunction<String> {
>>
>>         private static final long serialVersionUID =
>>-271094428915640892L;
>>         private static final Logger LOG =
>> LoggerFactory.getLogger(ParallelSocketSource.class);
>>
>>         private volatile boolean running = true;
>>         private String host;
>>         private int port;
>>
>>         public ParallelSocketSource(String host, int port) {
>>                 this.host = host;
>>                 this.port = port;
>>         }
>>
>>         @Override
>>         public void run(SourceContext<String> ctx) throws Exception {
>>                 try (Socket socket = new Socket(host, port);
>>                         BufferedReader reader = new BufferedReader(new
>> InputStreamReader(socket.getInputStream()))) {
>>                         String line  = null;
>>                         while(running && ((line = reader.readLine()) !=
>> null)) {
>>                                 ctx.collect(line);
>>                         }
>>                 } catch(IOException ex) {
>>                         LOG.error("error reading from socket", ex);
>>                 }
>>         }
>>
>>         @Override
>>         public void cancel() {
>>                 running = false;
>>         }
>> }
>>
>> Regards,
>> Ali
>>
>>
>> On 2015-12-08, 3:35 PM, "Stephan Ewen" <se...@apache.org> wrote:
>>
>> >Hi Ali!
>> >
>> >In the case you have, the sequence of source-map-filter ... forms a
>> >pipeline.
>> >
>> >You mentioned that you set the parallelism to 16, so there should be 16
>> >pipelines. These pipelines should be completely independent.
>> >
>> >Looking at the way the scheduler is implemented, independent pipelines
>> >should be spread across machines. But when you execute that in
>>parallel,
>> >you say all 16 pipelines end up on the same machine?
>> >
>> >Can you share with us the rough code of your program? Or a Screenshot
>>from
>> >the runtime dashboard that shows the program graph?
>> >
>> >
>> >If your cluster is basically for that one job only, you could try and
>>set
>> >the number of slots to 4 for each machine. Then you have 16 slots in
>>total
>> >and each node would run one of the 16 pipelines.
>> >
>> >
>> >Greetings,
>> >Stephan
>> >
>> >
>> >On Wed, Dec 2, 2015 at 4:06 PM, Kashmar, Ali <ali.kash...@emc.com>
>>wrote:
>> >
>> >> There is no shuffle operation in my flow. Mine actually looks like
>>this:
>> >>
>> >> Source: Custom Source -> Flat Map -> (Filter -> Flat Map -> Map ->
>>Map
>> >>->
>> >> Map, Filter)
>> >>
>> >>
>> >> Maybe it’s treating this whole flow as one pipeline and assigning it
>>to
>> >>a
>> >> slot. What I really wanted was to have the custom source I built to
>>have
>> >> running instances on all nodes. I’m not really sure if that’s the
>>right
>> >> approach, but if we could add this as a feature that’d be great,
>>since
>> >> having more than one node running the same pipeline guarantees the
>> >> pipeline is never offline.
>> >>
>> >> -Ali
>> >>
>> >> On 2015-12-02, 4:39 AM, "Till Rohrmann" <trohrm...@apache.org> wrote:
>> >>
>> >> >If I'm not mistaken, then the scheduler has already a preference to
>> >>spread
>> >> >independent pipelines out across the cluster. At least he uses a
>>queue
>> >>of
>> >> >instances from which it pops the first element if it allocates a new
>> >>slot.
>> >> >This instance is then appended to the queue again, if it has some
>> >> >resources
>> >> >(slots) left.
>> >> >
>> >> >I would assume that you have a shuffle operation involved in your
>>job
>> >>such
>> >> >that it makes sense for the scheduler to deploy all pipelines to the
>> >>same
>> >> >machine.
>> >> >
>> >> >Cheers,
>> >> >Till
>> >> >On Dec 1, 2015 4:01 PM, "Stephan Ewen" <se...@apache.org> wrote:
>> >> >
>> >> >> Slots are like "resource groups" which execute entire pipelines.
>>They
>> >> >> frequently have more than one operator.
>> >> >>
>> >> >> What you can try as a workaround is decrease the number of slots
>>per
>> >> >> machine to cause the operators to be spread across more machines.
>> >> >>
>> >> >> If this is a crucial issue for your use case, it should be simple
>>to
>> >> >>add a
>> >> >> "preference to spread out" to the scheduler...
>> >> >>
>> >> >> On Tue, Dec 1, 2015 at 3:26 PM, Kashmar, Ali <ali.kash...@emc.com>
>> >> >>wrote:
>> >> >>
>> >> >> > Is there a way to make a task cluster-parallelizable? I.e. Make
>> >>sure
>> >> >>the
>> >> >> > parallel instances of the task are distributed across the
>>cluster.
>> >> >>When I
>> >> >> > run my flink job with a parallelism of 16, all the parallel
>>tasks
>> >>are
>> >> >> > assigned to the first task manager.
>> >> >> >
>> >> >> > - Ali
>> >> >> >
>> >> >> > On 2015-11-30, 2:18 PM, "Ufuk Celebi" <u...@apache.org> wrote:
>> >> >> >
>> >> >> > >
>> >> >> > >> On 30 Nov 2015, at 17:47, Kashmar, Ali <ali.kash...@emc.com>
>> >> wrote:
>> >> >> > >> Do the parallel instances of each task get distributed across
>> >>the
>> >> >> > >>cluster or is it possible that they all run on the same node?
>> >> >> > >
>> >> >> > >Yes, slots are requested from all nodes of the cluster. But
>>keep
>> >>in
>> >> >>mind
>> >> >> > >that multiple tasks (forming a local pipeline) can be
>>scheduled to
>> >> >>the
>> >> >> > >same slot (1 slot can hold many tasks).
>> >> >> > >
>> >> >> > >Have you seen this?
>> >> >> > >
>> >> >> >
>> >> >>
>> >> >>
>> >>
>> >>
>> 
>>https://ci.apache.org/projects/flink/flink-docs-release-0.10/internals/jo
>> >> >>b
>> >> >> > >_scheduling.html
>> >> >> > >
>> >> >> > >> If they can all run on the same node, what happens when that
>> >>node
>> >> >> > >>crashes? Does the job manager recreate them using the
>>remaining
>> >>open
>> >> >> > >>slots?
>> >> >> > >
>> >> >> > >What happens: The job manager tries to restart the program with
>> >>the
>> >> >>same
>> >> >> > >parallelism. Thus if you have enough free slots available in
>>your
>> >> >> > >cluster, this works smoothly (so yes, the remaining/available
>> >>slots
>> >> >>are
>> >> >> > >used)
>> >> >> > >
>> >> >> > >With a YARN cluster the task manager containers are restarted
>> >> >> > >automatically. In standalone mode, you have to take care of
>>this
>> >> >> yourself.
>> >> >> > >
>> >> >> > >
>> >> >> > >Does this help?
>> >> >> > >
>> >> >> > >­ Ufuk
>> >> >> > >
>> >> >> >
>> >> >> >
>> >> >>
>> >>
>> >>
>>

Reply via email to