For rebalance(), this makes sense. I don't think anything needs to be changed: with regular data volumes, this issue does not arise the way it does with this very small data set.
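The plain-Java simulation below makes the rebalance() behavior discussed in this thread concrete. It is a hypothetical model, not Flink's RebalancePartitioner. Each sender keeps its own round-robin counter, starting either at index 0 (as described in the thread) or at the staggered index startIdx = (numReceivers / numSenders) * myIdx suggested in the quoted reply. The class and method names are invented for illustration.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical model of per-sender round-robin partitioning;
// this is NOT Flink's RebalancePartitioner.
class RoundRobinDemo {

    // Returns how many distinct receivers get at least one element when each
    // sender independently distributes its elements round-robin.
    static int receiversWithData(int numSenders, int numReceivers,
                                 int elementsPerSender, boolean staggered) {
        Set<Integer> reached = new HashSet<>();
        for (int myIdx = 0; myIdx < numSenders; myIdx++) {
            // No global coordination: every sender has its own counter.
            // The staggered start follows the formula suggested in the thread
            // (note the integer division).
            int startIdx = staggered ? (numReceivers / numSenders) * myIdx : 0;
            for (int e = 0; e < elementsPerSender; e++) {
                reached.add((startIdx + e) % numReceivers);
            }
        }
        return reached.size();
    }

    public static void main(String[] args) {
        // 100 sources, 14 file names each, all starting at index 0
        System.out.println(receiversWithData(100, 100, 14, false));   // 14
        // Same input with staggered start indices
        System.out.println(receiversWithData(100, 100, 14, true));    // 100
        // "Regular" data volume (1000 elements per source, an arbitrary choice)
        System.out.println(receiversWithData(100, 100, 1000, false)); // 100
        // Workaround from the thread: a single source (100 names, hypothetical)
        System.out.println(receiversWithData(1, 100, 100, false));    // 100
    }
}
```

With only 14 elements per sender, starting every sender at index 0 reaches just 14 receivers. Larger per-sender volumes reach all of them. The staggered formula relies on integer division, so with more senders than receivers (for example 200 senders and 100 receivers) it yields 0 for every sender and falls back to the start-at-0 behavior.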
However, for shuffle() I would expect each source task to use a different shuffle pattern...

-Matthias

On 09/03/2015 03:28 PM, Fabian Hueske wrote:
> In the case of rebalance(), all sources start the round-robin partitioning at
> index 0. Since each source emits only very few elements, only the first 15
> mappers receive any input.
> It would be better to let each source start the round-robin partitioning at
> a different index, something like startIdx = (numReceivers / numSenders) * myIdx.
>
> In the case of shuffle(), the ShufflePartitioner initializes Random() without a
> seed (the current time is taken).
> However, the ShufflePartitioner is only initialized once at the client side
> (if I see that correctly) and then the same instance is deserialized by all
> operators, i.e., all use random number generators with the same seed.
>
> I think the StreamPartitioner class should be extended with a
> configuration / initialize method that is called on each parallel operator.
>
> Cheers, Fabian
>
> 2015-09-03 15:04 GMT+02:00 Aljoscha Krettek <aljos...@apache.org>:
>
>> Hi,
>> I don't think it's a bug. If there are 100 sources that each emit only 14
>> elements, then only the first 14 mappers will ever receive data. The
>> round-robin distribution is not global, since the sources operate
>> independently from each other.
>>
>> Cheers,
>> Aljoscha
>>
>> On Wed, 2 Sep 2015 at 20:00 Matthias J. Sax <mj...@apache.org> wrote:
>>
>>> Thanks for clarifying. shuffle() is similar to rebalance() -- however,
>>> it redistributes randomly and not in a round-robin fashion.
>>>
>>> However, the problem you describe sounds like a bug to me. I included
>>> the dev list. Maybe someone else can step in so we can determine whether
>>> there is a bug or not.
>>>
>>> -Matthias
>>>
>>>
>>> On 09/02/2015 06:19 PM, LINZ, Arnaud wrote:
>>>> Hi,
>>>>
>>>> You are right, but in fact it does not solve my problem, since I have
>>>> a parallelism of 100 everywhere.
>>>> Each of my 100 sources gives only a few lines (say 14 max), and only the
>>>> first 14 downstream nodes receive data.
>>>> Same problem when replacing rebalance() with shuffle().
>>>>
>>>> But I found a workaround: setting the parallelism of the source to 1 (I
>>>> don't need 100 directory scanners anyway) forces the rebalancing to be
>>>> even across the mappers.
>>>>
>>>> Greetings,
>>>> Arnaud
>>>>
>>>>
>>>> -----Original Message-----
>>>> From: Matthias J. Sax [mailto:mj...@apache.org]
>>>> Sent: Wednesday, 2 September 2015 17:56
>>>> To: u...@flink.apache.org
>>>> Subject: Re: How to force the parallelism on small streams?
>>>>
>>>> Hi,
>>>>
>>>> If I understand you correctly, you want to have 100 mappers. Thus you
>>>> need to apply .setParallelism() after .map():
>>>>
>>>>> addSource(myFileSource).rebalance().map(myFileMapper).setParallelism(100)
>>>>
>>>> The order of commands you used sets the dop for the source to 100 (which
>>>> might be ignored if the provided source function "myFileSource" does not
>>>> implement the "ParallelSourceFunction" interface). The dop for the mapper
>>>> should be the default value.
>>>>
>>>> Using .rebalance() is absolutely correct. It distributes the emitted
>>>> tuples in a round-robin fashion to all consumer tasks.
>>>>
>>>> -Matthias
>>>>
>>>> On 09/02/2015 05:41 PM, LINZ, Arnaud wrote:
>>>>> Hi,
>>>>>
>>>>> I have a source that provides few items since it gives file names to
>>>>> the mappers. The mapper opens the file and processes records. As the
>>>>> files are huge, one input line (a file name) gives a substantial amount
>>>>> of work to the next stage.
>>>>>
>>>>> My topology looks like this:
>>>>>
>>>>> addSource(myFileSource).rebalance().setParallelism(100).map(myFileMapper)
>>>>>
>>>>> If 100 mappers are created, about 85 end immediately and only a few
>>>>> process the files (for hours).
>>>>> I suspect an optimization that requires a minimum number of lines to be
>>>>> passed to the next node, or else the node is “shut down”; but in my case
>>>>> I do want the lines to be evenly distributed to each mapper.
>>>>>
>>>>> How can I enforce that?
>>>>>
>>>>> Greetings,
>>>>>
>>>>> Arnaud
>>>>>
>>>>> ------------------------------------------------------------------------
>>>>>
>>>>> The integrity of this message cannot be guaranteed on the Internet.
>>>>> The company that sent this message cannot therefore be held liable for
>>>>> its content nor attachments. Any unauthorized use or dissemination is
>>>>> prohibited. If you are not the intended recipient of this message,
>>>>> then please delete it and notify the sender.
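The shuffle() problem Fabian describes can be reproduced with plain JDK serialization. java.util.Random is Serializable, so an instance created once and then deserialized into several copies gives every copy the same generator state. The sketch below is a hypothetical model, not Flink's ShufflePartitioner. SketchShufflePartitioner and its setup() method are invented names standing in for the per-operator initialize method proposed in the thread.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.Arrays;
import java.util.Random;

// Hypothetical model of the issue; NOT Flink's ShufflePartitioner,
// and setup() is NOT a Flink API.
class ShuffleSeedDemo {

    static class SketchShufflePartitioner implements Serializable {
        private static final long serialVersionUID = 1L;

        // Seeded once, wherever the object is constructed (the "client").
        private Random random = new Random();

        // Hypothetical per-instance initialization hook, meant to be called on
        // each parallel instance after deserialization. The no-arg Random
        // constructor picks a seed very likely distinct from other invocations.
        void setup() {
            random = new Random();
        }

        int selectChannel(int numChannels) {
            return random.nextInt(numChannels);
        }
    }

    // Serializes one instance once, then deserializes it `copies` times,
    // mimicking a client shipping the same object to every parallel task.
    static SketchShufflePartitioner[] shipCopies(SketchShufflePartitioner original, int copies) {
        try {
            ByteArrayOutputStream bos = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bos)) {
                out.writeObject(original);
            }
            byte[] bytes = bos.toByteArray();
            SketchShufflePartitioner[] result = new SketchShufflePartitioner[copies];
            for (int i = 0; i < copies; i++) {
                try (ObjectInputStream in =
                         new ObjectInputStream(new ByteArrayInputStream(bytes))) {
                    result[i] = (SketchShufflePartitioner) in.readObject();
                }
            }
            return result;
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    // Channels chosen by one partitioner instance for n records.
    static int[] pattern(SketchShufflePartitioner p, int numChannels, int n) {
        int[] channels = new int[n];
        for (int i = 0; i < n; i++) {
            channels[i] = p.selectChannel(numChannels);
        }
        return channels;
    }

    public static void main(String[] args) {
        SketchShufflePartitioner[] tasks = shipCopies(new SketchShufflePartitioner(), 2);
        // Without setup(): both copies pick exactly the same channel sequence.
        System.out.println(Arrays.equals(pattern(tasks[0], 100, 14),
                                         pattern(tasks[1], 100, 14))); // true

        SketchShufflePartitioner[] fixed = shipCopies(new SketchShufflePartitioner(), 2);
        for (SketchShufflePartitioner p : fixed) {
            p.setup(); // re-seed on each parallel instance
        }
        System.out.println(Arrays.equals(pattern(fixed[0], 100, 14),
                                         pattern(fixed[1], 100, 14)));
    }
}
```

The first comparison always prints true, because every copy starts from the same serialized seed. After setup(), each copy draws from its own generator, so the patterns will almost certainly differ. An alternative that needs no new hook is to mark the field transient and create the Random lazily on first use, which also happens independently in each deserialized copy.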