Btw, it works with a source of parallelism 1 because only a single source then partitions the data (round-robin or random), so there are no longer several sources all assigning their work to the same few mappers.
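
To illustrate the point, here is a small plain-Java simulation. It is not Flink code, and the class and method names are made up for this sketch. It assumes that each source keeps its own round-robin counter and starts at the first mapper, which is what the behaviour reported in this thread suggests.

```java
import java.util.Arrays;

public class RoundRobinSketch {

    /**
     * Simulates numSources independent sources, each sending
     * elementsPerSource elements round-robin to numMappers mappers.
     * Assumption: every source has its own counter and starts at mapper 0.
     */
    static int[] distribute(int numSources, int elementsPerSource, int numMappers) {
        int[] received = new int[numMappers];
        for (int source = 0; source < numSources; source++) {
            int next = 0; // per-source counter, not shared between sources
            for (int i = 0; i < elementsPerSource; i++) {
                received[next]++;
                next = (next + 1) % numMappers;
            }
        }
        return received;
    }

    /** Counts the mappers that received at least one element. */
    static long busyMappers(int[] received) {
        return Arrays.stream(received).filter(n -> n > 0).count();
    }

    public static void main(String[] args) {
        // 100 parallel sources, 14 lines each: only mappers 0..13 get work.
        System.out.println(busyMappers(distribute(100, 14, 100))); // prints 14
        // One source emitting 140 lines: every mapper gets work.
        System.out.println(busyMappers(distribute(1, 140, 100))); // prints 100
    }
}
```

With 100 independent sources the work piles up on the first 14 mappers, while a single source spreads the same kind of input over all 100 of them.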

2015-09-03 15:22 GMT+02:00 Matthias J. Sax <mj...@apache.org>:

> If it would be only 14 elements, you are obviously right. However, if I
> understood Arnaud correctly, the problem is that there are more than 14
> elements:
>
> > Each of my 100 sources gives only a few lines (say 14 max)
>
> That would be about 140 lines in total.
>
> Using a non-parallel source, he is able to distribute the elements to all
> 100 mappers. I assume that about 40 mappers receive 2 lines, and 60
> receive 1 line.
>
> @Arnaud: is this correct?
>
> -Matthias
>
> On 09/03/2015 03:04 PM, Aljoscha Krettek wrote:
> > Hi,
> > I don't think it's a bug. If there are 100 sources that each emit only
> > 14 elements then only the first 14 mappers will ever receive data. The
> > round-robin distribution is not global, since the sources operate
> > independently from each other.
> >
> > Cheers,
> > Aljoscha
> >
> > On Wed, 2 Sep 2015 at 20:00 Matthias J. Sax <mj...@apache.org> wrote:
> >
> > > Thanks for clarifying. shuffle() is similar to rebalance(); however,
> > > it redistributes randomly and not in round-robin fashion.
> > >
> > > However, the problem you describe sounds like a bug to me. I included
> > > the dev list. Maybe someone else can step in so we can identify
> > > whether there is a bug or not.
> > >
> > > -Matthias
> > >
> > > On 09/02/2015 06:19 PM, LINZ, Arnaud wrote:
> > > > Hi,
> > > >
> > > > You are right, but in fact it does not solve my problem, since I
> > > > have a parallelism of 100 everywhere. Each of my 100 sources gives
> > > > only a few lines (say 14 max), and only the first 14 downstream
> > > > nodes receive data.
> > > > Same problem when replacing rebalance() with shuffle().
> > > >
> > > > But I found a workaround: setting the parallelism to 1 for the
> > > > source (I don't need 100 directory scanners anyway) forces an even
> > > > rebalancing between the mappers.
> > > >
> > > > Greetings,
> > > > Arnaud
> > > >
> > > > -----Original Message-----
> > > > From: Matthias J. Sax [mailto:mj...@apache.org]
> > > > Sent: Wednesday, 2 September 2015 17:56
> > > > To: user@flink.apache.org
> > > > Subject: Re: How to force the parallelism on small streams?
> > > >
> > > > Hi,
> > > >
> > > > If I understand you correctly, you want to have 100 mappers. Thus
> > > > you need to apply .setParallelism() after .map():
> > > >
> > > >     addSource(myFileSource).rebalance().map(myFileMapper).setParallelism(100)
> > > >
> > > > The order of commands you used sets the dop (degree of parallelism)
> > > > for the source to 100 (which might be ignored if the provided
> > > > source function "myFileSource" does not implement the
> > > > "ParallelSourceFunction" interface). The dop for the mapper then
> > > > stays at the default value.
> > > >
> > > > Using .rebalance() is absolutely correct. It distributes the
> > > > emitted tuples in a round-robin fashion to all consumer tasks.
> > > >
> > > > -Matthias
> > > >
> > > > On 09/02/2015 05:41 PM, LINZ, Arnaud wrote:
> > > > > Hi,
> > > > >
> > > > > I have a source that provides few items, since it gives file names
> > > > > to the mappers. The mapper opens the file and processes its
> > > > > records. As the files are huge, one input line (a filename) gives
> > > > > a substantial amount of work to the next stage.
> > > > >
> > > > > My topology looks like:
> > > > >
> > > > >     addSource(myFileSource).rebalance().setParallelism(100).map(myFileMapper)
> > > > >
> > > > > If 100 mappers are created, about 85 end immediately and only a
> > > > > few process the files (for hours). I suspect an optimization that
> > > > > requires a minimum number of lines to be passed to the next node,
> > > > > otherwise the node is “shut down”; but in my case I do want the
> > > > > lines to be evenly distributed to each mapper.
> > > > >
> > > > > How can I enforce that?
> > > > >
> > > > > Greetings,
> > > > >
> > > > > Arnaud
> > > > >
> > > > > ------------------------------------------------------------------------
> > > > >
> > > > > The integrity of this message cannot be guaranteed on the Internet.
> > > > > The company that sent this message cannot therefore be held liable
> > > > > for its content nor attachments. Any unauthorized use or
> > > > > dissemination is prohibited. If you are not the intended recipient
> > > > > of this message, then please delete it and notify the sender.