ming li created FLINK-30985:
-------------------------------

             Summary: [Flink][table-store] Change the Splits allocation algorithm of ContinuousFileSplitEnumerator in TableStore to a fair algorithm.
                 Key: FLINK-30985
                 URL: https://issues.apache.org/jira/browse/FLINK-30985
             Project: Flink
          Issue Type: Improvement
          Components: Table Store
            Reporter: ming li

Currently, {{assignSplits}} in {{ContinuousFileSplitEnumerator}} of {{TableStore}} assigns splits by traversing a {{HashMap}}. Because the number of buckets is fixed, the traversal order is also fixed.

{code:java}
private void assignSplits() {
    bucketSplits.forEach(
            (bucket, splits) -> {
                if (splits.size() > 0) {
                    // To ensure the order of consumption, the data of the same bucket is given
                    // to a task to be consumed.
                    int task = bucket % context.currentParallelism();
                    if (readersAwaitingSplit.remove(task)) {
                        // if the reader that requested another split has failed in the
                        // meantime, remove it from the list of waiting readers
                        if (!context.registeredReaders().containsKey(task)) {
                            return;
                        }
                        context.assignSplit(splits.poll(), task);
                    }
                }
            });
}
{code}

Assume that a {{task}} consumes multiple buckets and that each {{bucket}} has enough splits. Every time the task requests a split, the traversal reaches the same bucket first, so that bucket is always the one served, and the task's other buckets may not be consumed for a long time. The result is uneven consumption, and the {{watermark}} is difficult to advance.

So I think we should change the split allocation algorithm to a fair algorithm.
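
One possible shape for a fair algorithm is a per-task round-robin over buckets. This is only a minimal sketch to illustrate the idea, not the proposed patch: {{FairSplitAssigner}}, {{addSplit}} and {{nextSplit}} are hypothetical names, splits are simplified to {{String}}, and the enumerator context and reader registration checks are left out. It keeps the existing {{bucket % parallelism}} mapping, so a bucket is still read by a single task and its splits are still handed out in order.

```java
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.Queue;

/** Hypothetical helper (not Table Store code): serves a task's buckets in rotation. */
final class FairSplitAssigner {
    private final int parallelism;

    // Pending splits per bucket, in arrival order. Split type simplified to String.
    private final Map<Integer, Queue<String>> bucketSplits = new HashMap<>();

    // Per task: buckets that currently have pending splits, in the order they are served next.
    private final Map<Integer, ArrayDeque<Integer>> taskRotation = new HashMap<>();

    FairSplitAssigner(int parallelism) {
        this.parallelism = parallelism;
    }

    void addSplit(int bucket, String split) {
        Queue<String> splits = bucketSplits.computeIfAbsent(bucket, b -> new ArrayDeque<>());
        if (splits.isEmpty()) {
            // The bucket had nothing pending, so it is not in the rotation yet:
            // append it to the tail of its task's rotation.
            int task = bucket % parallelism;
            taskRotation.computeIfAbsent(task, t -> new ArrayDeque<>()).addLast(bucket);
        }
        splits.add(split);
    }

    /** Returns the next split for the task, or empty if none of its buckets has one. */
    Optional<String> nextSplit(int task) {
        ArrayDeque<Integer> rotation = taskRotation.get(task);
        if (rotation == null || rotation.isEmpty()) {
            return Optional.empty();
        }
        // Serve the bucket at the head of the rotation.
        int bucket = rotation.pollFirst();
        Queue<String> splits = bucketSplits.get(bucket);
        String split = splits.poll();
        if (!splits.isEmpty()) {
            // Still has pending splits: move the bucket to the tail so the task's
            // other buckets are served before this one is served again.
            rotation.addLast(bucket);
        }
        return Optional.of(split);
    }
}
```

With a parallelism of 2, buckets 0 and 2 both belong to task 0. If each of them has two pending splits, successive requests from task 0 alternate between the two buckets instead of draining bucket 0 first, so neither bucket falls far behind the other.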