For rebalance() this makes sense; I don't think anything needs to be
changed. For regular data there are no such issues as with this very
small data set.

However, for shuffle() I would expect that each source task uses a
different shuffle pattern...
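
Just to illustrate what I mean (a rough sketch with made-up names, not
the actual ShufflePartitioner code): if the random generator is created
on the task side and mixes in the subtask index, every parallel sender
produces its own channel sequence:

  import java.util.Random;

  // Hypothetical per-task shuffle: the Random lives on the sending task,
  // not on the client, so each parallel sender shuffles differently.
  class TaskLocalShuffleSketch {
      private final Random random;
      private final int numberOfChannels;

      TaskLocalShuffleSketch(int subtaskIndex, int numberOfChannels) {
          this.numberOfChannels = numberOfChannels;
          // mixing in the subtask index keeps the seeds distinct even if
          // all tasks start at the same time
          this.random = new Random(System.nanoTime() + subtaskIndex);
      }

      int nextChannel() {
          return random.nextInt(numberOfChannels);
      }
  }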

-Matthias

On 09/03/2015 03:28 PM, Fabian Hueske wrote:
> In case of rebalance(), all sources start the round-robin partitioning at
> index 0. Since each source emits only very few elements, only the first 15
> mappers receive any input.
> It would be better to let each source start the round-robin partitioning at
> a different index, something like startIdx = (numReceivers / numSenders) *
> myIdx.
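
As a rough sketch of that suggestion (illustrative names only, not the
current RebalancePartitioner code):

  // Hypothetical round-robin that starts at a per-sender offset instead
  // of always starting at channel 0.
  class OffsetRoundRobinSketch {
      private final int numberOfChannels;
      private int nextChannel;

      OffsetRoundRobinSketch(int senderIndex, int numberOfSenders,
                             int numberOfChannels) {
          this.numberOfChannels = numberOfChannels;
          // startIdx = (numReceivers / numSenders) * myIdx
          this.nextChannel =
              ((numberOfChannels / numberOfSenders) * senderIndex) % numberOfChannels;
      }

      int selectChannel() {
          int channel = nextChannel;
          nextChannel = (nextChannel + 1) % numberOfChannels;
          return channel;
      }
  }
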
> 
> In case of shuffle(), the ShufflePartitioner initializes Random() without a
> seed (the current time is taken).
> However, the ShufflePartitioner is only initialized once at the client side
> (if I see that correctly) and then the same instance is deserialized by all
> operators, i.e., all use random number generators with the same seed.
> 
> I think the StreamPartitioner class should be extended with a
> configuration / initialization method which is called on each parallel operator.
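
Sketching what such a hook could look like (made-up names, not the
existing StreamPartitioner API; the point is only that it runs once per
parallel sender, after deserialization on the task side):

  // Hypothetical partitioner base class with a per-instance setup hook.
  abstract class ConfigurablePartitionerSketch implements java.io.Serializable {
      protected int subtaskIndex;

      // called on each parallel operator instance, never on the client
      void initialize(int subtaskIndex) {
          // a shuffle implementation would (re)create its Random here,
          // a rebalance implementation would compute its start offset
          this.subtaskIndex = subtaskIndex;
      }

      abstract int selectChannel(int numberOfChannels);
  }
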
> 
> Cheers, Fabian
> 
> 2015-09-03 15:04 GMT+02:00 Aljoscha Krettek <aljos...@apache.org>:
> 
>> Hi,
>> I don't think it's a bug. If there are 100 sources that each emit only 14
>> elements then only the first 14 mappers will ever receive data. The
>> round-robin distribution is not global, since the sources operate
>> independently from each other.
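
A quick way to see the numbers (simulating 100 independent senders that
all start their round-robin at channel 0):

  // With 100 senders emitting 14 elements each and all starting at
  // channel 0, channels 0..13 receive 100 elements and 14..99 receive none.
  class RoundRobinSimulationSketch {
      public static void main(String[] args) {
          int numSenders = 100, numChannels = 100, elementsPerSender = 14;
          int[] received = new int[numChannels];
          for (int s = 0; s < numSenders; s++) {
              for (int e = 0; e < elementsPerSender; e++) {
                  received[e % numChannels]++;   // every sender starts at 0
              }
          }
          for (int c = 0; c < numChannels; c++) {
              System.out.println("channel " + c + ": " + received[c]);
          }
      }
  }
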
>>
>> Cheers,
>> Aljoscha
>>
>> On Wed, 2 Sep 2015 at 20:00 Matthias J. Sax <mj...@apache.org> wrote:
>>
>>> Thanks for clarifying. shuffle() is similar to rebalance() -- however,
>>> it redistributes randomly and not in round-robin fashion.
>>>
>>> However, the problem you describe sounds like a bug to me. I included the
>>> dev list. Maybe someone else can step in so we can determine whether there
>>> is a bug or not.
>>>
>>> -Matthias
>>>
>>>
>>> On 09/02/2015 06:19 PM, LINZ, Arnaud wrote:
>>>> Hi,
>>>>
>>>> You are right, but in fact it does not solve my problem, since I have a
>>>> parallelism of 100 everywhere. Each of my 100 sources emits only a few
>>>> lines (say 14 at most), and only the first 14 downstream nodes will
>>>> receive data.
>>>> Same problem when replacing rebalance() with shuffle().
>>>>
>>>> But I found a workaround: setting the parallelism of the source to 1 (I
>>>> don't need 100 directory scanners anyway) forces the rebalancing to
>>>> spread the lines evenly across the mappers.
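
(As a sketch, mirroring the snippets further below with the workaround
applied; "myFileSource" and "myFileMapper" stand for the actual functions:)

  // one non-parallel directory scanner, then rebalance to 100 mappers
  addSource(myFileSource).setParallelism(1)
      .rebalance()
      .map(myFileMapper).setParallelism(100)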
>>>>
>>>> Greetings,
>>>> Arnaud
>>>>
>>>>
>>>> -----Original Message-----
>>>> From: Matthias J. Sax [mailto:mj...@apache.org]
>>>> Sent: Wednesday, September 2, 2015 17:56
>>>> To: u...@flink.apache.org
>>>> Subject: Re: How to force the parallelism on small streams?
>>>>
>>>> Hi,
>>>>
>>>> If I understand you correctly, you want to have 100 mappers. Thus you
>>>> need to apply .setParallelism() after .map():
>>>>
>>>>> addSource(myFileSource).rebalance().map(myFileMapper).setParallelism(100)
>>>>
>>>> The order of commands you used sets the dop for the source to 100 (which
>>>> might be ignored if the provided source function "myFileSource" does not
>>>> implement the "ParallelSourceFunction" interface). The dop for the mapper
>>>> stays at the default value.
>>>>
>>>> Using .rebalance() is absolutely correct. It distributes the emitted
>>>> tuples in a round-robin fashion to all consumer tasks.
>>>>
>>>> -Matthias
>>>>
>>>> On 09/02/2015 05:41 PM, LINZ, Arnaud wrote:
>>>>> Hi,
>>>>>
>>>>>
>>>>>
>>>>> I have a source that provides only a few items, since it emits file
>>>>> names for the mappers. Each mapper opens its file and processes the
>>>>> records. As the files are huge, one input line (a filename) represents
>>>>> a substantial amount of work for the next stage.
>>>>>
>>>>> My topology looks like:
>>>>>
>>>>> addSource(myFileSource).rebalance().setParallelism(100).map(myFileMapper)
>>>>>
>>>>> If 100 mappers are created, about 85 finish immediately and only a few
>>>>> process the files (for hours). I suspect an optimization whereby a
>>>>> minimum number of lines must be passed to the next node, or else that
>>>>> node is “shut down”; but in my case I do want the lines to be evenly
>>>>> distributed across the mappers.
>>>>>
>>>>> How can I enforce that?
>>>>>
>>>>>
>>>>>
>>>>> Greetings,
>>>>>
>>>>> Arnaud
>>>>>
>>>>>
>>>>
>>>
>>>
>>
> 
