Hi,

One possible explanation I see is the following: in a shuffle, there are input
and output buffers for each parallel subtask to which data can be shuffled.
These buffers are flushed either when they are full or after a timeout
interval. If you increase the parallelism, there are more buffers and each
buffer receives a smaller fraction of the data. This, in turn, means that it
takes longer until an individual buffer is full and its data is emitted. The
timeout interval enforces an upper bound on this delay.
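
To make the effect more concrete, here is a toy simulation of the mechanism
described above. It is a simplified model, not Flink's actual network stack: it
looks at a single sending subtask, assumes records arrive at a constant rate and
are spread round-robin over `parallelism` output buffers (a stand-in for hash
partitioning after a keyBy), that a buffer holds a fixed number of records, and
that a periodic flusher ships all buffers every `timeout_ms`. The function name
and all parameter values are hypothetical.

```python
def mean_buffer_delay(num_records, rate_per_ms, parallelism, capacity, timeout_ms):
    """Mean time (ms) a record sits in an output buffer before it is shipped.

    Toy model (assumption, not Flink internals): records arrive at a constant
    rate and are assigned round-robin to `parallelism` output buffers. A buffer
    is shipped as soon as it holds `capacity` records; in addition, a periodic
    flusher ships every buffer each `timeout_ms` milliseconds.
    """
    pending = [[] for _ in range(parallelism)]  # arrival times per buffer
    next_flush = timeout_ms
    total_delay = 0.0

    for i in range(num_records):
        now = i / rate_per_ms
        # Periodic flusher: ship all buffers at every tick up to `now`.
        while next_flush <= now:
            for buf in pending:
                total_delay += sum(next_flush - t for t in buf)
                buf.clear()
            next_flush += timeout_ms

        buf = pending[i % parallelism]  # round-robin stand-in for keyBy
        buf.append(now)
        if len(buf) == capacity:
            # Buffer is full: it is shipped immediately.
            total_delay += sum(now - t for t in buf)
            buf.clear()

    # Records still pending are shipped by the next flusher tick.
    for buf in pending:
        total_delay += sum(next_flush - t for t in buf)
    return total_delay / num_records


if __name__ == "__main__":
    # Hypothetical workload: 10 records/ms, 100 records per buffer, 100 ms timeout.
    for p in (1, 2, 4, 8, 16, 64):
        delay = mean_buffer_delay(10_000, 10, p, 100, 100)
        print(f"parallelism={p:3d}  mean buffering delay={delay:.2f} ms")
```

With a fixed input rate, the mean buffering delay grows with the parallelism
until the buffers no longer fill up between flusher ticks; from then on every
record simply waits for the next tick, and the delay levels off below
`timeout_ms`.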

Your experiments work on a very small scale, and I would not assume that this
increases latency without bound: at least once you hit the buffer timeout
interval, the latency should no longer increase. You could validate this by
configuring smaller buffer sizes and testing how this impacts the experiment.
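
A back-of-the-envelope version of the same argument shows why smaller buffers
should shrink the effect. The sketch below assumes fixed-size records spread
evenly over the output channels; the function name, the 100-byte record size
and the input rate are illustrative assumptions, while 32 KiB and 100 ms are
used as example inputs matching Flink's default network buffer size and buffer
timeout. In the job itself, the timeout can be changed with
`StreamExecutionEnvironment#setBufferTimeout(long)`.

```python
def worst_case_buffer_wait_ms(buffer_bytes, record_bytes, records_per_ms,
                              parallelism, timeout_ms):
    """Estimate how long the first record in an output buffer may wait.

    Assumes fixed-size records spread evenly over `parallelism` channels,
    so each channel's buffer fills at records_per_ms / parallelism.
    The wait is capped by the buffer timeout.
    """
    records_per_buffer = max(1, buffer_bytes // record_bytes)
    per_channel_rate = records_per_ms / parallelism  # records/ms per buffer
    fill_time_ms = records_per_buffer / per_channel_rate
    return min(fill_time_ms, timeout_ms)


if __name__ == "__main__":
    # Illustrative workload: 100-byte records at 100 records/ms.
    for buffer_bytes in (32 * 1024, 4 * 1024):
        waits = [round(worst_case_buffer_wait_ms(buffer_bytes, 100, 100, p, 100), 2)
                 for p in (4, 8, 16, 64)]
        print(f"buffer={buffer_bytes} B  waits (ms) for p=4/8/16/64: {waits}")
```

The estimate drops roughly in proportion to the buffer size until the timeout
cap is reached, which is the trend the smaller-buffer experiment should show if
this explanation is correct.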

Best,
Stefan

> On 03.01.2018 at 08:13, Netzer, Liron <liron.net...@citi.com> wrote:
> 
> Hi group,
>  
> We have a standalone Flink cluster running on a UNIX host with 40 CPUs and
> 256 GB RAM.
> There is one TaskManager with 24 slots.
> When we decrease the parallelism of the stream graph operators (all operators
> have the same parallelism), we see a consistent improvement in latency:
>
> Test run | Parallelism | 99th pct (ms) | 95th pct (ms) | 75th pct (ms) | Mean (ms)
> ---------+-------------+---------------+---------------+---------------+----------
> #1       | 8           | 4.15          | 2.02          | 0.22          | 0.42
> #2       | 7           | 3.6           | 1.68          | 0.14          | 0.34
> #3       | 6           | 3             | 1.4           | 0.13          | 0.3
> #4       | 5           | 2.1           | 0.95          | 0.1           | 0.22
> #5       | 4           | 1.5           | 0.64          | 0.09          | 0.16
>  
> This was a surprise for us, as we expected that higher parallelism would
> yield better latency.
> Could you help us understand this behavior?
> I know that when more threads are involved there is probably more
> serialization/deserialization, but this can't be the only reason for
> this behavior.
>  
> We have two Kafka sources, and the rest of the operators are fixed windows, 
> flatmaps, coMappers and several KeyBys. 
> Apart from the Kafka sources and some internal logging, there is no other I/O
> (i.e. we do not connect to any external DB or queue).
> We use Flink 1.3.
>  
>  
> Thanks,
> Liron
