Hi devs,

We recently used *SourceCoordinator* and *ExternallyInducedSource* together
in some file-based connectors to meet certain requirements, but we found
that these two interfaces do not work well together.

*SourceCoordinator* (FLINK-15101 [1]) and *ExternallyInducedSource*
(FLINK-20270 [2]) were introduced with FLIP-27. *SourceCoordinator* runs the
*SplitEnumerator* and coordinates the assignment of *Splits*.
*ExternallyInducedSource* allows a Source to delay taking a *checkpoint*, or
to take a *checkpoint* at a specific point in the data. This works fine with
connectors like *Kafka*.

But in some connectors (such as the Hive connector), splits are assigned
entirely by the *SourceCoordinator*: once a split has been consumed, the
reader has to wait for the next batch of splits to be assigned (unlike
*Kafka*, where the same split is consumed continuously). In FLINK-28606 [3],
we introduced another mechanism: the *OperatorCoordinator* is not allowed to
send *OperatorEvents* to the *Operator* until the *Operator's* checkpoint has
completed.

Consider the following scenario: the data we want has not been produced yet,
but the *SourceCoordinator* receives the *checkpoint* trigger. The
*SourceCoordinator* takes its *checkpoint* immediately, while the
*ExternallyInducedSource* does not take its *checkpoint* upon receiving the
trigger and instead keeps waiting for a new split. Even once the new split is
generated, the *gateway* closing introduced in *FLINK-28606* prevents it from
being assigned to the *Source*. The result is a deadlock (or, at best, a
forced wait until the checkpoint times out).
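To make the ordering problem concrete, here is a toy model of the scenario in plain Python. It is not Flink code: `ToyGateway` and `simulate_current_behaviour` are hypothetical names, and "steps" stand in for the passage of time. The assumptions are that the coordinator closes the gateway as soon as it snapshots, and that the reader only checkpoints after receiving the split it is waiting for.

```python
from typing import List, Optional, Tuple


class ToyGateway:
    """Hypothetical model of the coordinator-to-reader event channel.

    Mirrors the FLINK-28606 behaviour described above: after the coordinator
    has checkpointed, events are held back until the reader's checkpoint for
    the same ID completes. This is NOT Flink's actual implementation.
    """

    def __init__(self) -> None:
        self.closed_for: Optional[int] = None  # checkpoint ID blocking the gateway
        self.delivered: List[str] = []  # events the reader has actually received
        self.held_back: List[str] = []  # events waiting for the gateway to reopen

    def send(self, event: str) -> None:
        if self.closed_for is None:
            self.delivered.append(event)
        else:
            self.held_back.append(event)

    def reopen(self) -> None:
        self.closed_for = None
        self.delivered.extend(self.held_back)
        self.held_back.clear()


def simulate_current_behaviour(
    split_ready_at: int = 2, timeout_steps: int = 10
) -> Tuple[Optional[int], ToyGateway]:
    """Return (step at which checkpoint 1 completes or None, final gateway)."""
    gateway = ToyGateway()
    # The coordinator snapshots as soon as the trigger arrives,
    # which closes the gateway for checkpoint 1.
    gateway.closed_for = 1
    for step in range(timeout_steps):
        if step == split_ready_at:
            # The enumerator assigns the newly produced split.
            gateway.send(f"split-{step}")
        # ExternallyInducedSource-style reader: it only checkpoints once it
        # has received the split it is waiting for.
        if gateway.delivered:
            gateway.reopen()
            return step, gateway
    # The split is still held back: the checkpoint can only time out.
    return None, gateway
```

Running `simulate_current_behaviour()` returns `None` for the completion step, with `"split-2"` stuck in `held_back`: each side waits for the other.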

So should we also add a mechanism similar to *ExternallyInducedSource* to
*OperatorCoordinator*, so that the *OperatorCoordinator* only takes its
checkpoint once it is ready, which would allow the checkpoint to be delayed?
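A rough sketch of what the proposal could look like, again as a toy Python model rather than a real Flink API. `simulate_with_readiness_hook` and the `is_ready_for_checkpoint` predicate are hypothetical. The assumption here is that "ready" means the split the reader is waiting for has already been assigned. The coordinator defers its snapshot, and therefore the gateway closing, until the predicate returns true.

```python
from typing import Callable, List, Optional


def simulate_with_readiness_hook(
    is_ready_for_checkpoint: Callable[[List[str]], bool],
    split_ready_at: int = 2,
    timeout_steps: int = 10,
) -> Optional[int]:
    """Return the step at which checkpoint 1 completes on both sides, or None."""
    delivered: List[str] = []  # splits the reader has received
    held_back: List[str] = []  # assignments blocked by the closed gateway
    gateway_closed = False
    coordinator_done = False
    for step in range(timeout_steps):
        if step == split_ready_at:
            split = f"split-{step}"
            # While the gateway is closed, the assignment is held back.
            (held_back if gateway_closed else delivered).append(split)
        # Hypothetical hook: the coordinator snapshots (closing the gateway)
        # only once it reports itself ready.
        if not coordinator_done and is_ready_for_checkpoint(delivered):
            coordinator_done = True
            gateway_closed = True
        # The reader checkpoints once it holds the split it was waiting for,
        # after which the gateway may reopen.
        if coordinator_done and delivered:
            gateway_closed = False
            return step
    return None  # checkpoint times out
```

With `lambda delivered: len(delivered) > 0` the checkpoint completes at step 2, right after the split is assigned. With `lambda delivered: True`, which mimics snapshotting immediately on trigger, it never completes within the timeout. The predicate is the open design question; a real coordinator would need some connector-specific notion of readiness.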

[1] https://issues.apache.org/jira/browse/FLINK-15101
[2] https://issues.apache.org/jira/browse/FLINK-20270
[3] https://issues.apache.org/jira/browse/FLINK-28606
