ming li created FLINK-30985:
-------------------------------

             Summary: [Flink][table-store] Change the Splits allocation algorithm of ContinuousFileSplitEnumerator in TableStore to a fair algorithm.
                 Key: FLINK-30985
                 URL: https://issues.apache.org/jira/browse/FLINK-30985
             Project: Flink
          Issue Type: Improvement
          Components: Table Store
            Reporter: ming li


Currently, {{assignSplits}} of {{ContinuousFileSplitEnumerator}} in
{{TableStore}} iterates over the {{bucketSplits}} {{HashMap}}. Because the
number of buckets is fixed, the iteration order is also the same on every call.
{code:java}
private void assignSplits() {
    bucketSplits.forEach(
            (bucket, splits) -> {
                if (splits.size() > 0) {
                    // To ensure the order of consumption, the data of the same bucket is given
                    // to a task to be consumed.
                    int task = bucket % context.currentParallelism();
                    if (readersAwaitingSplit.remove(task)) {
                        // If the reader that requested another split has failed in the
                        // meantime, remove it from the list of waiting readers.
                        if (!context.registeredReaders().containsKey(task)) {
                            return;
                        }
                        context.assignSplit(splits.poll(), task);
                    }
                }
            });
}{code}
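The starvation can be reproduced with a simplified, stand-alone model of the loop above. This is a hypothetical sketch, not Flink code: {{SplitEnumeratorContext}} is replaced by a fixed {{PARALLELISM}} constant, splits are plain strings, the registered-reader check is omitted, and each round models task 0 asking for one more split. With parallelism 2, task 0 owns buckets 0 and 2.

```java
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Queue;
import java.util.Set;

public class UnfairAssignmentDemo {
    // Hypothetical stand-in for context.currentParallelism().
    static final int PARALLELISM = 2;

    // Simplified copy of the current loop: the first bucket (in HashMap
    // iteration order) that maps to an awaiting task wins the assignment.
    static void assignSplits(Map<Integer, Queue<String>> bucketSplits,
                             Set<Integer> readersAwaitingSplit,
                             Map<Integer, Integer> assignedPerBucket) {
        bucketSplits.forEach((bucket, splits) -> {
            if (!splits.isEmpty()) {
                int task = bucket % PARALLELISM;
                // Only the first matching bucket sees the task as awaiting;
                // later buckets of the same task are skipped in this pass.
                if (readersAwaitingSplit.remove(task)) {
                    splits.poll();
                    assignedPerBucket.merge(bucket, 1, Integer::sum);
                }
            }
        });
    }

    // Builds 4 buckets with 10 splits each, then lets task 0 request one
    // split per round. Returns how many splits each bucket handed out.
    static Map<Integer, Integer> simulate(int rounds) {
        // A default HashMap with small Integer keys happens to iterate in
        // ascending key order (an implementation detail, but stable).
        Map<Integer, Queue<String>> bucketSplits = new HashMap<>();
        for (int bucket = 0; bucket < 4; bucket++) {
            Queue<String> splits = new ArrayDeque<>();
            for (int i = 0; i < 10; i++) {
                splits.add("bucket-" + bucket + "-split-" + i);
            }
            bucketSplits.put(bucket, splits);
        }
        Map<Integer, Integer> assignedPerBucket = new HashMap<>();
        Set<Integer> readersAwaitingSplit = new LinkedHashSet<>();
        for (int round = 0; round < rounds; round++) {
            readersAwaitingSplit.add(0);
            assignSplits(bucketSplits, readersAwaitingSplit, assignedPerBucket);
        }
        return assignedPerBucket;
    }

    public static void main(String[] args) {
        System.out.println(simulate(5)); // prints {0=5}
    }
}
```

Bucket 2 receives nothing until bucket 0 runs out of splits, which is the uneven consumption described in this issue.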
Assume that a {{task}} consumes multiple {{buckets}} and each {{bucket}} has
enough pending splits. Each time the task requests a split, the first of its
buckets in the fixed traversal order receives the assignment, and the task is
then removed from {{readersAwaitingSplit}}, so its remaining buckets are skipped
in that pass. As a result, the other buckets may not be consumed for a long
time, which leads to uneven consumption and makes it difficult to advance the
{{watermark}}. I therefore think we should change the split allocation
algorithm to a fair one.
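One possible fair variant, sketched under the same simplified model (hypothetical, not the change adopted in Flink): keep the buckets in a {{LinkedHashMap}} and, after a bucket receives an assignment, move it to the end of the iteration order. Each bucket still goes to the same task and its splits are still polled in FIFO order, so per-bucket consumption order is preserved, but the buckets of one task now take turns. {{PARALLELISM}} and {{simulate}} are illustrative names, and the registered-reader check is again omitted.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Set;

public class FairAssignmentDemo {
    // Hypothetical stand-in for context.currentParallelism().
    static final int PARALLELISM = 2;

    // Insertion-ordered map: a bucket that just received an assignment is
    // moved to the back, so the task's other buckets are tried first next time.
    static void assignSplits(LinkedHashMap<Integer, Queue<String>> bucketSplits,
                             Set<Integer> readersAwaitingSplit,
                             Map<Integer, Integer> assignedPerBucket) {
        // Iterate over a snapshot of the keys because the loop reorders the map.
        List<Integer> buckets = new ArrayList<>(bucketSplits.keySet());
        for (int bucket : buckets) {
            Queue<String> splits = bucketSplits.get(bucket);
            if (splits.isEmpty()) {
                continue;
            }
            int task = bucket % PARALLELISM;
            if (readersAwaitingSplit.remove(task)) {
                splits.poll();
                assignedPerBucket.merge(bucket, 1, Integer::sum);
                // Re-insert the bucket to move it to the end of the order.
                bucketSplits.remove(bucket);
                bucketSplits.put(bucket, splits);
            }
        }
    }

    // Same setup as the unfair model: 4 buckets with 10 splits each,
    // task 0 (owning buckets 0 and 2) requests one split per round.
    static Map<Integer, Integer> simulate(int rounds) {
        LinkedHashMap<Integer, Queue<String>> bucketSplits = new LinkedHashMap<>();
        for (int bucket = 0; bucket < 4; bucket++) {
            Queue<String> splits = new ArrayDeque<>();
            for (int i = 0; i < 10; i++) {
                splits.add("bucket-" + bucket + "-split-" + i);
            }
            bucketSplits.put(bucket, splits);
        }
        Map<Integer, Integer> assignedPerBucket = new HashMap<>();
        Set<Integer> readersAwaitingSplit = new LinkedHashSet<>();
        for (int round = 0; round < rounds; round++) {
            readersAwaitingSplit.add(0);
            assignSplits(bucketSplits, readersAwaitingSplit, assignedPerBucket);
        }
        return assignedPerBucket;
    }

    public static void main(String[] args) {
        System.out.println(simulate(5)); // prints {0=3, 2=2}
    }
}
```

A per-task cursor that remembers which bucket was served last would be an alternative with the same effect; the re-insertion approach was chosen here only because it needs no extra state.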



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
