Hi Becket,

Interesting points about the inconsistencies in how the *RuntimeContext* is "wrapped" throughout the framework, but I agree: this is something that needs to be tackled separately. For now, I have adjusted the FLIP and the PoC implementation to expose only the parallelism.
Best,
Alexander Fedulov

On Wed, Jul 6, 2022 at 2:42 AM Becket Qin <becket....@gmail.com> wrote:

Hi Alex,

Personally, I prefer the latter option, i.e. just adding the currentParallelism() method. It is easy to add more to the SourceReaderContext in the future, and most of what is in the RuntimeContext is likely not required by SourceReader implementations. For the purpose of this FLIP, adding the method is probably good enough.

That said, I don't see a consistent pattern adopted in the project for handling similar cases. The FunctionContext wraps the RuntimeContext and exposes only what is necessary. The CEPRuntimeContext extends the RuntimeContext and overrides the methods it does not want to expose with exception-throwing logic. Some internal context classes simply expose the entire RuntimeContext along with some additional methods. If we want to make things clean, I'd imagine all these variations of context could become a specific combination of a ReadOnlyRuntimeContext and some "write" methods. But this may require a closer look at all these cases to make sure the ReadOnlyRuntimeContext is generally suitable. I feel that it will take some time and could be a bigger discussion than the data generator source itself. So maybe we can just go with adding a method for the moment, and evolve the SourceReaderContext to use the ReadOnlyRuntimeContext in the future.

Thanks,

Jiangjie (Becket) Qin

On Tue, Jul 5, 2022 at 8:31 PM Alexander Fedulov <alexan...@ververica.com> wrote:

Hi Becket,

I agree with you. We could introduce a *ReadOnlyRuntimeContext* that would act as a holder for the *RuntimeContext* data. This would also require read-only wrappers for the exposed fields, such as *ExecutionConfig*. Alternatively, we could just add the *currentParallelism()* method for now and see whether anything else is actually needed later on. What do you think?
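The difference between the options discussed here (a narrow context that only adds currentParallelism(), versus exposing the whole RuntimeContext) can be illustrated with a simplified, self-contained Java sketch of the "wrap and expose only what is needed" pattern that Becket attributes to FunctionContext. RuntimeInfo, ReaderContext, WrappingReaderContext and FixedRuntimeInfo are hypothetical stand-ins, not Flink's actual RuntimeContext or SourceReaderContext interfaces.

```java
// Hypothetical, heavily simplified stand-in for a full runtime context:
// read-only task information plus a "write" operation.
interface RuntimeInfo {
    int getIndexOfThisSubtask();

    int getNumberOfParallelSubtasks();

    void registerState(String name); // a "write" method readers should not see
}

// Hypothetical narrow context containing only what a reader needs.
interface ReaderContext {
    int getIndexOfSubtask();

    int currentParallelism();
}

// Wrap-and-expose-a-subset: delegates to the full runtime information, but
// its type has no write methods, so readers cannot call them at all.
final class WrappingReaderContext implements ReaderContext {
    private final RuntimeInfo runtime;

    WrappingReaderContext(RuntimeInfo runtime) {
        this.runtime = runtime;
    }

    @Override
    public int getIndexOfSubtask() {
        return runtime.getIndexOfThisSubtask();
    }

    @Override
    public int currentParallelism() {
        // Same kind of delegation as the currentParallelism() snippet quoted in this thread.
        return runtime.getNumberOfParallelSubtasks();
    }
}

// Fixed values for demonstration purposes.
final class FixedRuntimeInfo implements RuntimeInfo {
    private final int index;
    private final int parallelism;

    FixedRuntimeInfo(int index, int parallelism) {
        this.index = index;
        this.parallelism = parallelism;
    }

    @Override
    public int getIndexOfThisSubtask() {
        return index;
    }

    @Override
    public int getNumberOfParallelSubtasks() {
        return parallelism;
    }

    @Override
    public void registerState(String name) {
        // A real framework would create state here; omitted in this sketch.
    }
}
```

Because ReaderContext declares no write methods, a reader holding it simply has no way to call registerState().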
Best,
Alexander Fedulov

On Tue, Jul 5, 2022 at 2:30 AM Becket Qin <becket....@gmail.com> wrote:

Hi Alex,

While it is true that the RuntimeContext gives access to everything the framework can provide, it seems a little overkill for the SourceReader. It is probably OK to expose all the read-only information in the RuntimeContext to the SourceReader, but we may want to hide the "write" methods, such as creating state, writing to the distributed cache, etc., because these methods may not work well with the SourceReader design and may cause confusion. For example, users may wonder why the snapshotState() method exists when they can use the state directly.

Thanks,

Jiangjie (Becket) Qin

On Tue, Jul 5, 2022 at 7:37 AM Alexander Fedulov <alexan...@ververica.com> wrote:

Hi Becket,

I updated and extended FLIP-238 accordingly.

Here is also my PoC branch [1]. DataGeneratorSourceV3 is the class that I have currently converged on [2]. It is based on the expanded SourceReaderContext. A couple more relevant classes: [3] [4]

Would appreciate it if you could take a quick look.
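Becket's suggestion to hide the "write" methods can also be realized with the other pattern he mentions (as in CEPRuntimeContext): extend the full context and override the unwanted methods to throw. The following is a hypothetical, simplified sketch; FullContext and ReadOnlyContext are illustrative names, not Flink classes.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical, simplified "full" context with a read method and a "write" method.
class FullContext {
    private final int parallelism;
    private final List<String> states = new ArrayList<>();

    FullContext(int parallelism) {
        this.parallelism = parallelism;
    }

    int getNumberOfParallelSubtasks() {
        return parallelism;
    }

    // "Write" method: registers a named piece of state.
    void createState(String name) {
        states.add(name);
    }

    List<String> registeredStates() {
        return Collections.unmodifiableList(states);
    }
}

// Extend-and-throw: still usable wherever a FullContext is expected,
// but the write method is disabled and fails at runtime.
class ReadOnlyContext extends FullContext {
    ReadOnlyContext(int parallelism) {
        super(parallelism);
    }

    @Override
    void createState(String name) {
        throw new UnsupportedOperationException(
                "State creation is not supported in this context: " + name);
    }
}
```

The trade-off compared with a narrow wrapper is that the disabled method is still part of the type and only fails when called, whereas a wrapper that never declares it rejects such calls at compile time.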
[1] https://github.com/afedulov/flink/tree/FLINK-27919-generator-source
[2] https://github.com/afedulov/flink/blob/FLINK-27919-generator-source/flink-core/src/main/java/org/apache/flink/api/connector/source/lib/DataGeneratorSourceV3.java
[3] https://github.com/afedulov/flink/blob/FLINK-27919-generator-source/flink-core/src/main/java/org/apache/flink/api/connector/source/lib/util/MappingIteratorSourceReader.java
[4] https://github.com/afedulov/flink/blob/FLINK-27919-generator-source/flink-core/src/main/java/org/apache/flink/api/connector/source/lib/util/RateLimitedSourceReader.java

Best,
Alexander Fedulov

On Mon, Jul 4, 2022 at 12:08 PM Alexander Fedulov <alexan...@ververica.com> wrote:

Hi Becket,

Exposing the RuntimeContext is potentially even more useful. Do you think it is worth having both currentParallelism() and getRuntimeContext() methods? One can always call getNumberOfParallelSubtasks() on the RuntimeContext directly if we expose it.

Best,
Alexander Fedulov

On Mon, Jul 4, 2022 at 3:44 AM Becket Qin <becket....@gmail.com> wrote:

Hi Alex,

Yes, that is what I had in mind. We need to add the method getRuntimeContext() to the SourceReaderContext interface as well.

Thanks,

Jiangjie (Becket) Qin

On Mon, Jul 4, 2022 at 3:01 AM Alexander Fedulov <alexan...@ververica.com> wrote:

Hi Becket,

Thanks for your input. I like the idea of adding the parallelism to the SourceReaderContext.
My understanding is that any change of parallelism causes all readers to be recreated, so it should be safe to consider it "fixed" after the readers' initialization. In that case, it should be as simple as adding the following to the anonymous SourceReaderContext implementation in SourceOperator#initReader():

    @Override
    public int currentParallelism() {
        return getRuntimeContext().getNumberOfParallelSubtasks();
    }

Is that what you had in mind?

Best,
Alexander Fedulov

On Fri, Jul 1, 2022 at 11:30 AM Becket Qin <becket....@gmail.com> wrote:

Hi Alex,

In the FLIP-27 source, the SourceReader can get a SourceReaderContext. This is passed in by the TM in Source#createReader(), and the Source is supposed to pass it on to the SourceReader if needed.

In the SourceReaderContext, currently only the index of the current subtask is available, but we can probably add the current parallelism as well. This would be a change that affects all Sources, not only the data generator source. Perhaps we can have a simple separate FLIP.

Regarding the semantics of rate limiting for the rate-limited source, personally I feel it is intuitive to keep the global rate unchanged on scaling.
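Keeping the global rate unchanged on scaling means each reader has to derive its own share from the current parallelism, which is what the proposed currentParallelism() would make possible. A minimal sketch of that arithmetic, assuming an integer records-per-second budget and that each reader knows its subtask index; RateShares is a hypothetical helper, not part of any Flink API.

```java
// Hypothetical helper: divides a global records-per-second budget among the
// readers of a source so that the per-reader shares always sum to the global
// rate, regardless of the current parallelism.
final class RateShares {
    private RateShares() {}

    static long perSubtaskRate(long globalRate, int parallelism, int subtaskIndex) {
        if (globalRate < 0) {
            throw new IllegalArgumentException("globalRate must be >= 0");
        }
        if (parallelism <= 0) {
            throw new IllegalArgumentException("parallelism must be > 0");
        }
        if (subtaskIndex < 0 || subtaskIndex >= parallelism) {
            throw new IllegalArgumentException("subtaskIndex out of range");
        }
        long base = globalRate / parallelism;
        long remainder = globalRate % parallelism;
        // The first `remainder` subtasks take one extra record per second.
        return base + (subtaskIndex < remainder ? 1 : 0);
    }
}
```

With a global rate of 10 records/s, three readers get 4, 3 and 3; after rescaling to four readers they get 3, 3, 2 and 2, so the total stays at 10. If the global rate is smaller than the parallelism, some readers get a share of 0.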
Thanks,

Jiangjie (Becket) Qin

On Fri, Jul 1, 2022 at 4:00 AM Alexander Fedulov <alexan...@ververica.com> wrote:

Hi all,

Getting back to the idea of reusing FlinkConnectorRateLimiter: it is designed for the SourceFunction API and has an open() method that takes a RuntimeContext. Therefore, we need to add a different interface for the new Source API.

This is where I see a certain limitation for the rate-limiting use case: in the old API, the individual readers were able to retrieve the current parallelism from the RuntimeContext. The new API does not support this; the information about the parallelism is only available in the SplitEnumeratorContext, to which the readers do not have access.

I see two possibilities:
1. Add an optional RateLimiter parameter to the DataGeneratorSource constructor. The RateLimiter is then "fixed" and has to be fully configured by the user in the main method.
2. Piggy-back on splits: add the parallelism as a field of a split. This field would be initialized dynamically upon split creation in the createEnumerator() method, where currentParallelism is available.
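The second option could be sketched roughly as follows: enumerator-side code, which knows the parallelism, stamps it into every split it creates, and each reader derives its rate from the split it receives. RateAwareSplit and RateAwareSplits are hypothetical names, and the near-equal range division is only an assumption about how the sequence would be partitioned. A real implementation would also need a split serializer that writes the extra field, which is the added complexity mentioned here.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical split: a contiguous, inclusive range of sequence indexes plus
// the parallelism that was current when the enumerator created it.
final class RateAwareSplit {
    final long from;
    final long to;
    final int parallelism;

    RateAwareSplit(long from, long to, int parallelism) {
        this.from = from;
        this.to = to;
        this.parallelism = parallelism;
    }

    // Per-reader rate derived from the global rate and the recorded parallelism.
    double ratePerReader(double globalRatePerSecond) {
        return globalRatePerSecond / parallelism;
    }
}

final class RateAwareSplits {
    private RateAwareSplits() {}

    // Enumerator-side logic: divides [0, count) into one near-equal range per
    // subtask and stamps each split with the parallelism known at that point.
    static List<RateAwareSplit> create(long count, int parallelism) {
        List<RateAwareSplit> splits = new ArrayList<>();
        long base = count / parallelism;
        long extra = count % parallelism;
        long start = 0;
        for (int i = 0; i < parallelism; i++) {
            long size = base + (i < extra ? 1 : 0);
            if (size > 0) {
                splits.add(new RateAwareSplit(start, start + size - 1, parallelism));
                start += size;
            }
        }
        return splits;
    }
}
```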
The second approach makes the implementation significantly more complex, since we cannot simply wrap NumberSequenceSource.SplitSerializer in that case. Its advantage is that with any kind of autoscaling, the source rate will match the original configuration. But I'm not sure how useful this is. I can even imagine scenarios where scaling the input rate together with the parallelism would be better for demo purposes.

Would be glad to hear your thoughts on this.

Best,
Alexander Fedulov

On Mon, Jun 20, 2022 at 4:31 PM David Anderson <dander...@apache.org> wrote:

I'm very happy with this. +1

A lot of SourceFunction implementations used in demos/PoC implementations include a call to sleep(), so adding rate limiting is a good idea, in my opinion.

Best,
David

On Mon, Jun 20, 2022 at 10:10 AM Qingsheng Ren <renqs...@gmail.com> wrote:

Hi Alexander,

Thanks for creating this FLIP! I'd like to share some thoughts.

1. About the "generatorFunction": I'm expecting an initializer on it, because it's hard to require that all fields in the generator function are serializable in the user's implementation.
Providing a function like "open" in the interface would let the function perform some initialization in the task initialization stage.

2. As for the throttling functionality you mentioned, there's a FlinkConnectorRateLimiter under flink-core and maybe we could reuse this interface. Actually, I would prefer to make rate limiting a common feature provided by the Source API, but this requires another FLIP and a lot of discussion, so I'm OK with having it in the DataGen source first.

Best regards,
Qingsheng

On Jun 17, 2022, at 01:47, Alexander Fedulov <alexan...@ververica.com> wrote:

Hi Jing,

Thanks for your thorough analysis. I agree with the points you make and also with the idea of approaching the larger task of providing a universal (DataStream + SQL) data generator base iteratively.
Regarding the name, the SourceFunction-based *DataGeneratorSource* resides in the *org.apache.flink.streaming.api.functions.source.datagen* package.
I think it is OK to simply place the new one (with the same name) next to the *NumberSequenceSource* in *org.apache.flink.api.connector.source.lib*.

One more thing I wanted to discuss: I noticed that *DataGenTableSource* has built-in throttling functionality (*rowsPerSecond*). I believe it could also be useful for the DataStream users of the stateless data generator, and since we want to eventually converge on the same implementation for DataStream and Table/SQL, it sounds like a good idea to add it to the FLIP. What do you think?

Best,
Alexander Fedulov

On Tue, Jun 14, 2022 at 7:17 PM Jing Ge <j...@ververica.com> wrote:

Hi,

After reading all the discussions posted in this thread and the source code of DataGeneratorSource, which unfortunately uses "Source" instead of "SourceFunction" in its name, the issues can be summarized as follows:

1. The current DataGeneratorSource based on SourceFunction is a stateful source connector built for Table/SQL.
2. The right name for the new data generator source, i.e. DataGeneratorSource, is already used by the current implementation based on SourceFunction.
3. A new data generator source should be developed based on the new Source API.
4. The new data generator source should be used both for DataStream and Table/SQL, which means the current DataGeneratorSource should be replaced with the new one.
5. The core event generation logic should be pluggable to support various (test) scenarios, e.g. a random stream, a changelog stream, controllable events per checkpoint, etc.

This means that:

To solve 1+3+4 -> we will have to make a big effort to replace the current DataGeneratorSource, since the new Source API has a very different concept, especially for the stateful part.
To solve 2+3 -> we have to find another name for the new implementation.
To solve 1+3+4+5 -> it gets even more complicated to support stateless and stateful scenarios simultaneously with one solution.

If we want to solve all of these issues in one shot, it might take months.
Therefore, I would suggest starting small and growing iteratively.

The proposal for the kickoff is to focus on stateless event generation, e.g. a random stream, and use the name "StatelessDataGeneratorSource". There will be a period of time during which developers use both data generator sources. The current DataGeneratorSource will then be deprecated once we can (iteratively) either:
1. enlarge the scope of StatelessDataGeneratorSource to cover stateful scenarios and rename it to "DataGeneratorSourceV2" (following the naming convention of SinkV2), or
2. develop a new "StatefulDataGeneratorSource" based on the Source API that can handle the stateful scenarios, if it is impossible to support both stateless and stateful scenarios with one GeneratorSource implementation.
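The stateless generation proposed as the starting point relies on each event being a pure function of its index, so recovery only needs the position in the sequence. A minimal, hypothetical sketch: StatelessGenerator is an illustrative name, and java.util.function.LongFunction stands in for the user-supplied generator function.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.LongFunction;

// Hypothetical stateless generator: every event is a pure function of its
// index, so the only thing a checkpoint has to remember is the next index.
final class StatelessGenerator<T> {
    private final LongFunction<T> fn;
    private final long endExclusive;
    private long nextIndex;

    StatelessGenerator(LongFunction<T> fn, long startInclusive, long endExclusive) {
        this.fn = fn;
        this.nextIndex = startInclusive;
        this.endExclusive = endExclusive;
    }

    boolean hasNext() {
        return nextIndex < endExclusive;
    }

    T next() {
        return fn.apply(nextIndex++);
    }

    // The entire checkpointed "state" of this generator.
    long snapshot() {
        return nextIndex;
    }

    // Restoring only needs the checkpointed index; no internal state is replayed.
    static <T> StatelessGenerator<T> restore(LongFunction<T> fn, long checkpointedIndex, long endExclusive) {
        return new StatelessGenerator<>(fn, checkpointedIndex, endExclusive);
    }

    // Convenience: drains the remaining events into a list.
    List<T> drain() {
        List<T> out = new ArrayList<>();
        while (hasNext()) {
            out.add(next());
        }
        return out;
    }
}
```

Resuming from a snapshotted index reproduces exactly the output of an uninterrupted run. A stateful generator, such as an Iterator with internal mutable state, would additionally have to snapshot and restore that state to give the same guarantee.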
Best regards,
Jing

On Thu, Jun 9, 2022 at 2:48 PM Martijn Visser <martijnvis...@apache.org> wrote:

Hey Alex,

Yes, I think we need to make sure that we're not causing confusion (I know I already was confused). I think DataSupplierSource is already better, but perhaps others have an even better idea.

Thanks,

Martijn

On Thu, Jun 9, 2022 at 14:28 Alexander Fedulov <alexan...@ververica.com> wrote:

Hi Martijn,

It seems that they serve somewhat different purposes, though. The DataGenTableSource is for generating random data described by the Table DDL and is tied to the RowDataGenerator/DataGenerator concept, which is implemented as an Iterator<T>. The proposed API, in contrast, is supposed to provide users with an easy way to supply their custom data.
Another difference is that a DataGenerator is supposed to be stateful and has to snapshot its state, whereas the proposed API is purely driven by the input index IDs and can be stateless yet remain deterministic. Are you sure it is a good idea to mix them into the same API? We could think of using a different name to make it less confusing for users (something like DataSupplierSource).

Best,
Alexander Fedulov

On Thu, Jun 9, 2022 at 9:14 AM Martijn Visser <martijnvis...@apache.org> wrote:

Hi Alex,

Thanks for creating the FLIP and opening up the discussion. +1 overall for getting this in place.

One question: you've already mentioned that this is focused on the DataStream API. I think it would be a bit confusing to have a Datagen connector (on the Table side) that doesn't leverage this target interface.
I think it would be good if we could already have one generic Datagen connector that works for the DataStream API (so that would be a new one in the Flink repo), with the Datagen connector in the Table landscape using this target interface too. What do you think?

Best regards,

Martijn

On Wed, Jun 8, 2022 at 14:21 Alexander Fedulov <alexan...@ververica.com> wrote:

Hi Xianxun,

Thanks for bringing it up. I do believe it would be useful to have such a CDC data generator, but I see the efforts to provide one as somewhat orthogonal to the DataGeneratorSource proposed in the FLIP. FLIP-238 focuses on the DataStream API, and I could see integration into the Table/SQL ecosystem as the next step, which I would prefer to keep separate (see KafkaDynamicSource reusing KafkaSource<RowData> under the hood [1]).
[1] https://github.com/apache/flink/blob/3adca15859b36c61116fc56fabc24e9647f0ec5f/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicSource.java#L223

Best,
Alexander Fedulov

On Wed, Jun 8, 2022 at 11:01 AM Xianxun Ye <yxx_c...@163.com> wrote:

Hey Alexander,

Making the datagen source connector easier to use is really helpful when doing PoCs/demos.
I also wondered whether it is possible to produce a changelog stream with the datagen source, so that a new Flink developer can practice Flink SQL with CDC data using the Flink SQL Client CLI. In the flink-examples-table module, the ChangelogSocketExample class [1] describes how to ingest delete or insert data with the 'nc' command. Can we support producing a changelog stream with the new datagen source?
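One way a changelog stream could be generated deterministically from the sequence index is sketched below; it is an illustration of the idea only, not a proposal from the thread. ChangeKind, ChangeEvent and ChangelogGenerator are hypothetical names; ChangeKind mirrors the four values of Flink's RowKind but is defined locally so the sketch is self-contained.

```java
// Hypothetical change kinds. Flink's RowKind enum has the same four values,
// but this self-contained sketch does not depend on Flink.
enum ChangeKind {
    INSERT,
    UPDATE_BEFORE,
    UPDATE_AFTER,
    DELETE
}

final class ChangeEvent {
    final ChangeKind kind;
    final long key;
    final long value;

    ChangeEvent(ChangeKind kind, long key, long value) {
        this.kind = kind;
        this.key = key;
        this.value = value;
    }

    @Override
    public String toString() {
        return kind + "(" + key + ", " + value + ")";
    }
}

final class ChangelogGenerator {
    private static final ChangeKind[] CYCLE = ChangeKind.values();

    private ChangelogGenerator() {}

    // Maps a sequence index to a change event: every key goes through
    // INSERT -> UPDATE_BEFORE -> UPDATE_AFTER -> DELETE on four consecutive indexes.
    static ChangeEvent at(long index) {
        if (index < 0) {
            throw new IllegalArgumentException("index must be >= 0");
        }
        long key = index / 4;
        ChangeKind kind = CYCLE[(int) (index % 4)];
        // INSERT and UPDATE_BEFORE carry the original value; UPDATE_AFTER and
        // DELETE carry the updated one.
        long value = (kind == ChangeKind.INSERT || kind == ChangeKind.UPDATE_BEFORE)
                ? key * 10
                : key * 10 + 1;
        return new ChangeEvent(kind, key, value);
    }
}
```

Because every key spans four consecutive indexes, a parallel version would need split boundaries aligned to multiples of four so that all changes for one key are emitted by the same reader and in order.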
[1] https://github.com/apache/flink/blob/master/flink-examples/flink-examples-table/src/main/java/org/apache/flink/table/examples/java/connectors/ChangelogSocketExample.java#L79

Best regards,

Xianxun

On 06/8/2022 08:10, Alexander Fedulov <alexan...@ververica.com> wrote:

I looked a bit further, and it seems it should actually be easier than I initially thought: SourceReader extends the CheckpointListener interface, and with a custom implementation it should be possible to achieve similar results.
A prototype that I have for the generator uses an IteratorSourceReader under the hood by default, but we could consider adding the ability to supply something like a DataGeneratorSourceReaderFactory that would allow provisioning the DataGeneratorSource with customized implementations for cases like this.

Best,
Alexander Fedulov

On Wed, Jun 8, 2022 at 12:58 AM Alexander Fedulov <alexan...@ververica.com> wrote:

Hi Steven,

This is going to be tricky, since in the new Source API the checkpointing aspects that you based your logic on are pushed further away from the low-level interfaces responsible for handling data and splits [1]. At the same time, the SourceCoordinatorProvider is hardwired into the internals of the framework, so I don't think it will be possible to provide a customized implementation for testing purposes.
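The CheckpointListener-based idea mentioned above, releasing a fixed batch of records per checkpoint cycle similar to the Iceberg BoundedTestSource referenced in this thread, could look roughly like the following. This is a hypothetical, framework-free sketch: CheckpointCompletionListener only mimics the shape of a checkpoint-completion callback, and poll()/snapshotState() are simplified stand-ins, not Flink's actual SourceReader signatures.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical stand-in for a checkpoint-completion callback.
interface CheckpointCompletionListener {
    void notifyCheckpointComplete(long checkpointId);
}

// Hypothetical reader that releases one batch per checkpoint cycle: a batch is
// emitted, and the next one is held back until a checkpoint taken after that
// emission has completed.
final class PerCheckpointBatchReader<T> implements CheckpointCompletionListener {
    private static final long NONE = -1L;

    private final List<List<T>> batches;
    private int nextBatch = 0;
    private boolean waitingForCheckpoint = false;
    private long coveringCheckpointId = NONE;

    PerCheckpointBatchReader(List<List<T>> elementsPerCheckpoint) {
        this.batches = new ArrayList<>(elementsPerCheckpoint);
    }

    // Returns the next batch, or an empty list while the previous batch is
    // not yet covered by a completed checkpoint (or when all batches are done).
    List<T> poll() {
        if (waitingForCheckpoint || nextBatch >= batches.size()) {
            return Collections.emptyList();
        }
        waitingForCheckpoint = true;
        return batches.get(nextBatch++);
    }

    // Called when a checkpoint is taken; the first one after an emission covers that batch.
    void snapshotState(long checkpointId) {
        if (waitingForCheckpoint && coveringCheckpointId == NONE) {
            coveringCheckpointId = checkpointId;
        }
    }

    @Override
    public void notifyCheckpointComplete(long checkpointId) {
        // Checkpoints taken before the batch was emitted do not unblock it.
        if (coveringCheckpointId != NONE && checkpointId >= coveringCheckpointId) {
            waitingForCheckpoint = false;
            coveringCheckpointId = NONE;
        }
    }

    boolean isFinished() {
        return nextBatch >= batches.size() && !waitingForCheckpoint;
    }
}
```

Tracking the checkpoint ID avoids a subtle mistake: a checkpoint that started before a batch was emitted but completes afterwards does not contain that batch, so it must not release the next one.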
The only chance I currently see to tie data generation to checkpointing in the new Source API is via the SplitEnumerator serializer (the getEnumeratorCheckpointSerializer() method) [2]. In theory, it should be possible to share a variable visible both to the generator function and to the serializer, and to manipulate it whenever the serialize() method gets called upon a checkpoint request. That said, you still won't get the notifications of successful checkpoints that you currently use (this info is only available to the SourceCoordinator).

In general, regardless of the generator implementation itself, the new Source API does not seem to support the use case of verifying checkpoint contents in lockstep with produced data; at least I do not see an immediate solution for this.
Can you think of a different way of checking the correctness of the Iceberg Sink implementation that does not rely on this approach?

Best,
Alexander Fedulov

[1] https://github.com/apache/flink/blob/0f19c2472c54aac97e4067f5398731ab90036d1a/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java#L337
[2] https://github.com/apache/flink/blob/e4b000818c15b5b781c4e5262ba83bfc9d65121a/flink-core/src/main/java/org/apache/flink/api/connector/source/Source.java#L97

On Tue, Jun 7, 2022 at 6:03 PM Steven Wu <stevenz...@gmail.com> wrote:

In the Iceberg source, we have a data generator source that can control the records per checkpoint cycle. Can we support something like this in the DataGeneratorSource?
https://github.com/apache/iceberg/blob/master/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/BoundedTestSource.java

public BoundedTestSource(List<List<T>> elementsPerCheckpoint, boolean checkpointEnabled)

Thanks,
Steven

On Tue, Jun 7, 2022 at 8:48 AM Alexander Fedulov <alexan...@ververica.com> wrote:

Hi everyone,

I would like to open a discussion on FLIP-238: Introduce FLIP-27-based Data Generator Source [1].
During the discussion about deprecating the SourceFunction API [2], it became evident that an easy-to-use, FLIP-27-compatible data generator source is needed so that the current SourceFunction-based data generator implementations can be phased out, both for Flink demo/PoC applications and for the internal Flink tests. This FLIP proposes to introduce a generic DataGeneratorSource capable of producing events of an arbitrary type based on a user-supplied MapFunction.

Looking forward to your feedback.

[1] https://cwiki.apache.org/confluence/x/9Av1D
[2] https://lists.apache.org/thread/d6cwqw9b3105wcpdkwq7rr4s7x4ywqr9

Best,
Alexander Fedulov
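To illustrate the general shape of the proposal (a user-supplied function that turns an index into an event), here is a minimal Flink-free sketch. It assumes, for illustration only, that the index range [0, count) is split into contiguous sub-ranges across parallel readers and that a plain `LongFunction` stands in for the MapFunction mentioned in the proposal; `IndexMappingGeneratorSketch` and `generateForSubtask` are hypothetical names, not part of the FLIP or of Flink.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.LongFunction;

final class IndexMappingGeneratorSketch {

    /** Hypothetical helper: generates the events for one reader's share of [0, count). */
    static <T> List<T> generateForSubtask(
            LongFunction<T> generatorFunction, long count, int parallelism, int subtaskIndex) {
        if (parallelism <= 0 || subtaskIndex < 0 || subtaskIndex >= parallelism) {
            throw new IllegalArgumentException("invalid subtask index or parallelism");
        }
        // Contiguous, non-overlapping sub-ranges whose union is [0, count).
        long from = count * subtaskIndex / parallelism;
        long to = count * (subtaskIndex + 1) / parallelism;
        List<T> out = new ArrayList<>();
        for (long i = from; i < to; i++) {
            out.add(generatorFunction.apply(i)); // user-supplied index -> event mapping
        }
        return out;
    }

    public static void main(String[] args) {
        LongFunction<String> toEvent = index -> "event-" + index;
        for (int subtask = 0; subtask < 3; subtask++) {
            System.out.println(subtask + ": " + generateForSubtask(toEvent, 10, 3, subtask));
        }
        // 0: [event-0, event-1, event-2]
        // 1: [event-3, event-4, event-5]
        // 2: [event-6, event-7, event-8, event-9]
    }
}
```

Splitting by `count * i / parallelism` keeps the sub-ranges contiguous and balanced (their sizes differ by at most one); this sketch does not guard against overflow of that product for extremely large counts.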