Thanks for clarifying. shuffle() is similar to rebalance(); however, it redistributes records randomly rather than in round-robin fashion.
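For intuition, the difference between the two redistribution strategies can be sketched with a standalone Python simulation (this is not Flink API; the function names merely mirror the operators for illustration):

```python
import random

def rebalance(num_records, num_consumers):
    """Round-robin redistribution: record i goes to consumer i mod n."""
    counts = [0] * num_consumers
    for i in range(num_records):
        counts[i % num_consumers] += 1
    return counts

def shuffle(num_records, num_consumers, seed=42):
    """Random redistribution: each record goes to a uniformly chosen consumer."""
    rng = random.Random(seed)
    counts = [0] * num_consumers
    for _ in range(num_records):
        counts[rng.randrange(num_consumers)] += 1
    return counts
```

With round-robin, per-consumer counts can never differ by more than one; with random assignment they only converge toward uniform as the record count grows, so for very small streams both can look skewed.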
However, the problem you describe sounds like a bug to me. I included the dev list. Maybe someone else can step in so we can identify whether there is a bug or not.

-Matthias

On 09/02/2015 06:19 PM, LINZ, Arnaud wrote:
> Hi,
>
> You are right, but in fact it does not solve my problem, since I have 100
> parallelism everywhere. Each of my 100 sources gives only a few lines (say 14
> max), and only the first 14 next nodes will receive data.
> Same problem by replacing rebalance() with shuffle().
>
> But I found a workaround: setting parallelism to 1 for the source (I don't
> need 100 directory scanners anyway); it forces the rebalancing evenly
> between the mappers.
>
> Greetings,
> Arnaud
>
> -----Original Message-----
> From: Matthias J. Sax [mailto:mj...@apache.org]
> Sent: Wednesday, September 2, 2015 17:56
> To: user@flink.apache.org
> Subject: Re: How to force the parallelism on small streams?
>
> Hi,
>
> If I understand you correctly, you want to have 100 mappers. Thus you need to
> apply the .setParallelism() after .map():
>
>> addSource(myFileSource).rebalance().map(myFileMapper).setParallelism(100)
>
> The order of commands you used sets the dop for the source to 100 (which
> might be ignored if the provided source function "myFileSource" does not
> implement the "ParallelSourceFunction" interface). The dop for the mapper
> keeps the default value.
>
> Using .rebalance() is absolutely correct. It distributes the emitted tuples
> in a round-robin fashion to all consumer tasks.
>
> -Matthias
>
> On 09/02/2015 05:41 PM, LINZ, Arnaud wrote:
>> Hi,
>>
>> I have a source that provides few items, since it gives file names to
>> the mappers. The mapper opens the file and processes its records. As the
>> files are huge, one input line (a filename) represents substantial work
>> for the next stage.
>>
>> My topology looks like:
>>
>> addSource(myFileSource).rebalance().setParallelism(100).map(myFileMapper)
>>
>> If 100 mappers are created, about 85 end immediately and only a few
>> process the files (for hours). I suspect an optimization making that
>> there is a minimum number of lines to pass to the next node or it is
>> "shut down"; but in my case I do want the lines to be evenly
>> distributed to each mapper.
>>
>> How can I enforce that?
>>
>> Greetings,
>>
>> Arnaud
>>
>> ------------------------------------------------------------------------
>>
>> The integrity of this message cannot be guaranteed on the Internet.
>> The company that sent this message cannot therefore be held liable for
>> its content nor attachments. Any unauthorized use or dissemination is
>> prohibited. If you are not the intended recipient of this message,
>> then please delete it and notify the sender.
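The skew reported above, and why the parallelism-1 workaround resolves it, can be reproduced with a small standalone simulation (plain Python, not Flink; it assumes each source subtask starts its round-robin at channel 0, an assumption consistent with the reported "only the first 14 nodes receive data" behavior):

```python
def distribute(num_sources, records_per_source, num_consumers):
    """Each source subtask round-robins its own records over all consumers,
    starting at channel 0 (an assumption, consistent with the report above)."""
    counts = [0] * num_consumers
    for _ in range(num_sources):
        for i in range(records_per_source):
            counts[i % num_consumers] += 1
    return counts

# 100 sources x 14 records each: only consumers 0..13 ever receive data.
skewed = distribute(100, 14, 100)

# 1 source x 1400 records: every consumer receives exactly 14.
even = distribute(1, 1400, 100)
```

Because every source subtask restarts its round-robin from the same channel, 100 sources emitting 14 records each feed only the first 14 consumers, while a single source cycling through all 1400 records spreads them evenly.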