I looked a bit further and it seems it should actually be easier than I initially thought: SourceReader extends the CheckpointListener interface, and with a custom SourceReader implementation it should be possible to achieve similar results. The prototype that I have for the generator uses an IteratorSourceReader under the hood by default, but we could consider adding the ability to supply something like a DataGeneratorSourceReaderFactory that would allow provisioning the DataGeneratorSource with customized implementations for cases like this.
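To make the idea concrete, here is a rough, Flink-free sketch of the pacing logic such a custom reader might use: emit one batch, then hold back until a checkpoint-complete notification arrives. Everything in it is an assumption for illustration only: the CheckpointPacedReader class, the simplified pollNext(List<T>) signature, and the local CheckpointListener stand-in (the real SourceReader has more methods, and only notifyCheckpointComplete(long) is modelled here). It also ignores the case where the completed checkpoint was triggered before the batch was emitted, which a real implementation would have to handle.

```java
import java.util.ArrayList;
import java.util.List;

public class CheckpointPacedReaderSketch {

    /** Local stand-in for Flink's CheckpointListener; only one callback is modelled. */
    public interface CheckpointListener {
        void notifyCheckpointComplete(long checkpointId) throws Exception;
    }

    /** Hypothetical reader that emits one batch per completed checkpoint. */
    public static final class CheckpointPacedReader<T> implements CheckpointListener {
        private final List<List<T>> elementsPerCheckpoint;
        private int currentBatch = 0;
        private boolean batchEmitted = false;

        public CheckpointPacedReader(List<List<T>> elementsPerCheckpoint) {
            this.elementsPerCheckpoint = elementsPerCheckpoint;
        }

        /**
         * Simplified polling method (not the real SourceReader signature).
         * Emits the current batch at most once; returns false when all batches are done.
         */
        public boolean pollNext(List<T> output) {
            if (currentBatch >= elementsPerCheckpoint.size()) {
                return false;
            }
            if (!batchEmitted) {
                output.addAll(elementsPerCheckpoint.get(currentBatch));
                batchEmitted = true;
            }
            return true;
        }

        /** Moves on to the next batch only if the current one has been emitted. */
        @Override
        public void notifyCheckpointComplete(long checkpointId) {
            if (batchEmitted) {
                currentBatch++;
                batchEmitted = false;
            }
        }
    }

    public static void main(String[] args) {
        CheckpointPacedReader<Integer> reader =
                new CheckpointPacedReader<>(List.of(List.of(1, 2), List.of(3)));
        List<Integer> out = new ArrayList<>();
        long checkpointId = 0;
        // Simulate the runtime: poll, then pretend a checkpoint completed.
        while (reader.pollNext(out)) {
            reader.notifyCheckpointComplete(++checkpointId);
        }
        System.out.println(out + " after " + checkpointId + " checkpoints");
    }
}
```

In the actual generator, a DataGeneratorSourceReaderFactory (if we add it) would be the place where a reader like this gets plugged in instead of the default IteratorSourceReader.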
Best,
Alexander Fedulov

On Wed, Jun 8, 2022 at 12:58 AM Alexander Fedulov <alexan...@ververica.com> wrote:

> Hi Steven,
>
> This is going to be tricky since in the new Source API the checkpointing
> aspects that you based your logic on are pushed further away from the
> low-level interfaces responsible for handling data and splits [1]. At the
> same time, the SourceCoordinatorProvider is hardwired into the internals
> of the framework, so I don't think it will be possible to provide a
> customized implementation for testing purposes.
>
> The only chance to tie data generation to checkpointing in the new Source
> API that I see at the moment is via the SplitEnumerator serializer
> (getEnumeratorCheckpointSerializer() method) [2]. In theory, it should be
> possible to share a variable visible both to the generator function and to
> the serializer and manipulate it whenever the serialize() method gets
> called upon a checkpoint request. That said, you still won't get
> notifications of successful checkpoints that you currently use (this info
> is only available to the SourceCoordinator).
>
> In general, regardless of the generator implementation itself, the new Source
> API does not seem to support the use case of verifying checkpoints
> contents in lockstep with produced data, at least I do not see an immediate
> solution for this. Can you think of a different way of checking the
> correctness of the Iceberg Sink implementation that does not rely on this
> approach?
>
> Best,
> Alexander Fedulov
>
> [1] https://github.com/apache/flink/blob/0f19c2472c54aac97e4067f5398731ab90036d1a/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java#L337
>
> [2] https://github.com/apache/flink/blob/e4b000818c15b5b781c4e5262ba83bfc9d65121a/flink-core/src/main/java/org/apache/flink/api/connector/source/Source.java#L97
>
> On Tue, Jun 7, 2022 at 6:03 PM Steven Wu <stevenz...@gmail.com> wrote:
>
>> In Iceberg source, we have a data generator source that can control the
>> records per checkpoint cycle. Can we support sth like this in the
>> DataGeneratorSource?
>>
>> https://github.com/apache/iceberg/blob/master/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/BoundedTestSource.java
>> public BoundedTestSource(List<List<T>> elementsPerCheckpoint, boolean
>> checkpointEnabled)
>>
>> Thanks,
>> Steven
>>
>> On Tue, Jun 7, 2022 at 8:48 AM Alexander Fedulov <alexan...@ververica.com>
>> wrote:
>>
>> > Hi everyone,
>> >
>> > I would like to open a discussion on FLIP-238: Introduce FLIP-27-based Data
>> > Generator Source [1]. During the discussion about deprecating the
>> > SourceFunction API [2] it became evident that an easy-to-use
>> > FLIP-27-compatible data generator source is needed so that the current
>> > SourceFunction-based data generator implementations could be phased out for
>> > both Flink demo/PoC applications and for the internal Flink tests. This
>> > FLIP proposes to introduce a generic DataGeneratorSource capable of
>> > producing events of an arbitrary type based on a user-supplied MapFunction.
>> >
>> > Looking forward to your feedback.
>> >
>> > [1] https://cwiki.apache.org/confluence/x/9Av1D
>> > [2] https://lists.apache.org/thread/d6cwqw9b3105wcpdkwq7rr4s7x4ywqr9
>> >
>> > Best,
>> > Alexander Fedulov