[ 
https://issues.apache.org/jira/browse/FLINK-30985?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ming li updated FLINK-30985:
----------------------------
    Summary: [Flink][Table Store] Change the Splits allocation algorithm of 
ContinuousFileSplitEnumerator in TableStore to a fair algorithm.  (was: 
[Flink][table-store] Change the Splits allocation algorithm of 
ContinuousFileSplitEnumerator in TableStore to a fair algorithm.)

> [Flink][Table Store] Change the Splits allocation algorithm of 
> ContinuousFileSplitEnumerator in TableStore to a fair algorithm.
> -------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-30985
>                 URL: https://issues.apache.org/jira/browse/FLINK-30985
>             Project: Flink
>          Issue Type: Improvement
>          Components: Table Store
>            Reporter: ming li
>            Priority: Major
>
> Currently, {{assignSplits}} of {{ContinuousFileSplitEnumerator}} in 
> {{TableStore}} is performed by traversing the {{HashMap}}, but since the 
> number of buckets is fixed, the order of traversal is also fixed.
> {code:java}
> private void assignSplits() {
>     bucketSplits.forEach(
>             (bucket, splits) -> {
>                 if (splits.size() > 0) {
>                     // To ensure the order of consumption, the data of the same
>                     // bucket is given to a task to be consumed.
>                     int task = bucket % context.currentParallelism();
>                     if (readersAwaitingSplit.remove(task)) {
>                         // If the reader that requested another split has failed
>                         // in the meantime, remove it from the list of waiting readers.
>                         if (!context.registeredReaders().containsKey(task)) {
>                             return;
>                         }
>                         context.assignSplit(splits.poll(), task);
>                     }
>                 }
>             });
> }{code}
> Assume that a {{task}} consumes multiple buckets and that each {{bucket}} 
> has enough pending splits. Because the traversal order is fixed, the first 
> {{bucket}} will always be assigned to the task, and the other buckets may not 
> be consumed for a long time. This results in uneven consumption and makes it 
> difficult to advance the {{watermark}}. So I think we should change the split 
> allocation algorithm to a fair algorithm.
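
One possible shape of a fair algorithm is per-task round-robin over buckets. The sketch below is only an illustration under stated assumptions, not the fix adopted for this issue: {{FairSplitAssigner}}, {{addSplit}}, {{nextSplit}} and {{lastServedBucket}} are hypothetical names, splits are modelled as plain strings, and the Flink enumerator context is left out. It keeps the two properties of the original code (a bucket always maps to task {{bucket % parallelism}}, and splits within a bucket are handed out in FIFO order) while remembering which bucket each task was served from last.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.TreeMap;

/** Hypothetical helper (not part of Table Store): round-robin split hand-out per task. */
final class FairSplitAssigner {
    private final int parallelism;
    // bucket -> pending splits; TreeMap gives a stable ascending bucket order.
    private final Map<Integer, Queue<String>> bucketSplits = new TreeMap<>();
    // task -> bucket that most recently supplied a split to that task.
    private final Map<Integer, Integer> lastServedBucket = new HashMap<>();

    FairSplitAssigner(int parallelism) {
        this.parallelism = parallelism;
    }

    void addSplit(int bucket, String split) {
        bucketSplits.computeIfAbsent(bucket, b -> new ArrayDeque<>()).add(split);
    }

    /** Returns the next split for the given task, or null if none is pending. */
    String nextSplit(int task) {
        // Collect this task's buckets that still have pending splits, in ascending order.
        List<Integer> candidates = new ArrayList<>();
        for (Map.Entry<Integer, Queue<String>> e : bucketSplits.entrySet()) {
            if (e.getKey() % parallelism == task && !e.getValue().isEmpty()) {
                candidates.add(e.getKey());
            }
        }
        if (candidates.isEmpty()) {
            return null;
        }
        // Pick the first bucket after the one served last; wrap around to the smallest.
        int last = lastServedBucket.getOrDefault(task, -1);
        int chosen = candidates.get(0);
        for (int bucket : candidates) {
            if (bucket > last) {
                chosen = bucket;
                break;
            }
        }
        lastServedBucket.put(task, chosen);
        // Polling the bucket's queue preserves the consumption order within the bucket.
        return bucketSplits.get(chosen).poll();
    }
}
```

With a parallelism of 2 and pending splits in buckets 0, 2 and 4, successive requests from task 0 are served from bucket 0, then 2, then 4, and then wrap back to bucket 0, so no bucket waits for another bucket to drain first.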



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
