Hi Martijn,

It seems that they serve somewhat different purposes, though. The DataGenTableSource is for generating random data described by the Table DDL and is tied into the RowDataGenerator/DataGenerator concept, which is implemented as an Iterator&lt;T&gt;. The proposed API, in contrast, is supposed to provide users with an easy way to supply their own custom data. Another difference is that a DataGenerator is supposed to be stateful and has to snapshot its state, whereas the proposed API is driven purely by input index IDs and can be stateless yet remain deterministic. Are you sure it is a good idea to mix them into the same API? We could think of using a different name to make it less confusing for users (something like DataSupplierSource).
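To make the stateful-vs-stateless distinction concrete, here is a minimal sketch in plain Java. The class and function names are illustrative only, not the actual Flink interfaces; it just contrasts Iterator-style generation (whose internal state would need checkpointing) with index-driven generation (a pure function of the index, deterministic with no snapshot):

```java
import java.util.Iterator;
import java.util.function.LongFunction;

public class GeneratorStyles {
    // Stateful, Iterator-style generation (the DataGenerator style): the next
    // value depends on internal state, so that state must be snapshotted on
    // checkpoints to resume deterministically after a failure.
    static final class StatefulCounter implements Iterator<Long> {
        private long state; // would have to go into a checkpoint snapshot

        public boolean hasNext() { return true; }

        public Long next() { return state++; }
    }

    // Stateless, index-driven generation (the style proposed in the FLIP):
    // the value is a pure function of the input index, so replaying the same
    // index range after recovery reproduces the same data with no snapshot.
    static final LongFunction<String> statelessFn = index -> "event-" + index;

    public static void main(String[] args) {
        Iterator<Long> stateful = new StatefulCounter();
        System.out.println(stateful.next()); // 0
        System.out.println(stateful.next()); // 1 -- depends on prior calls

        System.out.println(statelessFn.apply(7L)); // event-7, regardless of history
        System.out.println(statelessFn.apply(7L)); // same result: deterministic
    }
}
```

Because the stateless function is deterministic in the index, exactly-once semantics reduce to tracking which index ranges have been emitted, which the split/checkpoint machinery already does.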
Best,
Alexander Fedulov

On Thu, Jun 9, 2022 at 9:14 AM Martijn Visser <martijnvis...@apache.org> wrote:

> Hi Alex,
>
> Thanks for creating the FLIP and opening up the discussion. +1 overall for
> getting this in place.
>
> One question: you've already mentioned that this is focused on the
> DataStream API. I think it would be a bit confusing that we have a Datagen
> connector (on the Table side) that wouldn't leverage this target
> interface. I think it would be good if we could already have one generic
> Datagen connector which works for both the DataStream API (so that would
> be a new one in the Flink repo) and that the Datagen in the Table
> landscape is using this target interface too. What do you think?
>
> Best regards,
>
> Martijn
>
> On Wed, Jun 8, 2022 at 14:21, Alexander Fedulov <alexan...@ververica.com> wrote:
>
> > Hi Xianxun,
> >
> > Thanks for bringing it up. I do believe it would be useful to have such
> > a CDC data generator, but I see the efforts to provide one as a bit
> > orthogonal to the DataGeneratorSource proposed in the FLIP. FLIP-238
> > focuses on the DataStream API, and I could see integration into the
> > Table/SQL ecosystem as the next step, which I would prefer to keep
> > separate (see KafkaDynamicSource reusing KafkaSource<RowData> under the
> > hood [1]).
> >
> > [1] https://github.com/apache/flink/blob/3adca15859b36c61116fc56fabc24e9647f0ec5f/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicSource.java#L223
> >
> > Best,
> > Alexander Fedulov
> >
> > On Wed, Jun 8, 2022 at 11:01 AM Xianxun Ye <yxx_c...@163.com> wrote:
> >
> > > Hey Alexander,
> > >
> > > Making the datagen source connector easier to use is really helpful
> > > when doing some PoC/demo work. And I thought about whether it is
> > > possible to produce a changelog stream with the datagen source, so a
> > > new Flink developer can practice Flink SQL with CDC data using the
> > > Flink SQL Client CLI.
> > > In the flink-examples-table module, the ChangelogSocketExample
> > > class [1] describes how to ingest delete or insert data via the 'nc'
> > > command. Can we support producing a changelog stream with the new
> > > datagen source?
> > >
> > > [1] https://github.com/apache/flink/blob/master/flink-examples/flink-examples-table/src/main/java/org/apache/flink/table/examples/java/connectors/ChangelogSocketExample.java#L79
> > >
> > > Best regards,
> > >
> > > Xianxun
> > >
> > > On 06/8/2022 08:10, Alexander Fedulov <alexan...@ververica.com> wrote:
> > >
> > > I looked a bit further and it seems it should actually be easier than
> > > I initially thought: SourceReader extends the CheckpointListener
> > > interface, and with a custom implementation it should be possible to
> > > achieve similar results. A prototype that I have for the generator
> > > uses an IteratorSourceReader under the hood by default, but we could
> > > consider adding the ability to supply something like a
> > > DataGeneratorSourceReaderFactory that would allow provisioning the
> > > DataGeneratorSource with customized implementations for cases like
> > > this.
> > >
> > > Best,
> > > Alexander Fedulov
> > >
> > > On Wed, Jun 8, 2022 at 12:58 AM Alexander Fedulov <alexan...@ververica.com> wrote:
> > >
> > > Hi Steven,
> > >
> > > This is going to be tricky, since in the new Source API the
> > > checkpointing aspects that you based your logic on are pushed further
> > > away from the low-level interfaces responsible for handling data and
> > > splits [1]. At the same time, the SourceCoordinatorProvider is
> > > hardwired into the internals of the framework, so I don't think it
> > > will be possible to provide a customized implementation for testing
> > > purposes.
> > > The only chance to tie data generation to checkpointing in the new
> > > Source API that I see at the moment is via the SplitEnumerator
> > > serializer (the getEnumeratorCheckpointSerializer() method) [2]. In
> > > theory, it should be possible to share a variable visible both to the
> > > generator function and to the serializer and manipulate it whenever
> > > the serialize() method gets called upon a checkpoint request. That
> > > said, you still won't get notifications of successful checkpoints
> > > that you currently use (this info is only available to the
> > > SourceCoordinator).
> > >
> > > In general, regardless of the generator implementation itself, the
> > > new Source API does not seem to support the use case of verifying
> > > checkpoint contents in lockstep with the produced data; at least I do
> > > not see an immediate solution for this. Can you think of a different
> > > way of checking the correctness of the Iceberg Sink implementation
> > > that does not rely on this approach?
> > >
> > > Best,
> > > Alexander Fedulov
> > >
> > > [1] https://github.com/apache/flink/blob/0f19c2472c54aac97e4067f5398731ab90036d1a/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java#L337
> > >
> > > [2] https://github.com/apache/flink/blob/e4b000818c15b5b781c4e5262ba83bfc9d65121a/flink-core/src/main/java/org/apache/flink/api/connector/source/Source.java#L97
> > >
> > > On Tue, Jun 7, 2022 at 6:03 PM Steven Wu <stevenz...@gmail.com> wrote:
> > >
> > > In the Iceberg source, we have a data generator source that can
> > > control the records per checkpoint cycle. Can we support something
> > > like this in the DataGeneratorSource?
> > > https://github.com/apache/iceberg/blob/master/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/BoundedTestSource.java
> > >
> > > public BoundedTestSource(List<List<T>> elementsPerCheckpoint, boolean checkpointEnabled)
> > >
> > > Thanks,
> > > Steven
> > >
> > > On Tue, Jun 7, 2022 at 8:48 AM Alexander Fedulov <alexan...@ververica.com> wrote:
> > >
> > > Hi everyone,
> > >
> > > I would like to open a discussion on FLIP-238: Introduce
> > > FLIP-27-based Data Generator Source [1]. During the discussion about
> > > deprecating the SourceFunction API [2] it became evident that an
> > > easy-to-use FLIP-27-compatible data generator source is needed so
> > > that the current SourceFunction-based data generator implementations
> > > could be phased out for both Flink demo/PoC applications and for the
> > > internal Flink tests. This FLIP proposes to introduce a generic
> > > DataGeneratorSource capable of producing events of an arbitrary type
> > > based on a user-supplied MapFunction.
> > >
> > > Looking forward to your feedback.
> > >
> > > [1] https://cwiki.apache.org/confluence/x/9Av1D
> > > [2] https://lists.apache.org/thread/d6cwqw9b3105wcpdkwq7rr4s7x4ywqr9
> > >
> > > Best,
> > > Alexander Fedulov
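The announcement quoted above describes a generic DataGeneratorSource driven by a user-supplied MapFunction over a sequence of index IDs. A minimal, self-contained sketch of that idea in plain Java follows; MiniDataGeneratorSource is a hypothetical mock for illustration, not the actual Flink class, and the real source would split the index range among parallel readers rather than materializing it in one list:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.LongFunction;

// Hypothetical mock illustrating the FLIP-238 idea: a bounded source that
// produces events of an arbitrary type by mapping index IDs 0..count-1
// through a user-supplied function.
public class MiniDataGeneratorSource<T> {
    private final LongFunction<T> mapFunction;
    private final long count;

    public MiniDataGeneratorSource(LongFunction<T> mapFunction, long count) {
        this.mapFunction = mapFunction;
        this.count = count;
    }

    // In Flink, the index range would be partitioned into splits and handed
    // to parallel SourceReaders; here we just emit the whole bounded stream.
    public List<T> run() {
        List<T> out = new ArrayList<>();
        for (long i = 0; i < count; i++) {
            out.add(mapFunction.apply(i));
        }
        return out;
    }

    public static void main(String[] args) {
        MiniDataGeneratorSource<String> source =
                new MiniDataGeneratorSource<>(i -> "Number: " + i, 3);
        System.out.println(source.run()); // [Number: 0, Number: 1, Number: 2]
    }
}
```

Since the user function is pure in the index, the same construction also covers Steven's per-checkpoint-cycle use case only if the enumerator can align split boundaries with checkpoints, which is exactly the limitation discussed earlier in the thread.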