gang ye created FLINK-27405:
-------------------------------

             Summary: Refactor SourceCoordinator to abstract BaseCoordinator implementation
                 Key: FLINK-27405
                 URL: https://issues.apache.org/jira/browse/FLINK-27405
             Project: Flink
          Issue Type: Improvement
          Components: Runtime / Coordination
            Reporter: gang ye

To solve the small-files issue caused by data skew, Flink Iceberg data shuffling was proposed (design doc: https://docs.google.com/document/d/13N8cMqPi-ZPSKbkXGOBMPOzbv2Fua59j8bIjjtxLWqo/edit). The basic idea is to use a statistics operator to collect local statistics on traffic distribution at the taskmanagers (workers). Local statistics are periodically sent to the statistics coordinator (running in the jobmanager). Once the globally aggregated statistics are ready, the statistics coordinator broadcasts them to all operator instances. A customized partitioner then uses the global statistics, passed down from the statistics operator, to distribute data to the Iceberg writers.

While implementing Flink Iceberg data shuffling, we found that:

* StatisticsCoordinator can share the logic of SourceCoordinator#runInEventLoop.
* StatisticsCoordinatorContext needs a function similar to SourceCoordinatorContext#callInCoordinatorThread.
* The StatisticsCoordinatorProvider ExecutorThreadFactory logic is almost the same as SourceCoordinatorProvider#CoordinatorExecutorThreadFactory.

We would therefore like to refactor the source coordinator classes and extract a general coordinator implementation, to reduce duplicated code when adding other coordinators.
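As a rough illustration of the shape such a shared base could take, here is a minimal, self-contained sketch. It does not use Flink's classes or reproduce their behavior: BaseCoordinator, CoordinatorThreadFactory, StatisticsCoordinatorSketch, and all method signatures below are hypothetical, named only to mirror the three duplicated pieces listed above.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;

// Hypothetical: names the coordinator thread and remembers it for thread checks.
class CoordinatorThreadFactory implements ThreadFactory {
    private final String threadName;
    private volatile Thread thread;

    CoordinatorThreadFactory(String threadName) {
        this.threadName = threadName;
    }

    @Override
    public Thread newThread(Runnable r) {
        thread = new Thread(r, threadName);
        thread.setDaemon(true);
        return thread;
    }

    boolean isCurrentThreadCoordinatorThread() {
        return Thread.currentThread() == thread;
    }
}

// Hypothetical base class holding the logic that would otherwise be duplicated.
abstract class BaseCoordinator implements AutoCloseable {
    private final CoordinatorThreadFactory threadFactory;
    private final ExecutorService executor;

    protected BaseCoordinator(String threadName) {
        this.threadFactory = new CoordinatorThreadFactory(threadName);
        this.executor = Executors.newSingleThreadExecutor(threadFactory);
    }

    // Runs an action on the single coordinator thread and reports failures.
    protected void runInEventLoop(Runnable action, String description) {
        executor.execute(() -> {
            try {
                action.run();
            } catch (Throwable t) {
                handleFailure(new RuntimeException("Failed to " + description, t));
            }
        });
    }

    // Evaluates a callable on the coordinator thread and waits for the result.
    protected <V> V callInCoordinatorThread(Callable<V> callable, String errorMessage) {
        if (threadFactory.isCurrentThreadCoordinatorThread()) {
            try {
                return callable.call();
            } catch (Exception e) {
                throw new RuntimeException(errorMessage, e);
            }
        }
        try {
            return executor.submit(callable).get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(errorMessage, e);
        } catch (ExecutionException e) {
            throw new RuntimeException(errorMessage, e.getCause());
        }
    }

    protected abstract void handleFailure(Throwable t);

    @Override
    public void close() {
        executor.shutdownNow();
    }
}

// Hypothetical subclass: merges per-key counts reported by workers.
class StatisticsCoordinatorSketch extends BaseCoordinator {
    private final Map<String, Long> globalCounts = new HashMap<>();
    volatile Throwable failure;

    StatisticsCoordinatorSketch() {
        super("statistics-coordinator");
    }

    void handleLocalStatistics(Map<String, Long> local) {
        runInEventLoop(
                () -> local.forEach((k, v) -> globalCounts.merge(k, v, Long::sum)),
                "merge local statistics");
    }

    Map<String, Long> snapshot() {
        return callInCoordinatorThread(
                () -> new HashMap<>(globalCounts), "Failed to snapshot statistics");
    }

    @Override
    protected void handleFailure(Throwable t) {
        failure = t;
    }
}
```

In this sketch all state changes go through one thread, so a subclass needs no locking of its own; a snapshot requested after two handleLocalStatistics calls sees both merges because the single-thread executor runs tasks in submission order. How failures should propagate to the job is left open here and would depend on the real coordinator context.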