Hi Steven,

This is going to be tricky since in the new Source API the checkpointing
aspects that you based your logic on are pushed further away from the
low-level interfaces responsible for handling data and splits [1]. At the
same time, the SourceCoordinatorProvider is hardwired into the internals of
the framework, so I don't think it will be possible to provide a customized
implementation for testing purposes.

The only way I currently see to tie data generation to checkpointing in the
new Source API is via the SplitEnumerator checkpoint serializer (the
getEnumeratorCheckpointSerializer() method) [2]. In theory, it should be
possible to share a variable visible both to the generator function and to
the serializer and to update it whenever the serialize() method gets called
upon a checkpoint request. That said, you still won't get the notifications
of successful checkpoints that you currently rely on (this info is only
available to the SourceCoordinator).
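
To make the shared-variable idea a bit more concrete, here is a minimal
sketch. It deliberately avoids Flink dependencies: CheckpointSerializer is a
hypothetical stand-in for the serializer returned by
getEnumeratorCheckpointSerializer(), and CheckpointCycle,
CycleTrackingSerializer and CycleAwareGenerator are illustrative names that
are not part of the FLIP. It also assumes that the generator function and
the serializer run in the same JVM, which I would only expect to hold in a
local test setup.

```java
import java.nio.ByteBuffer;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.LongFunction;

// Hypothetical stand-in for the enumerator checkpoint serializer, reduced to
// the single method this sketch needs (assumption: not the real Flink type).
interface CheckpointSerializer<E> {
    byte[] serialize(E state);
}

// The variable shared between the generator function and the serializer.
final class CheckpointCycle {
    private final AtomicInteger cycle = new AtomicInteger();

    int current() {
        return cycle.get();
    }

    void advance() {
        cycle.incrementAndGet();
    }
}

// Serializes the enumerator state (here just a long) and, as a side effect,
// advances the shared cycle every time a checkpoint requests the state.
final class CycleTrackingSerializer implements CheckpointSerializer<Long> {
    private final CheckpointCycle cycle;

    CycleTrackingSerializer(CheckpointCycle cycle) {
        this.cycle = cycle;
    }

    @Override
    public byte[] serialize(Long state) {
        cycle.advance();
        return ByteBuffer.allocate(Long.BYTES).putLong(state).array();
    }
}

// Generator function that tags each record with the cycle it was produced in.
final class CycleAwareGenerator implements LongFunction<String> {
    private final CheckpointCycle cycle;

    CycleAwareGenerator(CheckpointCycle cycle) {
        this.cycle = cycle;
    }

    @Override
    public String apply(long index) {
        return "cycle-" + cycle.current() + "/record-" + index;
    }
}
```

Records generated before the first serialize() call are tagged with cycle 0,
those generated after it with cycle 1, and so on. Note that a serialize()
call only signals that a checkpoint was requested, not that it completed.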

In general, regardless of the generator implementation itself, the new
Source API does not seem to support the use case of verifying checkpoint
contents in lockstep with the produced data. At least, I do not see an
immediate solution for this. Can you think of a different way of checking
the correctness of the Iceberg Sink implementation that does not rely on
this approach?

Best,
Alexander Fedulov

[1]
https://github.com/apache/flink/blob/0f19c2472c54aac97e4067f5398731ab90036d1a/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java#L337

[2]
https://github.com/apache/flink/blob/e4b000818c15b5b781c4e5262ba83bfc9d65121a/flink-core/src/main/java/org/apache/flink/api/connector/source/Source.java#L97

On Tue, Jun 7, 2022 at 6:03 PM Steven Wu <stevenz...@gmail.com> wrote:

> In Iceberg source, we have a data generator source that can control the
> records per checkpoint cycle. Can we support sth like this in the
> DataGeneratorSource?
>
>
> https://github.com/apache/iceberg/blob/master/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/BoundedTestSource.java
> public BoundedTestSource(List<List<T>> elementsPerCheckpoint, boolean
> checkpointEnabled)
>
> Thanks,
> Steven
>
> On Tue, Jun 7, 2022 at 8:48 AM Alexander Fedulov <alexan...@ververica.com>
> wrote:
>
> > Hi everyone,
> >
> > I would like to open a discussion on FLIP-238: Introduce FLIP-27-based
> > Data Generator Source [1]. During the discussion about deprecating the
> > SourceFunction API [2] it became evident that an easy-to-use
> > FLIP-27-compatible data generator source is needed so that the current
> > SourceFunction-based data generator implementations could be phased out
> > for both Flink demo/PoC applications and for the internal Flink tests.
> > This FLIP proposes to introduce a generic DataGeneratorSource capable of
> > producing events of an arbitrary type based on a user-supplied
> > MapFunction.
> >
> > Looking forward to your feedback.
> >
> > [1] https://cwiki.apache.org/confluence/x/9Av1D
> > [2] https://lists.apache.org/thread/d6cwqw9b3105wcpdkwq7rr4s7x4ywqr9
> >
> > Best,
> > Alexander Fedulov
> >
>
