[ https://issues.apache.org/jira/browse/FLINK-30985?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
ming li updated FLINK-30985:
----------------------------
    Summary: [Flink][Table Store] Change the Splits allocation algorithm of ContinuousFileSplitEnumerator in TableStore to a fair algorithm.  (was: [Flink][table-store] Change the Splits allocation algorithm of ContinuousFileSplitEnumerator in TableStore to a fair algorithm.)

> [Flink][Table Store] Change the Splits allocation algorithm of ContinuousFileSplitEnumerator in TableStore to a fair algorithm.
> -------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-30985
>                 URL: https://issues.apache.org/jira/browse/FLINK-30985
>             Project: Flink
>          Issue Type: Improvement
>          Components: Table Store
>            Reporter: ming li
>            Priority: Major
>
> Currently, {{assignSplits}} of {{ContinuousFileSplitEnumerator}} in {{TableStore}} assigns splits by traversing the {{bucketSplits}} {{HashMap}}. Because the number of buckets is fixed, the traversal order is also fixed.
> {code:java}
> private void assignSplits() {
>     bucketSplits.forEach(
>             (bucket, splits) -> {
>                 if (splits.size() > 0) {
>                     // To ensure the order of consumption, the data of the same bucket is given
>                     // to a task to be consumed.
>                     int task = bucket % context.currentParallelism();
>                     if (readersAwaitingSplit.remove(task)) {
>                         // if the reader that requested another split has failed in the
>                         // meantime, remove it from the list of waiting readers
>                         if (!context.registeredReaders().containsKey(task)) {
>                             return;
>                         }
>                         context.assignSplit(splits.poll(), task);
>                     }
>                 }
>             });
> }{code}
> Assume that a {{task}} consumes multiple {{buckets}} and each {{bucket}} has enough pending splits. Since {{readersAwaitingSplit.remove(task)}} succeeds only once per request, the task is always given a split from the first {{bucket}} in traversal order, while its other buckets may not be consumed for a long time. This leads to uneven consumption and makes it difficult to advance the {{watermark}}. So I think we should change the split allocation algorithm to a fair one.
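A minimal sketch of one possible fair policy (per-task round-robin over buckets), assuming splits are modeled as plain strings and that the bucket-to-task mapping {{bucket % parallelism}} stays as in the code above. The class {{RoundRobinSplitAssigner}} and its methods are hypothetical illustrations, not part of the Table Store or Flink API. For each task, the sketch remembers the last bucket it served and resumes the search right after that bucket, so every bucket a task owns gets a turn.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/**
 * Hypothetical sketch (not the Table Store API): serves the buckets owned by a task in
 * round-robin order instead of always draining the first bucket in traversal order.
 */
public class RoundRobinSplitAssigner {
    // bucket -> pending splits; TreeMap gives a stable bucket order to rotate over
    private final Map<Integer, Deque<String>> bucketSplits = new TreeMap<>();
    // task -> last bucket served to that task (absent if none yet)
    private final Map<Integer, Integer> lastBucket = new HashMap<>();
    private final int parallelism;

    public RoundRobinSplitAssigner(int parallelism) {
        this.parallelism = parallelism;
    }

    public void addSplit(int bucket, String split) {
        bucketSplits.computeIfAbsent(bucket, b -> new ArrayDeque<>()).add(split);
    }

    /** Returns the next split for the task, or null if none of its buckets has pending splits. */
    public String nextSplit(int task) {
        // Buckets owned by this task, using the same mapping as the original code
        List<Integer> owned = new ArrayList<>();
        for (int bucket : bucketSplits.keySet()) {
            if (bucket % parallelism == task) {
                owned.add(bucket);
            }
        }
        if (owned.isEmpty()) {
            return null;
        }
        // Find the first owned bucket after the one served last time
        int last = lastBucket.getOrDefault(task, -1);
        int start = 0;
        while (start < owned.size() && owned.get(start) <= last) {
            start++;
        }
        // Scan at most one full round, wrapping around to the beginning
        for (int i = 0; i < owned.size(); i++) {
            int bucket = owned.get((start + i) % owned.size());
            Deque<String> splits = bucketSplits.get(bucket);
            if (!splits.isEmpty()) {
                lastBucket.put(task, bucket);
                return splits.poll(); // FIFO within a bucket keeps per-bucket order
            }
        }
        return null;
    }

    public static void main(String[] args) {
        RoundRobinSplitAssigner assigner = new RoundRobinSplitAssigner(2);
        // With parallelism 2, task 0 owns buckets 0 and 2; each has several pending splits
        for (int i = 0; i < 3; i++) {
            assigner.addSplit(0, "b0-s" + i);
            assigner.addSplit(2, "b2-s" + i);
        }
        List<String> order = new ArrayList<>();
        for (int i = 0; i < 4; i++) {
            order.add(assigner.nextSplit(0));
        }
        System.out.println(order); // prints [b0-s0, b2-s0, b0-s1, b2-s1]
    }
}
```

Running {{main}} alternates between the two buckets owned by task 0, whereas a fixed traversal would keep handing out splits from the first bucket as long as it has any. Each bucket still maps to a single task and is polled in FIFO order, so the per-bucket consumption order that the original comment asks for is preserved; rebuilding the {{owned}} list on every call is a simplification that a real enumerator would likely avoid.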
-- This message was sent by Atlassian Jira (v8.20.10#820010)