Hi Alex,

Personally I prefer the latter option, i.e. just adding the currentParallelism() method. It is easy to add more to the SourceReaderContext in the future, and most of what the RuntimeContext offers is probably not required by SourceReader implementations. For the purpose of this FLIP, adding the method is probably good enough.
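
To make this option concrete, here is a minimal sketch (not the actual Flink code) of a reader context that exposes only the subtask index and the parallelism. RuntimeContextLike and SourceReaderContextLike are hypothetical, simplified stand-ins for RuntimeContext and SourceReaderContext, and registerState(), fixedRuntimeContext() and perReaderRate() are illustrative names only:

```java
public class NarrowContextSketch {

    /** Hypothetical stand-in for the broad RuntimeContext (NOT the real Flink interface). */
    interface RuntimeContextLike {
        int getNumberOfParallelSubtasks();
        int getIndexOfThisSubtask();
        void registerState(String name); // example of a "write" method readers should not see
    }

    /** Hypothetical stand-in for SourceReaderContext: read-only, reader-relevant data only. */
    interface SourceReaderContextLike {
        int getIndexOfSubtask();
        int currentParallelism();
    }

    /** Wraps the broad context and exposes only the two read-only values. */
    static SourceReaderContextLike narrow(RuntimeContextLike runtimeContext) {
        return new SourceReaderContextLike() {
            @Override
            public int getIndexOfSubtask() {
                return runtimeContext.getIndexOfThisSubtask();
            }

            @Override
            public int currentParallelism() {
                return runtimeContext.getNumberOfParallelSubtasks();
            }
        };
    }

    /** Illustrative helper: splits a global rate evenly across all readers. */
    static double perReaderRate(double globalRecordsPerSecond, SourceReaderContextLike context) {
        return globalRecordsPerSecond / context.currentParallelism();
    }

    /** Builds a fixed fake runtime context for demonstration purposes. */
    static RuntimeContextLike fixedRuntimeContext(int parallelism, int subtaskIndex) {
        return new RuntimeContextLike() {
            @Override
            public int getNumberOfParallelSubtasks() {
                return parallelism;
            }

            @Override
            public int getIndexOfThisSubtask() {
                return subtaskIndex;
            }

            @Override
            public void registerState(String name) {
                throw new UnsupportedOperationException("not needed in this sketch");
            }
        };
    }

    public static void main(String[] args) {
        SourceReaderContextLike context = narrow(fixedRuntimeContext(4, 1));
        System.out.println("parallelism = " + context.currentParallelism());
        System.out.println("per-reader rate = " + perReaderRate(100.0, context));
    }
}
```

The reader only ever sees the narrow interface, so "write" methods such as the hypothetical registerState() stay out of reach, and more read-only accessors could be added later without exposing the whole context.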

That said, I don't see a consistent pattern in the project for handling similar cases. The FunctionContext wraps the RuntimeContext and only exposes what is necessary. CEPRuntimeContext extends the RuntimeContext and overrides the methods it does not want to expose with exception-throwing logic. Some internal context classes simply expose the entire RuntimeContext along with some additional methods. If we want to make things clean, I'd imagine all these context variations could become some specific combination of a ReadOnlyRuntimeContext and some "write" methods. But this may require a closer look at all these cases to make sure the ReadOnlyRuntimeContext is generally suitable. I feel that it will take some time and could be a bigger discussion than the data generator source itself. So maybe we can just go with adding a method for now and evolve the SourceReaderContext to use the ReadOnlyRuntimeContext in the future.

Thanks,

Jiangjie (Becket) Qin

On Tue, Jul 5, 2022 at 8:31 PM Alexander Fedulov <alexan...@ververica.com> wrote:

> Hi Becket,
>
> I agree with you. We could introduce a *ReadOnlyRuntimeContext* that would act as a holder for the *RuntimeContext* data. This would also require read-only wrappers for the exposed fields, such as *ExecutionConfig*. Alternatively, we just add the *currentParallelism()* method for now and see if anything else might actually be needed later on. What do you think?
>
> Best,
> Alexander Fedulov
>
> On Tue, Jul 5, 2022 at 2:30 AM Becket Qin <becket....@gmail.com> wrote:
>
> > Hi Alex,
> >
> > While it is true that the RuntimeContext gives access to all the stuff the framework can provide, it seems like a bit of overkill for the SourceReader.
> > It is probably OK to expose all the read-only information in the RuntimeContext to the SourceReader, but we may want to hide the "write" methods, such as creating states, writing stuff to distributed cache, etc, because these methods may not work well with the SourceReader design and cause confusion. For example, users may wonder why the snapshotState() method exists while they can use the state directly.
> >
> > Thanks,
> >
> > Jiangjie (Becket) Qin
> >
> > On Tue, Jul 5, 2022 at 7:37 AM Alexander Fedulov <alexan...@ververica.com> wrote:
> >
> > > Hi Becket,
> > >
> > > I updated and extended FLIP-238 accordingly.
> > >
> > > Here is also my POC branch [1].
> > > DataGeneratorSourceV3 is the class that I currently converged on [2]. It is based on the expanded SourceReaderContext.
> > > A couple more relevant classes [3] [4]
> > >
> > > Would appreciate it if you could take a quick look.
> > >
> > > [1] https://github.com/afedulov/flink/tree/FLINK-27919-generator-source
> > > [2] https://github.com/afedulov/flink/blob/FLINK-27919-generator-source/flink-core/src/main/java/org/apache/flink/api/connector/source/lib/DataGeneratorSourceV3.java
> > > [3] https://github.com/afedulov/flink/blob/FLINK-27919-generator-source/flink-core/src/main/java/org/apache/flink/api/connector/source/lib/util/MappingIteratorSourceReader.java
> > > [4] https://github.com/afedulov/flink/blob/FLINK-27919-generator-source/flink-core/src/main/java/org/apache/flink/api/connector/source/lib/util/RateLimitedSourceReader.java
> > >
> > > Best,
> > > Alexander Fedulov
> > >
> > > On Mon, Jul 4, 2022 at 12:08 PM Alexander Fedulov <alexan...@ververica.com> wrote:
> > >
> > > > Hi Becket,
> > > >
> > > > Exposing the RuntimeContext is potentially even more useful.
> > > > Do you think it is worth having both currentParallelism() and getRuntimeContext() methods?
> > > > One can always call getNumberOfParallelSubtasks() on the RuntimeContext directly if we expose it.
> > > >
> > > > Best,
> > > > Alexander Fedulov
> > > >
> > > > On Mon, Jul 4, 2022 at 3:44 AM Becket Qin <becket....@gmail.com> wrote:
> > > >
> > > >> Hi Alex,
> > > >>
> > > >> Yes, that is what I had in mind. We need to add the method getRuntimeContext() to the SourceReaderContext interface as well.
> > > >>
> > > >> Thanks,
> > > >>
> > > >> Jiangjie (Becket) Qin
> > > >>
> > > >> On Mon, Jul 4, 2022 at 3:01 AM Alexander Fedulov <alexan...@ververica.com> wrote:
> > > >>
> > > >> > Hi Becket,
> > > >> >
> > > >> > thanks for your input. I like the idea of adding the parallelism to the SourceReaderContext. My understanding is that any change of parallelism causes recreation of all readers, so it should be safe to consider it "fixed" after the readers' initialization. In that case, it should be as simple as adding the following to the anonymous SourceReaderContext implementation in SourceOperator#initReader():
> > > >> >
> > > >> >     public int currentParallelism() {
> > > >> >         return getRuntimeContext().getNumberOfParallelSubtasks();
> > > >> >     }
> > > >> >
> > > >> > Is that what you had in mind?
> > > >> >
> > > >> > Best,
> > > >> > Alexander Fedulov
> > > >> >
> > > >> > On Fri, Jul 1, 2022 at 11:30 AM Becket Qin <becket....@gmail.com> wrote:
> > > >> >
> > > >> > > Hi Alex,
> > > >> > >
> > > >> > > In FLIP-27 source, the SourceReader can get a SourceReaderContext. This is passed in by the TM in Source#createReader(). And supposedly the Source should pass this to the SourceReader if needed.
> > > >> > >
> > > >> > > In the SourceReaderContext, currently only the index of the current subtask is available, but we can probably add the current parallelism as well. This would be a change that affects all the Sources, not only the data generator source. Perhaps we can have a simple separate FLIP.
> > > >> > >
> > > >> > > Regarding the semantics of rate limiting, for the rate limit source, personally I find it intuitive to keep the global rate untouched on scaling.
> > > >> > >
> > > >> > > Thanks,
> > > >> > >
> > > >> > > Jiangjie (Becket) Qin
> > > >> > >
> > > >> > > On Fri, Jul 1, 2022 at 4:00 AM Alexander Fedulov <alexan...@ververica.com> wrote:
> > > >> > >
> > > >> > > > Hi all,
> > > >> > > >
> > > >> > > > getting back to the idea of reusing FlinkConnectorRateLimiter: it is designed for the SourceFunction API and has an open() method that takes a RuntimeContext. Therefore, we need to add a different interface for the new Source API.
> > > >> > > >
> > > >> > > > This is where I see a certain limitation for the rate-limiting use case: in the old API the individual readers were able to retrieve the current parallelism from the RuntimeContext. In the new API, this is not supported; the information about the parallelism is only available in the SplitEnumeratorContext, to which the readers do not have access.
> > > >> > > >
> > > >> > > > I see two possibilities:
> > > >> > > > 1. Add an optional RateLimiter parameter to the DataGeneratorSource constructor. The RateLimiter is then "fixed" and has to be fully configured by the user in the main method.
> > > >> > > > 2. Piggy-back on Splits: add parallelism as a field of a Split. The initialization of this field would happen dynamically upon split creation in the createEnumerator() method, where currentParallelism is available.
> > > >> > > >
> > > >> > > > The second approach makes the implementation significantly more complex since we cannot simply wrap NumberSequenceSource.SplitSerializer in that case. The advantage of this approach is that with any kind of autoscaling, the source rate will match the original configuration. But I'm not sure how useful this is. I can even imagine scenarios where scaling the input rate together with parallelism would be better for demo purposes.
> > > >> > > >
> > > >> > > > Would be glad to hear your thoughts on this.
> > > >> > > >
> > > >> > > > Best,
> > > >> > > > Alexander Fedulov
> > > >> > > >
> > > >> > > > On Mon, Jun 20, 2022 at 4:31 PM David Anderson <dander...@apache.org> wrote:
> > > >> > > >
> > > >> > > > > I'm very happy with this. +1
> > > >> > > > >
> > > >> > > > > A lot of SourceFunction implementations used in demos/POC implementations include a call to sleep(), so adding rate limiting is a good idea, in my opinion.
> > > >> > > > >
> > > >> > > > > Best,
> > > >> > > > > David
> > > >> > > > >
> > > >> > > > > On Mon, Jun 20, 2022 at 10:10 AM Qingsheng Ren <renqs...@gmail.com> wrote:
> > > >> > > > >
> > > >> > > > > > Hi Alexander,
> > > >> > > > > >
> > > >> > > > > > Thanks for creating this FLIP! I’d like to share some thoughts.
> > > >> > > > > >
> > > >> > > > > > 1. About the “generatorFunction”: I’m expecting an initializer on it, because it’s hard to require that all fields in the generator function be serializable in the user’s implementation. Providing a function like “open” in the interface would let the function perform some initialization during the task initialization stage.
> > > >> > > > > >
> > > >> > > > > > 2. As for the throttling functionality you mentioned, there’s a FlinkConnectorRateLimiter under flink-core and maybe we could reuse this interface. Actually I’d prefer to make rate limiting a common feature provided in the Source API, but this requires another FLIP and a lot of discussion, so I’m OK with having it in the DataGen source first.
> > > >> > > > > >
> > > >> > > > > > Best regards,
> > > >> > > > > > Qingsheng
> > > >> > > > > >
> > > >> > > > > > > On Jun 17, 2022, at 01:47, Alexander Fedulov <alexan...@ververica.com> wrote:
> > > >> > > > > > >
> > > >> > > > > > > Hi Jing,
> > > >> > > > > > >
> > > >> > > > > > > thanks for your thorough analysis. I agree with the points you make and also with the idea to approach the larger task of providing a universal (DataStream + SQL) data generator base iteratively.
> > > >> > > > > > > Regarding the name, the SourceFunction-based *DataGeneratorSource* resides in *org.apache.flink.streaming.api.functions.source.datagen*. I think it is OK to simply place the new one (with the same name) next to the *NumberSequenceSource* in *org.apache.flink.api.connector.source.lib*.
> > > >> > > > > > >
> > > >> > > > > > > One more thing I wanted to discuss: I noticed that *DataGenTableSource* has built-in throttling functionality (*rowsPerSecond*). I believe it is something that could also be useful for the DataStream users of the stateless data generator, and since we want to eventually converge on the same implementation for DataStream and Table/SQL, it sounds like a good idea to add it to the FLIP. What do you think?
> > > >> > > > > > >
> > > >> > > > > > > Best,
> > > >> > > > > > > Alexander Fedulov
> > > >> > > > > > >
> > > >> > > > > > > On Tue, Jun 14, 2022 at 7:17 PM Jing Ge <j...@ververica.com> wrote:
> > > >> > > > > > >
> > > >> > > > > > >> Hi,
> > > >> > > > > > >>
> > > >> > > > > > >> After reading all discussions posted in this thread and the source code of DataGeneratorSource, which unfortunately used "Source" instead of "SourceFunction" in its name, the issues could be summarized as follows:
> > > >> > > > > > >>
> > > >> > > > > > >> 1. The current DataGeneratorSource based on SourceFunction is a stateful source connector and built for Table/SQL.
> > > >> > > > > > >> 2. The right name for the new data generator source i.e.
> > > >> > > > > > >> DataGeneratorSource has been used for the current implementation based on SourceFunction.
> > > >> > > > > > >> 3. A new data generator source should be developed based on the new Source API.
> > > >> > > > > > >> 4. The new data generator source should be used both for DataStream and Table/SQL, which means the current DataGeneratorSource should be replaced with the new one.
> > > >> > > > > > >> 5. The core event generation logic should be pluggable to support various (test) scenarios, e.g. random stream, changelog stream, controllable events per checkpoint, etc.
> > > >> > > > > > >>
> > > >> > > > > > >> It turns out that:
> > > >> > > > > > >>
> > > >> > > > > > >> To solve 1+3+4 -> we will have to make a big effort to replace the current DataGeneratorSource since the new Source API has a very different concept, especially for the stateful part.
> > > >> > > > > > >> To solve 2+3 -> we have to find another name for the new implementation.
> > > >> > > > > > >> To solve 1+3+4+5 -> it gets even more complicated to support stateless and stateful scenarios simultaneously with one solution.
> > > >> > > > > > >>
> > > >> > > > > > >> If we want to solve all of these issues in one shot, it might take months. Therefore, I would suggest starting small and growing iteratively.
> > > >> > > > > > >>
> > > >> > > > > > >> The proposal for the kickoff is to focus on stateless event generation with e.g. a random stream and use the name "StatelessDataGeneratorSource". There will be a period of time during which both DataGeneratorSource implementations will be used by developers. The current DataGeneratorSource will then be deprecated, once we can (iteratively):
> > > >> > > > > > >> 1. either enlarge the scope of StatelessDataGeneratorSource to be able to cover stateful scenarios and rename it to "DataGeneratorSourceV2" (following the naming convention of SinkV2) or
> > > >> > > > > > >> 2. develop a new "StatefulDataGeneratorSource" based on the Source API which can handle the stateful scenarios, if it is impossible to support both stateless and stateful scenarios with one GeneratorSource implementation.
> > > >> > > > > > >>
> > > >> > > > > > >> Best regards,
> > > >> > > > > > >> Jing
> > > >> > > > > > >>
> > > >> > > > > > >> On Thu, Jun 9, 2022 at 2:48 PM Martijn Visser <martijnvis...@apache.org> wrote:
> > > >> > > > > > >>
> > > >> > > > > > >>> Hey Alex,
> > > >> > > > > > >>>
> > > >> > > > > > >>> Yes, I think we need to make sure that we're not causing confusion (I know I already was confused). I think the DataSupplierSource is already better, but perhaps there are others who have an even better idea.
> > > >> > > > > > >>>
> > > >> > > > > > >>> Thanks,
> > > >> > > > > > >>>
> > > >> > > > > > >>> Martijn
> > > >> > > > > > >>>
> > > >> > > > > > >>> On Thu, Jun 9, 2022 at 14:28, Alexander Fedulov <alexan...@ververica.com> wrote:
> > > >> > > > > > >>>
> > > >> > > > > > >>>> Hi Martijn,
> > > >> > > > > > >>>>
> > > >> > > > > > >>>> It seems that they serve slightly different purposes though. The DataGenTableSource is for generating random data described by the Table DDL and is tied into the RowDataGenerator/DataGenerator concept which is implemented as an Iterator<T>. The proposed API in contrast is supposed to provide users with an easy way to supply their custom data. Another difference is that a DataGenerator is supposed to be stateful and has to snapshot its state, whereas the proposed API is purely driven by the input index IDs and can be stateless yet remain deterministic. Are you sure it is a good idea to mix them into the same API? We could think of using a different name to make it less confusing for the users (something like DataSupplierSource).
> > > >> > > > > > >>>>
> > > >> > > > > > >>>> Best,
> > > >> > > > > > >>>> Alexander Fedulov
> > > >> > > > > > >>>>
> > > >> > > > > > >>>> On Thu, Jun 9, 2022 at 9:14 AM Martijn Visser <martijnvis...@apache.org> wrote:
> > > >> > > > > > >>>>
> > > >> > > > > > >>>>> Hi Alex,
> > > >> > > > > > >>>>>
> > > >> > > > > > >>>>> Thanks for creating the FLIP and opening up the discussion. +1 overall for getting this in place.
> > > >> > > > > > >>>>>
> > > >> > > > > > >>>>> One question: you've already mentioned that this is focused on the DataStream API. I think it would be a bit confusing that we have a Datagen connector (on the Table side) that wouldn't leverage this target interface. I think it would be good if we could already have one generic Datagen connector which works for both DataStream API (so that would be a new one in the Flink repo) and that the Datagen in the Table landscape is using this target interface too. What do you think?
> > > >> > > > > > >>>>>
> > > >> > > > > > >>>>> Best regards,
> > > >> > > > > > >>>>>
> > > >> > > > > > >>>>> Martijn
> > > >> > > > > > >>>>>
> > > >> > > > > > >>>>> On Wed, Jun 8, 2022 at 14:21, Alexander Fedulov <alexan...@ververica.com> wrote:
> > > >> > > > > > >>>>>
> > > >> > > > > > >>>>>> Hi Xianxun,
> > > >> > > > > > >>>>>>
> > > >> > > > > > >>>>>> Thanks for bringing it up. I do believe it would be useful to have such a CDC data generator but I see the efforts to provide one a bit orthogonal to the DataSourceGenerator proposed in the FLIP. FLIP-238 focuses on the DataStream API and I could see integration into the Table/SQL ecosystem as the next step that I would prefer to keep separate (see KafkaDynamicSource reusing KafkaSource<RowData> under the hood [1]).
> > > >> > > > > > >>>>>>
> > > >> > > > > > >>>>>> [1] https://github.com/apache/flink/blob/3adca15859b36c61116fc56fabc24e9647f0ec5f/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicSource.java#L223
> > > >> > > > > > >>>>>>
> > > >> > > > > > >>>>>> Best,
> > > >> > > > > > >>>>>> Alexander Fedulov
> > > >> > > > > > >>>>>>
> > > >> > > > > > >>>>>> On Wed, Jun 8, 2022 at 11:01 AM Xianxun Ye <yxx_c...@163.com> wrote:
> > > >> > > > > > >>>>>>
> > > >> > > > > > >>>>>>> Hey Alexander,
> > > >> > > > > > >>>>>>>
> > > >> > > > > > >>>>>>> Making datagen source connector easier to use is really helpful during doing some PoC/Demo.
> > > >> > > > > > >>>>>>> I also wondered whether it is possible to produce a changelog stream with the datagen source, so that a new Flink developer can practice Flink SQL with CDC data using the Flink SQL Client CLI.
> > > >> > > > > > >>>>>>> In the flink-examples-table module, a ChangelogSocketExample class [1] describes how to ingest delete or insert data with the 'nc' command. Can we support producing a changelog stream with the new datagen source?
> > > >> > > > > > >>>>>>>
> > > >> > > > > > >>>>>>> [1] https://github.com/apache/flink/blob/master/flink-examples/flink-examples-table/src/main/java/org/apache/flink/table/examples/java/connectors/ChangelogSocketExample.java#L79
> > > >> > > > > > >>>>>>>
> > > >> > > > > > >>>>>>> Best regards,
> > > >> > > > > > >>>>>>>
> > > >> > > > > > >>>>>>> Xianxun
> > > >> > > > > > >>>>>>>
> > > >> > > > > > >>>>>>> On 06/8/2022 08:10,Alexander Fedulov<alexan...@ververica.com> <alexan...@ververica.com> wrote:
> > > >> > > > > > >>>>>>>
> > > >> > > > > > >>>>>>> I looked a bit further and it seems it should actually be easier than I initially thought: SourceReader extends CheckpointListener interface and with its custom implementation it should be possible to achieve
> > > >> > > > > > >>>>>>> similar results. A prototype that I have for the generator uses an IteratorSourceReader under the hood by default but we could consider adding the ability to supply something like a DataGeneratorSourceReaderFactory that would allow provisioning the DataGeneratorSource with customized implementations for cases like this.
> > > >> > > > > > >>>>>>>
> > > >> > > > > > >>>>>>> Best,
> > > >> > > > > > >>>>>>> Alexander Fedulov
> > > >> > > > > > >>>>>>>
> > > >> > > > > > >>>>>>> On Wed, Jun 8, 2022 at 12:58 AM Alexander Fedulov <alexan...@ververica.com> wrote:
> > > >> > > > > > >>>>>>>
> > > >> > > > > > >>>>>>> Hi Steven,
> > > >> > > > > > >>>>>>>
> > > >> > > > > > >>>>>>> This is going to be tricky since in the new Source API the checkpointing aspects that you based your logic on are pushed further away from the low-level interfaces responsible for handling data and splits [1]. At the same time, the SourceCoordinatorProvider is hardwired into the internals of the framework, so I don't think it will be possible to provide a customized implementation for testing purposes.
> > > >> > > > > > >>>>>>>
> > > >> > > > > > >>>>>>> The only chance to tie data generation to checkpointing in the new Source API that I see at the moment is via the SplitEnumerator serializer (getEnumeratorCheckpointSerializer() method) [2]. In theory, it should be possible to share a variable visible both to the generator function and to the serializer and manipulate it whenever the serialize() method gets called upon a checkpoint request. That said, you still won't get notifications of successful checkpoints that you currently use (this info is only available to the SourceCoordinator).
> > > >> > > > > > >>>>>>>
> > > >> > > > > > >>>>>>> In general, regardless of the generator implementation itself, the new Source API does not seem to support the use case of verifying checkpoints contents in lockstep with produced data, at least I do not see an immediate solution for this. Can you think of a different way of checking the correctness of the Iceberg Sink implementation that does not rely on this approach?
> > > >> > > > > > >>>>>>>
> > > >> > > > > > >>>>>>> Best,
> > > >> > > > > > >>>>>>> Alexander Fedulov
> > > >> > > > > > >>>>>>>
> > > >> > > > > > >>>>>>> [1] https://github.com/apache/flink/blob/0f19c2472c54aac97e4067f5398731ab90036d1a/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java#L337
> > > >> > > > > > >>>>>>>
> > > >> > > > > > >>>>>>> [2] https://github.com/apache/flink/blob/e4b000818c15b5b781c4e5262ba83bfc9d65121a/flink-core/src/main/java/org/apache/flink/api/connector/source/Source.java#L97
> > > >> > > > > > >>>>>>>
> > > >> > > > > > >>>>>>> On Tue, Jun 7, 2022 at 6:03 PM Steven Wu <stevenz...@gmail.com> wrote:
> > > >> > > > > > >>>>>>>
> > > >> > > > > > >>>>>>> In Iceberg source, we have a data generator source that can control the records per checkpoint cycle. Can we support sth like this in the DataGeneratorSource?
> > > >> > > > > > >>>>>>>
> > > >> > > > > > >>>>>>> https://github.com/apache/iceberg/blob/master/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/BoundedTestSource.java
> > > >> > > > > > >>>>>>> public BoundedTestSource(List<List<T>> elementsPerCheckpoint, boolean checkpointEnabled)
> > > >> > > > > > >>>>>>>
> > > >> > > > > > >>>>>>> Thanks,
> > > >> > > > > > >>>>>>> Steven
> > > >> > > > > > >>>>>>>
> > > >> > > > > > >>>>>>> On Tue, Jun 7, 2022 at 8:48 AM Alexander Fedulov <alexan...@ververica.com> wrote:
> > > >> > > > > > >>>>>>>
> > > >> > > > > > >>>>>>> Hi everyone,
> > > >> > > > > > >>>>>>>
> > > >> > > > > > >>>>>>> I would like to open a discussion on FLIP-238: Introduce FLIP-27-based Data Generator Source [1]. During the discussion about deprecating the SourceFunction API [2] it became evident that an easy-to-use FLIP-27-compatible data generator source is needed so that the current SourceFunction-based data generator implementations could be phased out for both Flink demo/PoC applications and for the internal Flink tests.
> > > >> > > > > > >>>>>>> This FLIP proposes to introduce a generic DataGeneratorSource capable of producing events of an arbitrary type based on a user-supplied MapFunction.
> > > >> > > > > > >>>>>>>
> > > >> > > > > > >>>>>>> Looking forward to your feedback.
> > > >> > > > > > >>>>>>>
> > > >> > > > > > >>>>>>> [1] https://cwiki.apache.org/confluence/x/9Av1D
> > > >> > > > > > >>>>>>> [2] https://lists.apache.org/thread/d6cwqw9b3105wcpdkwq7rr4s7x4ywqr9
> > > >> > > > > > >>>>>>>
> > > >> > > > > > >>>>>>> Best,
> > > >> > > > > > >>>>>>> Alexander Fedulov