[ https://issues.apache.org/jira/browse/FLINK-30985?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17686991#comment-17686991 ]

ming li commented on FLINK-30985:
---------------------------------

[~lzljs3620320] This is somewhat different from 
[FLINK-31008|https://issues.apache.org/jira/browse/FLINK-31008]. Consider the 
case where a task consumes multiple buckets at the same time: we always assign 
splits from the first bucket, which may leave the other buckets unconsumed for 
a long time.
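To make the starvation concrete, here is a small standalone simulation. It is a hypothetical model, not Table Store code: the class and method names (FixedOrderStarvation, buildBuckets, assignOnce, simulate) are made up, splits are plain strings, and it only mirrors the shape of the fixed-order traversal in the current assignSplits (on each request, the first bucket in map order that has pending splits and maps to the waiting task wins).

```java
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;
import java.util.TreeMap;

// Hypothetical model of the fixed-order traversal; not Table Store code.
class FixedOrderStarvation {

    // bucket -> pending splits, kept in a HashMap as in the quoted enumerator.
    static Map<Integer, Queue<String>> buildBuckets(int numBuckets, int splitsPerBucket) {
        Map<Integer, Queue<String>> buckets = new HashMap<>();
        for (int b = 0; b < numBuckets; b++) {
            Queue<String> splits = new ArrayDeque<>();
            for (int s = 0; s < splitsPerBucket; s++) {
                splits.add("b" + b + "-s" + s);
            }
            buckets.put(b, splits);
        }
        return buckets;
    }

    // One request from `task`: walk the map in its (fixed) iteration order and serve
    // the first non-empty bucket with bucket % parallelism == task.
    // Returns the bucket that was served, or -1 if the task has nothing left.
    static int assignOnce(Map<Integer, Queue<String>> buckets, int task, int parallelism) {
        for (Map.Entry<Integer, Queue<String>> e : buckets.entrySet()) {
            if (!e.getValue().isEmpty() && e.getKey() % parallelism == task) {
                e.getValue().poll();
                return e.getKey();
            }
        }
        return -1;
    }

    // Counts how many of `requests` consecutive requests from `task` hit each bucket.
    static Map<Integer, Integer> simulate(
            int numBuckets, int splitsPerBucket, int parallelism, int task, int requests) {
        Map<Integer, Queue<String>> buckets = buildBuckets(numBuckets, splitsPerBucket);
        Map<Integer, Integer> served = new TreeMap<>();
        for (int r = 0; r < requests; r++) {
            int bucket = assignOnce(buckets, task, parallelism);
            if (bucket < 0) {
                break;
            }
            served.merge(bucket, 1, Integer::sum);
        }
        return served;
    }

    public static void main(String[] args) {
        // 4 buckets, parallelism 2: task 0 owns buckets 0 and 2.
        System.out.println(simulate(4, 10, 2, 0, 10));
    }
}
```

With 4 buckets of 10 splits each and parallelism 2, task 0 owns buckets 0 and 2, yet all 10 of its requests are served from a single bucket; the other one is only reached once the first is drained.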

> [Flink][Table Store] Change the Splits allocation algorithm of 
> ContinuousFileSplitEnumerator in TableStore to a fair algorithm.
> -------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-30985
>                 URL: https://issues.apache.org/jira/browse/FLINK-30985
>             Project: Flink
>          Issue Type: Improvement
>          Components: Table Store
>            Reporter: ming li
>            Priority: Major
>
> Currently, {{assignSplits}} of {{ContinuousFileSplitEnumerator}} in 
> {{TableStore}} traverses the {{bucketSplits}} {{HashMap}}. Because the 
> number of buckets is fixed, the traversal order is also fixed.
> {code:java}
> private void assignSplits() {
>     bucketSplits.forEach(
>             (bucket, splits) -> {
>                 if (splits.size() > 0) {
>                     // To ensure the order of consumption, the data of the same bucket is given
>                     // to a task to be consumed.
>                     int task = bucket % context.currentParallelism();
>                     if (readersAwaitingSplit.remove(task)) {
>                         // if the reader that requested another split has failed in the
>                         // meantime, remove it from the list of waiting readers
>                         if (!context.registeredReaders().containsKey(task)) {
>                             return;
>                         }
>                         context.assignSplit(splits.poll(), task);
>                     }
>                 }
>             });
> }{code}
> Assume that a {{task}} consumes multiple {{buckets}} and each {{bucket}} has 
> enough pending splits. Every time the task requests a split, the traversal 
> reaches the same owned {{bucket}} first, so splits from that first {{bucket}} 
> are always assigned to the task while the other buckets may not be consumed 
> for a long time. This results in uneven consumption and makes it difficult to 
> advance the {{watermark}}. So I think we should change the split allocation 
> algorithm to a fair algorithm.
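One possible fair policy is round-robin across the buckets a task owns. The sketch below is a hypothetical illustration, not the change that was actually made in Table Store: RoundRobinSplitAssigner, addSplit, nextSplit and the per-task lastServed cursor are made-up names, and it omits the readersAwaitingSplit / registeredReaders bookkeeping of the real enumerator. It keeps the existing rule that a bucket is pinned to bucket % parallelism (so per-bucket order is preserved) and only rotates the choice between the task's buckets.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.TreeMap;

// Hypothetical round-robin split assigner; names are illustrative, not Table Store API.
class RoundRobinSplitAssigner {
    private final int parallelism;
    // bucket -> pending splits; a TreeMap gives a stable, sorted rotation order.
    private final TreeMap<Integer, Queue<String>> bucketSplits = new TreeMap<>();
    // task -> bucket that most recently served this task.
    private final Map<Integer, Integer> lastServed = new HashMap<>();

    RoundRobinSplitAssigner(int parallelism) {
        this.parallelism = parallelism;
    }

    void addSplit(int bucket, String split) {
        bucketSplits.computeIfAbsent(bucket, b -> new ArrayDeque<>()).add(split);
    }

    // Returns the next split for `task`, or null if none of its buckets has work.
    // Each bucket stays pinned to task = bucket % parallelism, and splits within a
    // bucket are polled FIFO, so only the choice between the task's buckets rotates.
    String nextSplit(int task) {
        List<Integer> owned = new ArrayList<>();
        for (Map.Entry<Integer, Queue<String>> e : bucketSplits.entrySet()) {
            if (e.getKey() % parallelism == task && !e.getValue().isEmpty()) {
                owned.add(e.getKey()); // ascending, because bucketSplits is a TreeMap
            }
        }
        if (owned.isEmpty()) {
            return null;
        }
        // Start just after the bucket served last time; wrap around to the smallest.
        Integer last = lastServed.get(task);
        int chosen = owned.get(0);
        if (last != null) {
            for (int b : owned) {
                if (b > last) {
                    chosen = b;
                    break;
                }
            }
        }
        lastServed.put(task, chosen);
        return bucketSplits.get(chosen).poll();
    }

    public static void main(String[] args) {
        RoundRobinSplitAssigner assigner = new RoundRobinSplitAssigner(2);
        for (int b = 0; b < 4; b++) {
            for (int s = 0; s < 3; s++) {
                assigner.addSplit(b, "b" + b + "-s" + s);
            }
        }
        // Task 0 owns buckets 0 and 2 and now alternates between them.
        for (int i = 0; i < 6; i++) {
            System.out.println(assigner.nextSplit(0));
        }
    }
}
```

With 4 buckets of 3 splits and parallelism 2, task 0 receives b0-s0, b2-s0, b0-s1, b2-s1, b0-s2, b2-s2, so both of its buckets make progress and their watermarks can advance together. Round-robin is just the simplest fair choice; a backlog- or watermark-aware policy would also fit the same structure.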



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
