Hi,

Timo's answer is correct, but I'll try to elaborate a bit. How data is sent to 
downstream operations depends on several things in this case:

 - parallelism of first input operation
 - parallelism of second input operation
 - parallelism of the co-operation (e.g. the CoMapFunction)
 - transmission pattern on first input (broadcast, rebalance, etc.)
 - transmission pattern on second input

Note that there is no parallelism on "streams" since there are technically no 
streams but only operations that are interconnected in a certain way.
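To make the list above concrete, here is a minimal sketch in plain Java. It is a simplified model, not Flink code: the class `EdgePatternModel`, its `Pattern` enum and the `effectivePattern` method are hypothetical names for illustration. Each input of a co-operation is treated as its own edge, and when no pattern is given it falls back the way the DataStream API does: forward for equal parallelism, rebalance otherwise.

```java
/**
 * Simplified model (hypothetical, not Flink's API) of how the transmission
 * pattern is chosen for ONE input edge of an operation. A co-operation such
 * as a CoMapFunction has two independent input edges.
 */
public class EdgePatternModel {

    /** Hypothetical enum mirroring the patterns discussed in this thread. */
    enum Pattern { FORWARD, REBALANCE, BROADCAST }

    /**
     * Returns the pattern used on an edge. If none was given (explicit == null),
     * equal parallelism yields FORWARD and different parallelism REBALANCE.
     */
    static Pattern effectivePattern(int upstreamParallelism,
                                    int downstreamParallelism,
                                    Pattern explicit) {
        if (explicit == null) {
            return upstreamParallelism == downstreamParallelism
                    ? Pattern.FORWARD
                    : Pattern.REBALANCE;
        }
        if (explicit == Pattern.FORWARD && upstreamParallelism != downstreamParallelism) {
            // Forwarding is one-to-one, so it cannot bridge different parallelisms.
            throw new IllegalArgumentException("FORWARD requires equal parallelism");
        }
        return explicit;
    }

    public static void main(String[] args) {
        int coMapParallelism = 8;
        // Timo's setup: someStream has parallelism 1, otherStream 5, CoMap 8.
        System.out.println("first input:  " + effectivePattern(1, coMapParallelism, null));
        System.out.println("second input: " + effectivePattern(5, coMapParallelism, null));
        // The same second input, explicitly broadcast instead.
        System.out.println("second input (broadcast): "
                + effectivePattern(5, coMapParallelism, Pattern.BROADCAST));
    }
}
```

With Timo's setup both inputs end up rebalanced, since neither 1 nor 5 matches the CoMap parallelism of 8. In the real API you would choose a pattern explicitly with calls such as `rebalance()` or `broadcast()` on the input `DataStream` before calling `connect()`.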

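Now, if the input parallelism and the operation parallelism are the same and 
you don't specify a transmission pattern, data will not be "shuffled" between 
the operations: each input instance sends its elements only to the 
corresponding downstream instance. If the parallelisms differ and you don't 
specify a pattern, Flink falls back to rebalance, i.e. round robin, which 
matches what Timo observed. If you do specify broadcast or rebalance, you get 
that behavior; with broadcast, for example, every element from an input 
instance is sent to every instance of the downstream operation.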
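To illustrate what each pattern does to the elements of a single upstream instance, here is a toy simulation, again in plain Java and not Flink's actual partitioner classes (`RoutingModel`, its `Pattern` enum and `route` are hypothetical). The round-robin cursor starts at downstream instance 0 purely for simplicity; the real rebalance implementation may start at a different instance.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Toy simulation (hypothetical, not Flink code) of where the elements emitted
 * by one upstream instance end up under the patterns discussed above.
 */
public class RoutingModel {

    enum Pattern { FORWARD, REBALANCE, BROADCAST }

    /**
     * For each of `count` elements emitted by upstream instance `upstreamIndex`,
     * returns the downstream instance indices that receive it.
     */
    static List<List<Integer>> route(Pattern pattern, int upstreamIndex,
                                     int downstreamParallelism, int count) {
        if (pattern == Pattern.FORWARD && upstreamIndex >= downstreamParallelism) {
            throw new IllegalArgumentException("FORWARD needs a matching downstream instance");
        }
        List<List<Integer>> targets = new ArrayList<>();
        int next = 0; // round-robin cursor; simplified starting point
        for (int i = 0; i < count; i++) {
            List<Integer> receivers = new ArrayList<>();
            switch (pattern) {
                case FORWARD:
                    // One-to-one: instance k feeds downstream instance k.
                    receivers.add(upstreamIndex);
                    break;
                case REBALANCE:
                    // Round robin over all downstream instances.
                    receivers.add(next);
                    next = (next + 1) % downstreamParallelism;
                    break;
                case BROADCAST:
                    // Every downstream instance receives a copy.
                    for (int d = 0; d < downstreamParallelism; d++) {
                        receivers.add(d);
                    }
                    break;
            }
            targets.add(receivers);
        }
        return targets;
    }

    public static void main(String[] args) {
        // Equal parallelism, no pattern given: forward, no shuffling.
        System.out.println("forward:   " + route(Pattern.FORWARD, 2, 4, 3));
        // A single upstream instance feeding 8 CoMap instances via rebalance.
        System.out.println("rebalance: " + route(Pattern.REBALANCE, 0, 8, 10));
        // Broadcast from one upstream instance to 4 downstream instances.
        System.out.println("broadcast: " + route(Pattern.BROADCAST, 0, 4, 2));
    }
}
```

In this model, rebalance from a single upstream instance cycles through all eight CoMap instances, which fits Timo seeing elements of someStream land in 2/8, 7/8 and so on. Broadcast delivers every element to every downstream instance, which is what Mans expected for a broadcast second input.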

Best,
Aljoscha

> On 3. Jan 2018, at 10:43, Timo Walther <twal...@apache.org> wrote:
> 
> Hi Mans,
> 
> I did a quick test on my PC where I simply set breakpoints in map1 and map2 
> (someStream has parallelism 1, otherStream 5, my CoMapFunction 8). Elements 
> of someStream end up in different CoMapTasks (2/8, 7/8 etc.).
> 
> So I guess the distribution is round-robin partitioning. @Aljoscha might know 
> more about the internals?
> 
> Regards,
> Timo
> 
> 
> 
> On 12/31/17 at 10:38 PM, M Singh wrote:
>> Hi:
>> 
>> Referring to documentation 
>> (https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/stream/operators/index.html)
>> for ConnectedStreams:
>> 
>> "Connects" two data streams retaining their types. Connect allows for 
>> shared state between the two streams.
>> DataStream<Integer> someStream = //...
>> DataStream<String> otherStream = //...
>> 
>> ConnectedStreams<Integer, String> connectedStreams = 
>> someStream.connect(otherStream);
>> 
>> If the two connected streams have different numbers of partitions, e.g. 
>> someStream has 4 and otherStream has 2, how are the elements of the 
>> streams distributed to the following CoMapFunction?
>> 
>> connectedStreams.map(new CoMapFunction<Integer, String, Boolean>() {
>>     @Override
>>     public Boolean map1(Integer value) {
>>         return true;
>>     }
>> 
>>     @Override
>>     public Boolean map2(String value) {
>>         return false;
>>     }
>> });
>> 
>> I believe that if the second stream is broadcast, each partition of the 
>> first will get all the elements of the second. Is my understanding 
>> correct?
>> 
>> If the streams are not broadcast, and the first stream has 4 partitions 
>> while the second has 2, how are the elements of the second stream 
>> distributed to each partition of the first?
>> 
>> Also, if the streams are not broadcast but have the same number of 
>> partitions, how are the elements distributed?
>> 
>> Thanks
>> 
>> Mans
>> 
>> 
>> 
>> 
> 
