Hi Alex,

Thanks for creating the FLIP and opening up the discussion. +1 overall for getting this in place.
One question: you've already mentioned that this is focused on the DataStream API. I think it would be a bit confusing to have a Datagen connector (on the Table side) that doesn't leverage this target interface. It would be good if we could have one generic Datagen connector from the start that works for both the DataStream API (so that would be a new one in the Flink repo) and the Table API, i.e. the Datagen connector in the Table landscape would use this target interface too. What do you think?

Best regards,

Martijn

On Wed, 8 Jun 2022 at 14:21, Alexander Fedulov <alexan...@ververica.com> wrote:

> Hi Xianxun,
>
> Thanks for bringing it up. I do believe it would be useful to have such a CDC data generator, but I see the effort to provide one as somewhat orthogonal to the DataGeneratorSource proposed in the FLIP. FLIP-238 focuses on the DataStream API, and I see integration into the Table/SQL ecosystem as the next step, which I would prefer to keep separate (see KafkaDynamicSource reusing KafkaSource<RowData> under the hood [1]).
>
> [1] https://github.com/apache/flink/blob/3adca15859b36c61116fc56fabc24e9647f0ec5f/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicSource.java#L223
>
> Best,
> Alexander Fedulov
>
> On Wed, Jun 8, 2022 at 11:01 AM Xianxun Ye <yxx_c...@163.com> wrote:
>
> > Hey Alexander,
> >
> > Making the datagen source connector easier to use is really helpful when doing a PoC or demo. I also wondered whether it is possible to produce a changelog stream with the datagen source, so that a new Flink developer can practice Flink SQL with CDC data using the Flink SQL Client CLI. In the flink-examples-table module, the ChangelogSocketExample class [1] describes how to ingest delete or insert data via the 'nc' command. Can we support producing a changelog stream with the new datagen source?
> >
> > [1] https://github.com/apache/flink/blob/master/flink-examples/flink-examples-table/src/main/java/org/apache/flink/table/examples/java/connectors/ChangelogSocketExample.java#L79
> >
> > Best regards,
> >
> > Xianxun
> >
> > On 06/8/2022 08:10, Alexander Fedulov <alexan...@ververica.com> wrote:
> >
> > I looked a bit further, and it seems this should actually be easier than I initially thought: SourceReader extends the CheckpointListener interface, so with a custom implementation it should be possible to achieve similar results. A prototype that I have for the generator uses an IteratorSourceReader under the hood by default, but we could consider adding the ability to supply something like a DataGeneratorSourceReaderFactory that would allow provisioning the DataGeneratorSource with customized implementations for cases like this.
> >
> > Best,
> > Alexander Fedulov
> >
> > On Wed, Jun 8, 2022 at 12:58 AM Alexander Fedulov <alexan...@ververica.com> wrote:
> >
> > Hi Steven,
> >
> > This is going to be tricky, since in the new Source API the checkpointing aspects that you based your logic on are pushed further away from the low-level interfaces responsible for handling data and splits [1]. At the same time, the SourceCoordinatorProvider is hardwired into the internals of the framework, so I don't think it will be possible to provide a customized implementation for testing purposes.
> >
> > The only way I currently see to tie data generation to checkpointing in the new Source API is via the SplitEnumerator checkpoint serializer (the getEnumeratorCheckpointSerializer() method) [2]. In theory, it should be possible to share a variable that is visible both to the generator function and to the serializer, and to update it whenever the serialize() method is called upon a checkpoint request.
> > That said, you still won't get the notifications of successful checkpoints that you currently rely on (this information is only available to the SourceCoordinator).
> >
> > In general, regardless of the generator implementation itself, the new Source API does not seem to support the use case of verifying checkpoint contents in lockstep with produced data; at least, I do not see an immediate solution for this. Can you think of a different way of checking the correctness of the Iceberg Sink implementation that does not rely on this approach?
> >
> > Best,
> > Alexander Fedulov
> >
> > [1] https://github.com/apache/flink/blob/0f19c2472c54aac97e4067f5398731ab90036d1a/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java#L337
> > [2] https://github.com/apache/flink/blob/e4b000818c15b5b781c4e5262ba83bfc9d65121a/flink-core/src/main/java/org/apache/flink/api/connector/source/Source.java#L97
> >
> > On Tue, Jun 7, 2022 at 6:03 PM Steven Wu <stevenz...@gmail.com> wrote:
> >
> > In the Iceberg source, we have a data generator source that can control the records per checkpoint cycle. Can we support something like this in the DataGeneratorSource?
> >
> > https://github.com/apache/iceberg/blob/master/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/BoundedTestSource.java
> > public BoundedTestSource(List<List<T>> elementsPerCheckpoint, boolean checkpointEnabled)
> >
> > Thanks,
> > Steven
> >
> > On Tue, Jun 7, 2022 at 8:48 AM Alexander Fedulov <alexan...@ververica.com> wrote:
> >
> > Hi everyone,
> >
> > I would like to open a discussion on FLIP-238: Introduce FLIP-27-based Data Generator Source [1].
> > During the discussion about deprecating the SourceFunction API [2], it became evident that an easy-to-use, FLIP-27-compatible data generator source is needed so that the current SourceFunction-based data generator implementations can be phased out, both for Flink demo/PoC applications and for internal Flink tests. This FLIP proposes to introduce a generic DataGeneratorSource capable of producing events of an arbitrary type based on a user-supplied MapFunction.
> >
> > Looking forward to your feedback.
> >
> > [1] https://cwiki.apache.org/confluence/x/9Av1D
> > [2] https://lists.apache.org/thread/d6cwqw9b3105wcpdkwq7rr4s7x4ywqr9
> >
> > Best,
> > Alexander Fedulov
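The proposal's core is a source that produces events of an arbitrary type from a user-supplied function. A small framework-free sketch can illustrate it. The sketch assumes the function maps a sequence index (a `long`) to an event. `IndexBasedGenerator` is a hypothetical name, and `java.util.function.LongFunction<T>` stands in for the `MapFunction` mentioned in the FLIP. This is not the FLIP-238 API and uses no Flink classes.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.function.LongFunction;

// Hypothetical sketch, not the FLIP-238 API: events of any type T are derived
// from a sequence index by a user-supplied function (LongFunction<T> stands in
// for the MapFunction mentioned in the proposal).
class IndexBasedGenerator<T> {
    private final LongFunction<T> mapper;
    private final long count;   // total number of events (bounded sequence)
    private long nextIndex = 0; // the only state needed to resume generation

    IndexBasedGenerator(LongFunction<T> mapper, long count) {
        this.mapper = mapper;
        this.count = count;
    }

    boolean hasNext() {
        return nextIndex < count;
    }

    T next() {
        if (!hasNext()) {
            throw new NoSuchElementException("sequence exhausted");
        }
        return mapper.apply(nextIndex++);
    }

    // Emits up to maxEvents events, similar in spirit to one poll of a reader.
    List<T> poll(int maxEvents) {
        List<T> batch = new ArrayList<>();
        while (batch.size() < maxEvents && hasNext()) {
            batch.add(next());
        }
        return batch;
    }

    public static void main(String[] args) {
        IndexBasedGenerator<String> gen = new IndexBasedGenerator<>(i -> "event-" + i, 5);
        System.out.println(gen.poll(3)); // [event-0, event-1, event-2]
        System.out.println(gen.poll(3)); // [event-3, event-4]
    }
}
```

Each event depends only on its index. Resuming after a failure therefore only requires remembering `nextIndex`, and the index range could in principle be partitioned across parallel readers. How the proposed DataGeneratorSource actually handles splits and state is left to the FLIP.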
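Steven asks for a fixed batch of records per checkpoint cycle, as in Iceberg's `BoundedTestSource`. Alexander notes that `SourceReader` extends `CheckpointListener`. Together these can be illustrated with a framework-free simulation. `CheckpointPacedGenerator` is a hypothetical name. Its `notifyCheckpointComplete(long)` only mirrors the method name of Flink's `CheckpointListener`. The class is not a Flink `SourceReader`, nor the `DataGeneratorSourceReaderFactory` floated in the thread. It assumes a simple protocol: each poll releases at most one batch, and the next batch is held back until a checkpoint completes.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical simulation, not a Flink SourceReader: batch k is released only
// after a checkpoint has completed following the release of batch k-1.
class CheckpointPacedGenerator<T> {
    private final List<List<T>> elementsPerCheckpoint;
    private int nextBatch = 0;
    private boolean awaitingCheckpoint = false;

    CheckpointPacedGenerator(List<List<T>> elementsPerCheckpoint) {
        this.elementsPerCheckpoint = elementsPerCheckpoint;
    }

    // Returns the next batch, or an empty list if a checkpoint is still pending
    // or all batches have already been emitted.
    List<T> poll() {
        if (awaitingCheckpoint || nextBatch >= elementsPerCheckpoint.size()) {
            return Collections.emptyList();
        }
        awaitingCheckpoint = true;
        return new ArrayList<>(elementsPerCheckpoint.get(nextBatch++));
    }

    // Mirrors the name of CheckpointListener#notifyCheckpointComplete; here it
    // simply unblocks the next batch (the checkpoint id is not inspected).
    void notifyCheckpointComplete(long checkpointId) {
        awaitingCheckpoint = false;
    }

    public static void main(String[] args) {
        CheckpointPacedGenerator<Integer> gen = new CheckpointPacedGenerator<>(
                List.of(List.of(1, 2), List.of(3), List.of(4, 5, 6)));
        System.out.println(gen.poll()); // [1, 2]
        System.out.println(gen.poll()); // [] (blocked until a checkpoint completes)
        gen.notifyCheckpointComplete(1L);
        System.out.println(gen.poll()); // [3]
    }
}
```

In a real reader, the framework would invoke `notifyCheckpointComplete`. An empty poll would roughly correspond to reporting that no data is currently available.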