I looked a bit further, and it seems this should actually be easier than I
initially thought: SourceReader extends the CheckpointListener interface, so
a custom SourceReader implementation should be able to achieve similar
results. The prototype I have for the generator uses an IteratorSourceReader
under the hood by default, but we could consider adding the ability to
supply something like a DataGeneratorSourceReaderFactory, which would allow
provisioning the DataGeneratorSource with customized reader implementations
for cases like this.
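The sketch below illustrates the idea. It is written without Flink dependencies so that it stays self-contained. CheckpointGatedReader, PollStatus and their methods are hypothetical local names. They only mirror the roles of SourceReader#pollNext, SourceReader#snapshotState and CheckpointListener#notifyCheckpointComplete(long) and are not Flink API. The reader releases one batch from a List<List<T>>, similar in spirit to elementsPerCheckpoint in Iceberg's BoundedTestSource. It moves on to the next batch only after a checkpoint taken after the batch was fully emitted has completed.

```java
import java.util.ArrayList;
import java.util.List;

/** Result of a poll; loosely mirrors the idea of Flink's InputStatus (local stand-in). */
enum PollStatus { MORE_AVAILABLE, NOTHING_AVAILABLE, END_OF_INPUT }

/**
 * Hypothetical, Flink-free model of a checkpoint-gated generator reader.
 * It emits one batch per checkpoint cycle: once a batch is fully emitted, it
 * waits until a checkpoint taken after that point has completed.
 */
class CheckpointGatedReader<T> {
    private final List<List<T>> elementsPerCheckpoint;
    private int batch = 0;                  // index of the batch being emitted
    private int position = 0;               // next element within that batch
    private boolean awaitingCheckpoint = false;
    private Long gatingCheckpointId = null; // first checkpoint taken after the batch finished

    CheckpointGatedReader(List<List<T>> elementsPerCheckpoint) {
        this.elementsPerCheckpoint = elementsPerCheckpoint;
    }

    /** Plays the role of SourceReader#pollNext: emits at most one element per call. */
    PollStatus pollNext(List<T> output) {
        if (batch >= elementsPerCheckpoint.size()) {
            return PollStatus.END_OF_INPUT;
        }
        if (awaitingCheckpoint) {
            return PollStatus.NOTHING_AVAILABLE;
        }
        List<T> current = elementsPerCheckpoint.get(batch);
        if (position < current.size()) {
            output.add(current.get(position++));
        }
        // Once the whole batch is out, hold back until a later checkpoint completes.
        awaitingCheckpoint = position >= current.size();
        return awaitingCheckpoint ? PollStatus.NOTHING_AVAILABLE : PollStatus.MORE_AVAILABLE;
    }

    /** Called when a checkpoint is taken (cf. SourceReader#snapshotState). */
    void snapshotState(long checkpointId) {
        if (awaitingCheckpoint && gatingCheckpointId == null) {
            gatingCheckpointId = checkpointId;
        }
    }

    /** Plays the role of CheckpointListener#notifyCheckpointComplete(long). */
    void notifyCheckpointComplete(long checkpointId) {
        // Checkpoint ids increase, so a later completed checkpoint also covers the gating one.
        if (gatingCheckpointId != null && checkpointId >= gatingCheckpointId) {
            batch++;
            position = 0;
            awaitingCheckpoint = false;
            gatingCheckpointId = null;
        }
    }
}
```

The `>=` comparison is deliberate. A notification for an earlier checkpoint may be skipped when a later one completes, so the reader treats any completed checkpoint at or after the gating one as sufficient.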

Best,
Alexander Fedulov

On Wed, Jun 8, 2022 at 12:58 AM Alexander Fedulov <alexan...@ververica.com>
wrote:

> Hi Steven,
>
> This is going to be tricky since in the new Source API the checkpointing
> aspects that you based your logic on are pushed further away from the
> low-level interfaces responsible for handling data and splits [1]. At the
> same time, the SourceCoordinatorProvider is hardwired into the internals
> of the framework, so I don't think it will be possible to provide a
> customized implementation for testing purposes.
>
> The only way I currently see to tie data generation to checkpointing in the
> new Source API is via the SplitEnumerator checkpoint serializer (the
> getEnumeratorCheckpointSerializer() method) [2]. In theory, it should be
> possible to share a variable that is visible both to the generator function
> and to the serializer, and to manipulate it whenever the serialize() method
> gets called upon a checkpoint request. That said, you still won't get the
> notifications of successful checkpoints that you currently rely on (this
> information is only available to the SourceCoordinator).
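The shared-variable idea can be sketched as follows. This is a Flink-free sketch, and all names in it are hypothetical. VersionedSerializer is a local stand-in with the same method shape as Flink's SimpleVersionedSerializer, not the real interface. CountingEnumeratorSerializer and CheckpointTaggingGenerator are illustrative only. One important assumption applies: in a real job the enumerator runs in the SourceCoordinator on the JobManager, the readers run on TaskManagers, and the source object is serialized before deployment. A shared AtomicLong like this would therefore only be visible to both sides in a single-JVM setup such as a local MiniCluster test, and typically only through a static field.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.LongFunction;

/** Local stand-in with the same shape as Flink's SimpleVersionedSerializer (not the real interface). */
interface VersionedSerializer<E> {
    int getVersion();
    byte[] serialize(E obj) throws IOException;
    E deserialize(int version, byte[] serialized) throws IOException;
}

/**
 * Hypothetical enumerator checkpoint serializer. Besides encoding the enumerator
 * state (here just a Long), it bumps a shared counter on every serialize() call,
 * i.e. on every checkpoint request that reaches the enumerator.
 */
class CountingEnumeratorSerializer implements VersionedSerializer<Long> {
    private final AtomicLong checkpointRequests;

    CountingEnumeratorSerializer(AtomicLong checkpointRequests) {
        this.checkpointRequests = checkpointRequests;
    }

    @Override
    public int getVersion() {
        return 1;
    }

    @Override
    public byte[] serialize(Long state) {
        checkpointRequests.incrementAndGet(); // side channel: a checkpoint was requested
        return ByteBuffer.allocate(Long.BYTES).putLong(state).array();
    }

    @Override
    public Long deserialize(int version, byte[] serialized) {
        return ByteBuffer.wrap(serialized).getLong();
    }
}

/** Hypothetical generator function that reads the same shared counter. */
class CheckpointTaggingGenerator {
    static LongFunction<String> create(AtomicLong checkpointRequests) {
        // Tags each record with the number of checkpoint requests seen so far.
        return index -> "record-" + index + "@checkpoint-" + checkpointRequests.get();
    }
}
```

This only signals that a checkpoint was requested. As noted above, it says nothing about whether that checkpoint later succeeded.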
>
> In general, regardless of the generator implementation itself, the new
> Source API does not seem to support the use case of verifying checkpoint
> contents in lockstep with the produced data; at least, I do not see an
> immediate solution for this. Can you think of a different way of checking
> the correctness of the Iceberg Sink implementation that does not rely on
> this approach?
>
> Best,
> Alexander Fedulov
>
> [1]
> https://github.com/apache/flink/blob/0f19c2472c54aac97e4067f5398731ab90036d1a/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java#L337
>
> [2]
> https://github.com/apache/flink/blob/e4b000818c15b5b781c4e5262ba83bfc9d65121a/flink-core/src/main/java/org/apache/flink/api/connector/source/Source.java#L97
>
> On Tue, Jun 7, 2022 at 6:03 PM Steven Wu <stevenz...@gmail.com> wrote:
>
>> In the Iceberg source, we have a data generator source that can control the
>> number of records per checkpoint cycle. Can we support something like this
>> in the DataGeneratorSource?
>>
>>
>> https://github.com/apache/iceberg/blob/master/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/BoundedTestSource.java
>> public BoundedTestSource(List<List<T>> elementsPerCheckpoint, boolean checkpointEnabled)
>>
>> Thanks,
>> Steven
>>
>> On Tue, Jun 7, 2022 at 8:48 AM Alexander Fedulov <alexan...@ververica.com>
>> wrote:
>>
>> > Hi everyone,
>> >
>> > I would like to open a discussion on FLIP-238: Introduce FLIP-27-based
>> > Data Generator Source [1]. During the discussion about deprecating the
>> > SourceFunction API [2], it became evident that an easy-to-use
>> > FLIP-27-compatible data generator source is needed so that the current
>> > SourceFunction-based data generator implementations can be phased out
>> > both in Flink demo/PoC applications and in internal Flink tests. This
>> > FLIP proposes to introduce a generic DataGeneratorSource capable of
>> > producing events of an arbitrary type based on a user-supplied
>> > MapFunction.
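To make the proposed shape concrete, here is a Flink-free sketch. It assumes that the user-supplied function receives a running Long index and returns one event per index; the paragraph above does not spell this out. IndexMapper and IndexedGenerator are hypothetical local names. IndexMapper only mirrors the single-method shape of Flink's MapFunction (`O map(T value) throws Exception`) with the input fixed to Long.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

/** Local stand-in mirroring the shape of Flink's MapFunction<Long, O> (not the real interface). */
@FunctionalInterface
interface IndexMapper<O> {
    O map(Long index) throws Exception;
}

/** Hypothetical, Flink-free model: turns indices 0..count-1 into events via a user function. */
class IndexedGenerator<O> implements Iterator<O> {
    private final IndexMapper<O> mapper;
    private final long count;
    private long next = 0;

    IndexedGenerator(IndexMapper<O> mapper, long count) {
        this.mapper = mapper;
        this.count = count;
    }

    @Override
    public boolean hasNext() {
        return next < count;
    }

    @Override
    public O next() {
        if (!hasNext()) {
            throw new NoSuchElementException();
        }
        long index = next++;
        try {
            return mapper.map(index); // user function decides the event type and content
        } catch (Exception e) {
            throw new IllegalStateException("generator function failed for index " + index, e);
        }
    }
}
```

Because the generator only depends on the index, any event type can be produced without a dedicated source class per type.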
>> >
>> > Looking forward to your feedback.
>> >
>> > [1] https://cwiki.apache.org/confluence/x/9Av1D
>> > [2] https://lists.apache.org/thread/d6cwqw9b3105wcpdkwq7rr4s7x4ywqr9
>> >
>> > Best,
>> > Alexander Fedulov
>> >
>>
>
