Thanks for clarifying. shuffle() is similar to rebalance() -- however,
it redistributes randomly and not in round robin fashion.

However, the problem you describe sounds like a bug to me. I included
dev list. Maybe anyone else can step in so we can identify it there is a
bug or not.

-Matthias


On 09/02/2015 06:19 PM, LINZ, Arnaud wrote:
> Hi,
> 
> You are right, but in fact it does not solve my problem, since I have 100 
> parallelism everywhere. Each of my 100 sources gives only a few lines (say 14 
> max), and only the first 14 next nodes will receive data.
> Same problem by replacing rebalance() with shuffle().
> 
> But I found a workaround: setting parallelism to 1 for the source (I don't 
> need a 100 directory scanners anyway), it forces the rebalancing evenly 
> between the mappers.
> 
> Greetings,
> Arnaud
> 
> 
> -----Message d'origine-----
> De : Matthias J. Sax [mailto:mj...@apache.org] 
> Envoyé : mercredi 2 septembre 2015 17:56
> À : user@flink.apache.org
> Objet : Re: How to force the parallelism on small streams?
> 
> Hi,
> 
> If I understand you correctly, you want to have 100 mappers. Thus you need to 
> apply the .setParallelism() after .map()
> 
>> addSource(myFileSource).rebalance().map(myFileMapper).setParallelism(1
>> 00)
> 
> The order of commands you used, set the dop for the source to 100 (which 
> might be ignored, if the provided source function "myFileSource" does not 
> implements "ParallelSourceFunction" interface). The dop for the mapper should 
> be the default value.
> 
> Using .rebalance() is absolutely correct. It distributes the emitted tuples 
> in a round robin fashion to all consumer tasks.
> 
> -Matthias
> 
> On 09/02/2015 05:41 PM, LINZ, Arnaud wrote:
>> Hi,
>>
>>  
>>
>> I have a source that provides few items since it gives file names to 
>> the mappers. The mapper opens the file and process records. As the 
>> files are huge, one input line (a filename) gives a consequent work to the 
>> next stage.
>>
>> My topology looks like :
>>
>> addSource(myFileSource).rebalance().setParallelism(100).map(myFileMapp
>> er)
>>
>> If 100 mappers are created, about 85 end immediately and only a few 
>> process the files (for hours). I suspect an optimization making that 
>> there is a minimum number of lines to pass to the next node or it is 
>> “shutdown” ; but in my case I do want the lines to be evenly 
>> distributed to each mapper.
>>
>> How to enforce that ?
>>
>>  
>>
>> Greetings,
>>
>> Arnaud
>>
>>
>> ----------------------------------------------------------------------
>> --
>>
>> L'intégrité de ce message n'étant pas assurée sur internet, la société 
>> expéditrice ne peut être tenue responsable de son contenu ni de ses 
>> pièces jointes. Toute utilisation ou diffusion non autorisée est 
>> interdite. Si vous n'êtes pas destinataire de ce message, merci de le 
>> détruire et d'avertir l'expéditeur.
>>
>> The integrity of this message cannot be guaranteed on the Internet. 
>> The company that sent this message cannot therefore be held liable for 
>> its content nor attachments. Any unauthorized use or dissemination is 
>> prohibited. If you are not the intended recipient of this message, 
>> then please delete it and notify the sender.
> 

Attachment: signature.asc
Description: OpenPGP digital signature

Reply via email to