Hi Ming,

I am not sure I fully understand what you want. It seems that what you are
looking for is a checkpoint triggered at a custom time that aligns with
some data semantics. This is not what the current checkpoint in Flink was
designed for. I think the basic idea of a checkpoint is just to take a
snapshot of the current state, so we can restore to that state in case of
failure. This is completely orthogonal to the data semantics.

Even with the ExternallyInducedSourceReader, the checkpoint is still
triggered by the JM. The only difference is that the effective checkpoint
barrier message (a custom message in this case) is sent not by the JM but
by the external source storage. This helps when the external source storage
needs its own internal state to be aligned with the state of the Flink
SourceReader. For example, if the external source storage can only seek at
some bulk boundary, then it might wait for the current bulk to finish
before it sends the custom checkpoint barrier to the SourceReader.
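
A minimal toy sketch of the bulk-boundary example, in plain Java. It does
not use any Flink API: the class BulkAlignedStorage, its methods, and the
message strings are hypothetical names chosen for illustration only. The
checkpoint request still comes from outside (standing in for the JM), and
the storage only decides when the barrier enters the stream.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model, NOT Flink API: an external store that can only seek at bulk boundaries. */
class BulkAlignedStorage {
    private final int bulkSize;
    private long nextOffset = 0;
    private long pendingCheckpointId = -1; // -1 means no checkpoint has been requested

    BulkAlignedStorage(int bulkSize) {
        this.bulkSize = bulkSize;
    }

    /** The trigger comes from outside (the JM in Flink); the storage only records it. */
    void requestCheckpoint(long checkpointId) {
        pendingCheckpointId = checkpointId;
    }

    /** Returns the next message: a record, or the deferred barrier at a bulk boundary. */
    String poll() {
        boolean atBoundary = nextOffset % bulkSize == 0;
        if (pendingCheckpointId >= 0 && atBoundary) {
            String barrier = "BARRIER(" + pendingCheckpointId + ")@" + nextOffset;
            pendingCheckpointId = -1;
            return barrier;
        }
        return "record-" + (nextOffset++);
    }

    /** Requests checkpoint 7 in the middle of a bulk of size 3 and collects the stream. */
    static List<String> demo() {
        BulkAlignedStorage storage = new BulkAlignedStorage(3);
        List<String> out = new ArrayList<>();
        out.add(storage.poll());
        out.add(storage.poll());
        storage.requestCheckpoint(7); // arrives mid-bulk, at offset 2
        out.add(storage.poll());
        out.add(storage.poll());
        out.add(storage.poll());
        return out;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

In this sketch the request arrives at offset 2, but the barrier is only
emitted at offset 3, once the current bulk is complete, so a restore can
seek to a position the storage supports.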

> Considering this scenario, if the data we want has not been produced yet,
> but the *SourceCoordinator* receives the *checkpoint* message, it will
> directly make a *checkpoint*, and the *ExternallyInducedSource* will not
> make a *checkpoint* immediately after receiving the *checkpoint*, but
> continues to wait for a new split. Even if a new split is generated, due to
> the behavior of closing *gateway* in *FLINK-28606*, the new split cannot be
> assigned to the *Source*, resulting in a deadlock (or forced to wait for
> checkpoint to time out).


In this case, the source reader should not "wait" for splits that are not
included in this checkpoint. These splits should be part of the next
checkpoint. It would be the Sink's responsibility to ensure that the output
is committed in a way that aligns with the user semantics.
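
A small toy model of this point, again in plain Java and not Flink code.
ToyGateway and its methods are hypothetical; it only assumes, as described
for FLINK-28606, that events from the coordinator are held back between the
coordinator's checkpoint and the reader's checkpoint. The flag ackPromptly
stands for the two reader behaviours being discussed.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

/** Toy model only; names are hypothetical and this is NOT the Flink implementation. */
class ToyGateway {
    private final Queue<String> held = new ArrayDeque<>();
    private final List<String> delivered = new ArrayList<>();
    private boolean open = true;

    /** The coordinator took its checkpoint: hold events until the reader checkpoints. */
    void closeForCheckpoint() {
        open = false;
    }

    /** The reader finished its checkpoint: release everything that was held. */
    void readerAcknowledged() {
        open = true;
        while (!held.isEmpty()) {
            delivered.add(held.poll());
        }
    }

    /** Delivers the split if the gateway is open, otherwise holds it back. */
    void assignSplit(String split) {
        if (open) {
            delivered.add(split);
        } else {
            held.add(split);
        }
    }

    /** Returns the splits the reader has received at the end of the scenario. */
    static List<String> run(boolean ackPromptly) {
        ToyGateway gateway = new ToyGateway();
        gateway.assignSplit("split-1");
        gateway.closeForCheckpoint();
        gateway.assignSplit("split-2"); // produced after the coordinator's snapshot
        if (ackPromptly) {
            gateway.readerAcknowledged(); // the reader checkpoints without waiting
        }
        return new ArrayList<>(gateway.delivered);
    }

    public static void main(String[] args) {
        System.out.println("waiting reader:  " + run(false));
        System.out.println("prompt reader:   " + run(true));
    }
}
```

A reader that waits for split-2 before checkpointing never receives it in
this model, while a reader that checkpoints promptly receives split-2 right
afterwards, as part of the next checkpoint.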

That said, I agree it might be useful in some cases if users could decide
when a checkpoint is triggered. But that would be a new feature which
needs some careful design.

Thanks,

Jiangjie (Becket) Qin


On Mon, Feb 27, 2023 at 8:35 PM ming li <joyce.li0...@gmail.com> wrote:

> Hi, dev,
>
> We recently used *SourceCoordinator* and *ExternallyInducedSource*
> together in some file-type connectors to fulfill some requirements, but we
> found that these two interfaces do not work well together.
>
> *SourceCoordinator* (FLINK-15101) and *ExternallyInducedSource*
> (FLINK-20270) were introduced in FLIP-27. *SourceCoordinator* is
> responsible for running the *SplitEnumerator* and coordinating the
> assignment of *Splits*. *ExternallyInducedSource* allows us to delay
> taking a *checkpoint* in the Source, or to take a *checkpoint* at a
> specified point in the data. This works fine with connectors like *Kafka*.
>
> But in some connectors (such as the Hive connector), the splits are
> allocated entirely by the *SourceCoordinator*, and after a split has been
> consumed, the reader needs to wait for the next batch of splits to be
> allocated (unlike Kafka, where the reader continuously consumes the same
> split). In FLINK-28606, we introduced another mechanism: the
> *OperatorCoordinator* is not allowed to send *OperatorEvents* to the
> *Operator* before the *Operator's* checkpoint is completed.
>
> Considering this scenario, if the data we want has not been produced yet,
> but the *SourceCoordinator* receives the *checkpoint* message, it will
> directly make a *checkpoint*, and the *ExternallyInducedSource* will not
> make a *checkpoint* immediately after receiving the *checkpoint*, but
> continues to wait for a new split. Even if a new split is generated, due to
> the behavior of closing *gateway* in *FLINK-28606*, the new split cannot be
> assigned to the *Source*, resulting in a deadlock (or forced to wait for
> checkpoint to time out).
>
> So should we also add a mechanism similar to *ExternallyInducedSource* to
> *OperatorCoordinator*, so that the *OperatorCoordinator* only takes a
> checkpoint when it is ready, which would allow us to delay the checkpoint?
>
> [1] https://issues.apache.org/jira/browse/FLINK-15101
> [2] https://issues.apache.org/jira/browse/FLINK-20270
> [3] https://issues.apache.org/jira/browse/FLINK-28606
>
