Hi,
I don't think it's a bug. If there are 100 sources that each emit only 14
elements, then only the first 14 mappers will ever receive data. The
round-robin distribution is not global, since the sources operate
independently from each other.
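
To illustrate, here is a minimal sketch in plain Python (a toy model, not
Flink code). The function name `simulate` and its parameters are made up for
this example, and it assumes that every source starts its own round-robin at
the first mapper, which is what the observed behaviour suggests:

```python
from collections import Counter


def simulate(num_sources, elements_per_source, num_mappers):
    """Count elements per mapper when each source round-robins on its own."""
    received = Counter()
    for _ in range(num_sources):
        # Assumption: each source keeps a private counter starting at mapper 0,
        # so sources do not coordinate with each other.
        for i in range(elements_per_source):
            received[i % num_mappers] += 1
    return received


# 100 sources with 14 elements each: only mappers 0..13 are ever targeted.
independent = simulate(num_sources=100, elements_per_source=14, num_mappers=100)
# One source emitting all 1400 elements: every mapper gets a share.
single = simulate(num_sources=1, elements_per_source=1400, num_mappers=100)

print(len(independent), "mappers receive data with 100 sources")
print(len(single), "mappers receive data with 1 source")
```

Under that assumption, the single-source case corresponds to Arnaud's
workaround of setting the source parallelism to 1.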

Cheers,
Aljoscha

On Wed, 2 Sep 2015 at 20:00 Matthias J. Sax <mj...@apache.org> wrote:

> Thanks for clarifying. shuffle() is similar to rebalance() -- however,
> it redistributes randomly and not in round robin fashion.
>
> However, the problem you describe sounds like a bug to me. I have included
> the dev list. Maybe someone else can step in so we can identify whether
> there is a bug or not.
>
> -Matthias
>
>
> On 09/02/2015 06:19 PM, LINZ, Arnaud wrote:
> > Hi,
> >
> > You are right, but in fact it does not solve my problem, since I have a
> parallelism of 100 everywhere. Each of my 100 sources emits only a few lines
> (say 14 at most), and only the first 14 downstream nodes receive data.
> > I have the same problem when replacing rebalance() with shuffle().
> >
> > But I found a workaround: setting the parallelism to 1 for the source (I
> don't need 100 directory scanners anyway) forces the elements to be
> rebalanced evenly between the mappers.
> >
> > Greetings,
> > Arnaud
> >
> >
> > -----Original Message-----
> > From: Matthias J. Sax [mailto:mj...@apache.org]
> > Sent: Wednesday, 2 September 2015 17:56
> > To: u...@flink.apache.org
> > Subject: Re: How to force the parallelism on small streams?
> >
> > Hi,
> >
> > If I understand you correctly, you want to have 100 mappers. Thus, you
> need to apply .setParallelism() after .map():
> >
> >> addSource(myFileSource).rebalance().map(myFileMapper).setParallelism(100)
> >
> > The order of commands you used sets the dop (degree of parallelism) for
> the source to 100 (which might be ignored if the provided source function
> "myFileSource" does not implement the "ParallelSourceFunction" interface).
> The dop for the mapper should be the default value.
> >
> > Using .rebalance() is absolutely correct. It distributes the emitted
> tuples in a round robin fashion to all consumer tasks.
> >
> > -Matthias
> >
> > On 09/02/2015 05:41 PM, LINZ, Arnaud wrote:
> >> Hi,
> >>
> >>
> >>
> >> I have a source that provides only a few items, since it gives file
> >> names to the mappers. Each mapper opens the file and processes its
> >> records. As the files are huge, one input line (a filename) generates a
> >> substantial amount of work for the next stage.
> >>
> >> My topology looks like this:
> >>
> >> addSource(myFileSource).rebalance().setParallelism(100).map(myFileMapper)
> >>
> >> If 100 mappers are created, about 85 end immediately and only a few
> >> process the files (for hours). I suspect an optimization that requires
> >> a minimum number of lines to be passed to the next node, or else that
> >> node is “shut down”; but in my case I do want the lines to be evenly
> >> distributed to each mapper.
> >>
> >> How can I enforce that?
> >>
> >>
> >>
> >> Greetings,
> >>
> >> Arnaud
> >>
> >>
> >
>
>
