Hey Alex,

Yes, I think we need to make sure that we're not causing confusion (I know I already was confused). I think DataSupplierSource is already a better name, but perhaps others have an even better idea.
Thanks,

Martijn

On Thu, Jun 9, 2022 at 14:28 Alexander Fedulov <alexan...@ververica.com> wrote:

> Hi Martijn,
>
> It seems that they serve somewhat different purposes, though. The
> DataGenTableSource is for generating random data described by the Table
> DDL and is tied into the RowDataGenerator/DataGenerator concept, which is
> implemented as an Iterator<T>. The proposed API, in contrast, is supposed
> to provide users with an easy way to supply their custom data. Another
> difference is that a DataGenerator is supposed to be stateful and has to
> snapshot its state, whereas the proposed API is purely driven by the input
> index IDs and can be stateless yet remain deterministic. Are you sure it
> is a good idea to mix them into the same API? We could think of using a
> different name to make it less confusing for users (something like
> DataSupplierSource).
>
> Best,
> Alexander Fedulov
>
> On Thu, Jun 9, 2022 at 9:14 AM Martijn Visser <martijnvis...@apache.org>
> wrote:
>
>> Hi Alex,
>>
>> Thanks for creating the FLIP and opening up the discussion. +1 overall for
>> getting this in place.
>>
>> One question: you've already mentioned that this is focused on the
>> DataStream API. I think it would be a bit confusing to have a Datagen
>> connector (on the Table side) that wouldn't leverage this target
>> interface. It would be good if we could already have one generic Datagen
>> connector that works for the DataStream API (so that would be a new one
>> in the Flink repo) and have the Datagen connector in the Table landscape
>> use this target interface too. What do you think?
>>
>> Best regards,
>>
>> Martijn
>>
>> On Wed, Jun 8, 2022 at 14:21 Alexander Fedulov <alexan...@ververica.com>
>> wrote:
>>
>> > Hi Xianxun,
>> >
>> > Thanks for bringing it up. I do believe it would be useful to have such
>> > a CDC data generator, but I see the efforts to provide one as somewhat
>> > orthogonal to the DataGeneratorSource proposed in the FLIP.
>> > FLIP-238 focuses on the DataStream API, and I could see integration into
>> > the Table/SQL ecosystem as a next step that I would prefer to keep
>> > separate (see KafkaDynamicSource reusing KafkaSource<RowData> under the
>> > hood [1]).
>> >
>> > [1]
>> > https://github.com/apache/flink/blob/3adca15859b36c61116fc56fabc24e9647f0ec5f/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicSource.java#L223
>> >
>> > Best,
>> > Alexander Fedulov
>> >
>> > On Wed, Jun 8, 2022 at 11:01 AM Xianxun Ye <yxx_c...@163.com> wrote:
>> >
>> > > Hey Alexander,
>> > >
>> > > Making the datagen source connector easier to use is really helpful
>> > > when doing PoCs/demos.
>> > > I also wondered whether it is possible to produce a changelog stream
>> > > with the datagen source, so that a new Flink developer can practice
>> > > Flink SQL with CDC data using the Flink SQL Client CLI.
>> > > In the flink-examples-table module, the ChangelogSocketExample class
>> > > [1] describes how to ingest delete or insert data with the 'nc'
>> > > command. Can we support producing a changelog stream with the new
>> > > datagen source?
>> > >
>> > > [1]
>> > > https://github.com/apache/flink/blob/master/flink-examples/flink-examples-table/src/main/java/org/apache/flink/table/examples/java/connectors/ChangelogSocketExample.java#L79
>> > >
>> > > Best regards,
>> > >
>> > > Xianxun
>> > >
>> > > On 06/8/2022 08:10, Alexander Fedulov <alexan...@ververica.com> wrote:
>> > >
>> > > I looked a bit further and it seems it should actually be easier than
>> > > I initially thought: SourceReader extends the CheckpointListener
>> > > interface, and with a custom implementation it should be possible to
>> > > achieve similar results.
>> > > A prototype that I have for the generator uses an IteratorSourceReader
>> > > under the hood by default, but we could consider adding the ability to
>> > > supply something like a DataGeneratorSourceReaderFactory that would
>> > > allow provisioning the DataGeneratorSource with customized
>> > > implementations for cases like this.
>> > >
>> > > Best,
>> > > Alexander Fedulov
>> > >
>> > > On Wed, Jun 8, 2022 at 12:58 AM Alexander Fedulov <alexan...@ververica.com>
>> > > wrote:
>> > >
>> > > Hi Steven,
>> > >
>> > > This is going to be tricky, since in the new Source API the
>> > > checkpointing aspects that you based your logic on are pushed further
>> > > away from the low-level interfaces responsible for handling data and
>> > > splits [1]. At the same time, the SourceCoordinatorProvider is
>> > > hardwired into the internals of the framework, so I don't think it will
>> > > be possible to provide a customized implementation for testing
>> > > purposes.
>> > >
>> > > The only way I currently see to tie data generation to checkpointing
>> > > in the new Source API is via the SplitEnumerator serializer (the
>> > > getEnumeratorCheckpointSerializer() method) [2]. In theory, it should
>> > > be possible to share a variable visible both to the generator function
>> > > and to the serializer, and to manipulate it whenever the serialize()
>> > > method gets called upon a checkpoint request. That said, you still
>> > > won't get the notifications of successful checkpoints that you
>> > > currently use (this information is only available to the
>> > > SourceCoordinator).
>> > >
>> > > In general, regardless of the generator implementation itself, the new
>> > > Source API does not seem to support the use case of verifying
>> > > checkpoint contents in lockstep with produced data; at least I do not
>> > > see an immediate solution for this.
>> > > Can you think of a different way of checking the correctness of the
>> > > Iceberg Sink implementation that does not rely on this approach?
>> > >
>> > > Best,
>> > > Alexander Fedulov
>> > >
>> > > [1]
>> > > https://github.com/apache/flink/blob/0f19c2472c54aac97e4067f5398731ab90036d1a/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java#L337
>> > >
>> > > [2]
>> > > https://github.com/apache/flink/blob/e4b000818c15b5b781c4e5262ba83bfc9d65121a/flink-core/src/main/java/org/apache/flink/api/connector/source/Source.java#L97
>> > >
>> > > On Tue, Jun 7, 2022 at 6:03 PM Steven Wu <stevenz...@gmail.com> wrote:
>> > >
>> > > In the Iceberg source, we have a data generator source that can
>> > > control the records per checkpoint cycle. Can we support something
>> > > like this in the DataGeneratorSource?
>> > >
>> > > https://github.com/apache/iceberg/blob/master/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/BoundedTestSource.java
>> > > public BoundedTestSource(List<List<T>> elementsPerCheckpoint, boolean checkpointEnabled)
>> > >
>> > > Thanks,
>> > > Steven
>> > >
>> > > On Tue, Jun 7, 2022 at 8:48 AM Alexander Fedulov <alexan...@ververica.com>
>> > > wrote:
>> > >
>> > > Hi everyone,
>> > >
>> > > I would like to open a discussion on FLIP-238: Introduce FLIP-27-based
>> > > Data Generator Source [1]. During the discussion about deprecating the
>> > > SourceFunction API [2], it became evident that an easy-to-use
>> > > FLIP-27-compatible data generator source is needed so that the current
>> > > SourceFunction-based data generator implementations can be phased out
>> > > both for Flink demo/PoC applications and for the internal Flink tests.
>> This >> > > FLIP proposes to introduce a generic DataGeneratorSource capable of >> > > producing events of an arbitrary type based on a user-supplied >> > > >> > > MapFunction. >> > > >> > > >> > > Looking forward to your feedback. >> > > >> > > [1] https://cwiki.apache.org/confluence/x/9Av1D >> > > [2] https://lists.apache.org/thread/d6cwqw9b3105wcpdkwq7rr4s7x4ywqr9 >> > > >> > > Best, >> > > Alexander Fedulov >> > > >> > > >> > > >> > > >> > >> >