Hi, dev, We recently used *SourceCoordinator* and *ExternallyInducedSource* to work together on some file type connectors to fulfill some requirements, but we found that these two interfaces do not work well together.
*SourceCoordinator* (FLINK-15101) and *ExternallyInducedSource* (FLINK-20270) were introduced in Flip27. *SourceCoordinator* is responsible for running *SplitEnumerator* and coordinating the allocation of *Split*. *ExternallyInducedSource* allows us to delay making a c*heckpoint* in Source or make a c*heckpoint* at specified data. This works fine with connectors like *Kafka*. But in some connectors (such as hive connector), the split is completely allocated by the *SourceCoordinator*, and after the consumption is completed, it needs to wait for the allocation of the next batch of splits (it is not like kafka that continuously consumes the same split). In FLINK-28606, we introduced another mechanism: the *OperatorCoordinator* is not allowed to send *OperatorEvents* to the *Operator* before the *Operator's* checkpoint is completed. Considering this scenario, if the data we want has not been produced yet, but the *SourceCoordinator* receives the c*heckpoint* message, it will directly make a *checkpoint*, and the *ExternallyInducedSource* will not make a *checkpoint* immediately after receiving the *checkpoint*, but continues to wait for a new split. Even if a new split is generated, due to the behavior of closing *gateway* in *FLINK-28606*, the new split cannot be assigned to the *Source*, resulting in a deadlock (or forced to wait for checkpoint to time out). So should we also add a mechanism similar to *ExternallyInducedSource* in *OperatorCoordinator*: only make a checkpoint on *OperatorCoordinator* when *OperatorCoordinator* is ready, which allows us to delay making checkpoint? [1] https://issues.apache.org/jira/browse/FLINK-15101 [2] https://issues.apache.org/jira/browse/FLINK-20270 [3] https://issues.apache.org/jira/browse/FLINK-28606