[ 
https://issues.apache.org/jira/browse/FLINK-22887?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17368697#comment-17368697
 ] 

Jiayi Liao commented on FLINK-22887:
------------------------------------

[~pnowojski]  In my implementation, the optimized partitioner uses the backlog 
statistics lazily. For example, {{RebalancePartitioner}} should send a record 
into channel 12, but in my implemenation, the optimized partitioner will get 
the backlog of the chosen channel firstly and skip the channel if the backlog 
statistics is too high(e.g. backlogCount > 2). The {{BacklogStatistics}} 
maintains a reference of {{ResultPartitionWriter}} and exposes a API {{boolean 
isChannelBackPressured(int targetChannel)}} to {{StreamPartitioner}}. 

I tried to think about an earger way to aquire the backlog statistics, like 
gather the channels' statistics every five minutes. But it doesn't look like a 
good idea. So no progress so far. 

Your idea also makes sense and it ever came to my mind too. But it seems not so 
"elegant" to introduce a {{synchronized}} variable in such a critial module for 
a statistics value. (maybe it's only from my self-psychology effect :)

> Backlog based optimizations for RebalancePartitioner and RescalePartitioner
> ---------------------------------------------------------------------------
>
>                 Key: FLINK-22887
>                 URL: https://issues.apache.org/jira/browse/FLINK-22887
>             Project: Flink
>          Issue Type: Improvement
>          Components: Runtime / Network
>    Affects Versions: 1.13.1
>            Reporter: Jiayi Liao
>            Priority: Major
>
> {\{RebalancePartitioner}} uses round-robin to distribute the records but this 
> may not work as expected, because the environments and the processing ability 
> of the downstream tasks may differ from each other. In such cases, the 
> throughput of the whole job will be limited by the slowest downstream 
> subtask, which is very similar with the "HASH" scenario.
> Instead, after the credit-based mechanism is introduced, we can leverage the 
> {{backlog}} on the sender side to identify the "load" on each receiver side, 
> which help us distribute the data more fairly in {{RebalancePartitioner}} and 
> {{RescalePartitioner}}. 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to