Hi Becket,

I updated and extended FLIP-238 accordingly.

Here is also my POC branch [1].
DataGeneratorSourceV3 is the class that I currently converged on [2]. It is
based on the expanded SourceReaderContext.
A couple more relevant classes [3] [4]

Would appreciate it if you could take a quick look.

[1]  https://github.com/afedulov/flink/tree/FLINK-27919-generator-source
[2]
https://github.com/afedulov/flink/blob/FLINK-27919-generator-source/flink-core/src/main/java/org/apache/flink/api/connector/source/lib/DataGeneratorSourceV3.java
[3]
https://github.com/afedulov/flink/blob/FLINK-27919-generator-source/flink-core/src/main/java/org/apache/flink/api/connector/source/lib/util/MappingIteratorSourceReader.java
[4]
https://github.com/afedulov/flink/blob/FLINK-27919-generator-source/flink-core/src/main/java/org/apache/flink/api/connector/source/lib/util/RateLimitedSourceReader.java

Best,
Alexander Fedulov

On Mon, Jul 4, 2022 at 12:08 PM Alexander Fedulov <alexan...@ververica.com>
wrote:

> Hi Becket,
>
> Exposing the RuntimeContext is potentially even more useful.
> Do you think it is worth having both currentParallelism() and
>  getRuntimeContext() methods?
> One can always call getNumberOfParallelSubtasks() on the RuntimeContext
> directly if we expose it.
>
> Best,
> Alexander Fedulov
>
>
> On Mon, Jul 4, 2022 at 3:44 AM Becket Qin <becket....@gmail.com> wrote:
>
>> Hi Alex,
>>
>> Yes, that is what I had in mind. We need to add the method
>> getRuntimeContext() to the SourceReaderContext interface as well.
>>
>> Thanks,
>>
>> Jiangjie (Becket) Qin
>>
>> On Mon, Jul 4, 2022 at 3:01 AM Alexander Fedulov <alexan...@ververica.com
>> >
>> wrote:
>>
>> > Hi Becket,
>> >
>> > thanks for your input. I like the idea of adding the parallelism to the
>> > SourceReaderContext. My understanding is that any change of parallelism
>> > causes recreation of all readers, so it should be safe to consider it
>> > "fixed" after the readers' initialization. In that case, it should be as
>> > simple as adding the following to the anonymous SourceReaderContext
>> > implementation
>> > in SourceOperator#initReader():
>> >
>> > public int currentParallelism() {
>> >    return getRuntimeContext().getNumberOfParallelSubtasks();
>> > }
>> >
>> > Is that what you had in mind?
>> >
>> > Best,
>> > Alexander Fedulov
>> >
>> >
>> >
>> >
>> > On Fri, Jul 1, 2022 at 11:30 AM Becket Qin <becket....@gmail.com>
>> wrote:
>> >
>> > > Hi Alex,
>> > >
>> > > In FLIP-27 source, the SourceReader can get a SourceReaderContext.
>> This
>> > is
>> > > passed in by the TM in Source#createReader(). And supposedly the
>> Source
>> > > should pass this to the SourceReader if needed.
>> > >
>> > > In the SourceReaderContext, currently only the index of the current
>> > subtask
>> > > is available, but we can probably add the current parallelism as well.
>> > This
>> > > would be a change that affects all the Sources, not only for the data
>> > > generator source. Perhaps we can have a simple separate FLIP.
>> > >
>> > > Regarding the semantic of rate limiting, for the rate limit source,
>> > > personally I feel intuitive to keep the global rate untouched on
>> scaling.
>> > >
>> > > Thanks,
>> > >
>> > > Jiangjie (Becket) Qin
>> > >
>> > > On Fri, Jul 1, 2022 at 4:00 AM Alexander Fedulov <
>> > alexan...@ververica.com>
>> > > wrote:
>> > >
>> > > > Hi all,
>> > > >
>> > > > getting back to the idea of reusing FlinkConnectorRateLimiter: it is
>> > > > designed for the SourceFunction API and has an open() method that
>> > takes a
>> > > > RuntimeContext. Therefore, we need to add a different interface for
>> > > > the new Source
>> > > > API.
>> > > >
>> > > > This is where I see a certain limitation for the rate-limiting use
>> > case:
>> > > in
>> > > > the old API the individual readers were able to retrieve the current
>> > > > parallelism from the RuntimeContext. In the new API, this is not
>> > > supported,
>> > > > the information about the parallelism is only available in the
>> > > > SplitEnumeratorContext to which the readers do not have access.
>> > > >
>> > > > I see two possibilities:
>> > > > 1. Add an optional RateLimiter parameter to the DataGeneratorSource
>> > > > constructor. The RateLimiter is then "fixed" and has to be fully
>> > > configured
>> > > > by the user in the main method.
>> > > > 2. Piggy-back on Splits: add parallelism as a field of a Split. The
>> > > > initialization of this field would happen dynamically upon splits
>> > > creation
>> > > > in the createEnumerator() method where currentParallelism is
>> available.
>> > > >
>> > > > The second approach makes implementation rather significantly more
>> > > > complex since we cannot simply wrap
>> > NumberSequenceSource.SplitSerializer
>> > > in
>> > > > that case. The advantage of this approach is that with any kind of
>> > > > autoscaling, the source rate will match the original configuration.
>> But
>> > > I'm
>> > > > not sure how useful this is. I can even imagine scenarios where
>> scaling
>> > > the
>> > > > input rate together with parallelism would be better for demo
>> purposes.
>> > > >
>> > > > Would be glad to hear your thoughts on this.
>> > > >
>> > > > Best,
>> > > > Alexander Fedulov
>> > > >
>> > > > On Mon, Jun 20, 2022 at 4:31 PM David Anderson <
>> dander...@apache.org>
>> > > > wrote:
>> > > >
>> > > > > I'm very happy with this. +1
>> > > > >
>> > > > > A lot of SourceFunction implementations used in demos/POC
>> > > implementations
>> > > > > include a call to sleep(), so adding rate limiting is a good
>> idea, in
>> > > my
>> > > > > opinion.
>> > > > >
>> > > > > Best,
>> > > > > David
>> > > > >
>> > > > > On Mon, Jun 20, 2022 at 10:10 AM Qingsheng Ren <
>> renqs...@gmail.com>
>> > > > wrote:
>> > > > >
>> > > > > > Hi Alexander,
>> > > > > >
>> > > > > > Thanks for creating this FLIP! I’d like to share some thoughts.
>> > > > > >
>> > > > > > 1. About the “generatorFunction” I’m expecting an initializer
>> on it
>> > > > > > because it’s hard to require all fields in the generator
>> function
>> > are
>> > > > > > serializable in user’s implementation. Providing a function like
>> > > “open”
>> > > > > in
>> > > > > > the interface could let the function to make some
>> initializations
>> > in
>> > > > the
>> > > > > > task initializing stage.
>> > > > > >
>> > > > > > 2. As of the throttling functinality you mentioned, there’s a
>> > > > > > FlinkConnectorRateLimiter under flink-core and maybe we could
>> reuse
>> > > > this
>> > > > > > interface. Actually I prefer to make rate limiting as a common
>> > > feature
>> > > > > > provided in the Source API, but this requires another FLIP and a
>> > lot
>> > > of
>> > > > > > discussions so I’m OK to have it in the DataGen source first.
>> > > > > >
>> > > > > > Best regards,
>> > > > > > Qingsheng
>> > > > > >
>> > > > > >
>> > > > > > > On Jun 17, 2022, at 01:47, Alexander Fedulov <
>> > > > alexan...@ververica.com>
>> > > > > > wrote:
>> > > > > > >
>> > > > > > > Hi Jing,
>> > > > > > >
>> > > > > > > thanks for your thorough analysis. I agree with the points you
>> > make
>> > > > and
>> > > > > > > also with the idea to approach the larger task of providing a
>> > > > universal
>> > > > > > > (DataStream + SQL) data generator base iteratively.
>> > > > > > > Regarding the name, the SourceFunction-based
>> > *DataGeneratorSource*
>> > > > > > resides
>> > > > > > > in the
>> > *org.apache.flink.streaming.api.functions.source.datagen*. I
>> > > > > think
>> > > > > > > it is OK to simply place the new one (with the same name)
>> next to
>> > > the
>> > > > > > > *NumberSequenceSource* into
>> > > > > *org.apache.flink.api.connector.source.lib*.
>> > > > > > >
>> > > > > > > One more thing I wanted to discuss:  I noticed that
>> > > > *DataGenTableSource
>> > > > > > *has
>> > > > > > > built-in throttling functionality (*rowsPerSecond*). I
>> believe it
>> > > is
>> > > > > > > something that could be also useful for the DataStream users
>> of
>> > the
>> > > > > > > stateless data generator and since we want to eventually
>> converge
>> > > on
>> > > > > the
>> > > > > > > same implementation for DataStream and Table/SQL it sounds
>> like a
>> > > > good
>> > > > > > idea
>> > > > > > > to add it to the FLIP. What do you think?
>> > > > > > >
>> > > > > > > Best,
>> > > > > > > Alexander Fedulov
>> > > > > > >
>> > > > > > >
>> > > > > > > On Tue, Jun 14, 2022 at 7:17 PM Jing Ge <j...@ververica.com>
>> > > wrote:
>> > > > > > >
>> > > > > > >> Hi,
>> > > > > > >>
>> > > > > > >> After reading all discussions posted in this thread and the
>> > source
>> > > > > code
>> > > > > > of
>> > > > > > >> DataGeneratorSource which unfortunately used "Source"
>> instead of
>> > > > > > >> "SourceFunction" in its name, issues could summarized as
>> > > following:
>> > > > > > >>
>> > > > > > >> 1. The current DataGeneratorSource based on SourceFunction
>> is a
>> > > > > stateful
>> > > > > > >> source connector and built for Table/SQL.
>> > > > > > >> 2. The right name for the new data generator source i.e.
>> > > > > > >> DataGeneratorSource has been used for the current
>> implementation
>> > > > based
>> > > > > > on
>> > > > > > >> SourceFunction.
>> > > > > > >> 3. A new data generator source should be developed based on
>> the
>> > > new
>> > > > > > Source
>> > > > > > >> API.
>> > > > > > >> 4. The new data generator source should be used both for
>> > > DataStream
>> > > > > and
>> > > > > > >> Table/SQL, which means the current DataGeneratorSource
>> should be
>> > > > > > replaced
>> > > > > > >> with the new one.
>> > > > > > >> 5. The core event generation logic should be pluggable to
>> > support
>> > > > > > various
>> > > > > > >> (test) scenarios, e.g. rondom stream, changlog stream,
>> > > controllable
>> > > > > > events
>> > > > > > >> per checkpoint, etc.
>> > > > > > >>
>> > > > > > >> which turns out that
>> > > > > > >>
>> > > > > > >> To solve 1+3+4 -> we will have to make a big effort to
>> replace
>> > the
>> > > > > > current
>> > > > > > >> DataGeneratorSource since the new Source API has a very
>> > different
>> > > > > > >> concept, especially for the stateful part.
>> > > > > > >> To solve 2+3 -> we have to find another name for the new
>> > > > > implementation.
>> > > > > > >> To solve 1+3+4+5 -> It gets even more complicated to support
>> > > > stateless
>> > > > > > and
>> > > > > > >> stateful scenarios simultaneously with one solution.
>> > > > > > >>
>> > > > > > >> If we want to solve all of these issues in one shot, It might
>> > take
>> > > > > > months.
>> > > > > > >> Therefore, I would suggest starting from small and growing up
>> > > > > > iteratively.
>> > > > > > >>
>> > > > > > >> The proposal for the kickoff is to focus on stateless event
>> > > > generation
>> > > > > > >> with e.g. rondom stream and use the name
>> > > > > "StatelessDataGeneratoSource".
>> > > > > > >> The will be a period of time that both DataGeneratorSource
>> will
>> > be
>> > > > > used
>> > > > > > by
>> > > > > > >> the developer. The current DataGeneratorSource will be then
>> > > > > deprecated,
>> > > > > > >> once we can(iteratively):
>> > > > > > >> 1. either enlarge the scope of StatelessDataGeneratoSourcer
>> to
>> > be
>> > > > able
>> > > > > > to
>> > > > > > >> cover stateful scenarios and renaming it to
>> > > > > > "DataGeneratorSourceV2"(follow
>> > > > > > >> the naming convention of SinkV2) or
>> > > > > > >> 2. develop a new "SatefullDataGeneratorSource" based on
>> Source
>> > API
>> > > > > which
>> > > > > > >> can handle the stateful scenarios, if it is impossible to
>> > support
>> > > > both
>> > > > > > >> stateless and stateful scenarios with one GeneratorSource
>> > > > > > implementation.
>> > > > > > >>
>> > > > > > >> Best regards,
>> > > > > > >> Jing
>> > > > > > >>
>> > > > > > >> On Thu, Jun 9, 2022 at 2:48 PM Martijn Visser <
>> > > > > martijnvis...@apache.org
>> > > > > > >
>> > > > > > >> wrote:
>> > > > > > >>
>> > > > > > >>> Hey Alex,
>> > > > > > >>>
>> > > > > > >>> Yes, I think we need to make sure that we're not causing
>> > > confusion
>> > > > (I
>> > > > > > know
>> > > > > > >>> I already was confused). I think the DataSupplierSource is
>> > > already
>> > > > > > better,
>> > > > > > >>> but perhaps there are others who have an even better idea.
>> > > > > > >>>
>> > > > > > >>> Thanks,
>> > > > > > >>>
>> > > > > > >>> Martijn
>> > > > > > >>>
>> > > > > > >>> Op do 9 jun. 2022 om 14:28 schreef Alexander Fedulov <
>> > > > > > >>> alexan...@ververica.com>:
>> > > > > > >>>
>> > > > > > >>>> Hi Martijn,
>> > > > > > >>>>
>> > > > > > >>>> It seems that they serve a bit different purposes though.
>> The
>> > > > > > >>>> DataGenTableSource is for generating random data described
>> by
>> > > the
>> > > > > > Table
>> > > > > > >>>> DDL and is tied into the RowDataGenerator/DataGenerator
>> > concept
>> > > > > which
>> > > > > > is
>> > > > > > >>>> implemented as an Iterator<T>.  The proposed API in
>> contrast
>> > is
>> > > > > > supposed
>> > > > > > >>>> to provide users with an easy way to supply their custom
>> data.
>> > > > > Another
>> > > > > > >>>> difference is that a DataGenerator is supposed to be
>> stateful
>> > > and
>> > > > > has
>> > > > > > to
>> > > > > > >>>> snapshot its state, whereas the proposed API is purely
>> driven
>> > by
>> > > > the
>> > > > > > >>> input
>> > > > > > >>>> index IDs and can be stateless yet remain deterministic.
>> Are
>> > you
>> > > > > sure
>> > > > > > it
>> > > > > > >>>> is a good idea to mix them into the same API? We could
>> think
>> > of
>> > > > > using
>> > > > > > a
>> > > > > > >>>> different name to make it less confusing for the users
>> > > (something
>> > > > > like
>> > > > > > >>>> DataSupplierSource).
>> > > > > > >>>>
>> > > > > > >>>> Best,
>> > > > > > >>>> Alexander Fedulov
>> > > > > > >>>>
>> > > > > > >>>> On Thu, Jun 9, 2022 at 9:14 AM Martijn Visser <
>> > > > > > martijnvis...@apache.org
>> > > > > > >>>>
>> > > > > > >>>> wrote:
>> > > > > > >>>>
>> > > > > > >>>>> Hi Alex,
>> > > > > > >>>>>
>> > > > > > >>>>> Thanks for creating the FLIP and opening up the
>> discussion.
>> > +1
>> > > > > > overall
>> > > > > > >>> for
>> > > > > > >>>>> getting this in place.
>> > > > > > >>>>>
>> > > > > > >>>>> One question: you've already mentioned that this focussed
>> on
>> > > the
>> > > > > > >>>>> DataStream
>> > > > > > >>>>> API. I think it would be a bit confusing that we have a
>> > Datagen
>> > > > > > >>> connector
>> > > > > > >>>>> (on the Table side) that wouldn't leverage this target
>> > > > interface. I
>> > > > > > >>> think
>> > > > > > >>>>> it would be good if we could already have one generic
>> Datagen
>> > > > > > connector
>> > > > > > >>>>> which works for both DataStream API (so that would be a
>> new
>> > one
>> > > > in
>> > > > > > the
>> > > > > > >>>>> Flink repo) and that the Datagen in the Table landscape is
>> > > using
>> > > > > this
>> > > > > > >>>>> target interface too. What do you think?
>> > > > > > >>>>>
>> > > > > > >>>>> Best regards,
>> > > > > > >>>>>
>> > > > > > >>>>> Martijn
>> > > > > > >>>>>
>> > > > > > >>>>> Op wo 8 jun. 2022 om 14:21 schreef Alexander Fedulov <
>> > > > > > >>>>> alexan...@ververica.com>:
>> > > > > > >>>>>
>> > > > > > >>>>>> Hi Xianxun,
>> > > > > > >>>>>>
>> > > > > > >>>>>> Thanks for bringing it up. I do believe it would be
>> useful
>> > to
>> > > > have
>> > > > > > >>> such
>> > > > > > >>>>> a
>> > > > > > >>>>>> CDC data generator but I see the
>> > > > > > >>>>>> efforts to provide one a bit orthogonal to the
>> > > > DataSourceGenerator
>> > > > > > >>>>> proposed
>> > > > > > >>>>>> in the FLIP. FLIP-238 focuses
>> > > > > > >>>>>> on the DataStream API and I could see integration into
>> the
>> > > > > Table/SQL
>> > > > > > >>>>>> ecosystem as the next step that I would
>> > > > > > >>>>>> prefer to keep separate (see KafkaDynamicSource reusing
>> > > > > > >>>>>> KafkaSource<RowData>
>> > > > > > >>>>>> under the hood [1]).
>> > > > > > >>>>>>
>> > > > > > >>>>>> [1]
>> > > > > > >>>>>>
>> > > > > > >>>>>>
>> > > > > > >>>>>
>> > > > > > >>>
>> > > > > >
>> > > > >
>> > > >
>> > >
>> >
>> https://github.com/apache/flink/blob/3adca15859b36c61116fc56fabc24e9647f0ec5f/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicSource.java#L223
>> > > > > > >>>>>>
>> > > > > > >>>>>> Best,
>> > > > > > >>>>>> Alexander Fedulov
>> > > > > > >>>>>>
>> > > > > > >>>>>>
>> > > > > > >>>>>>
>> > > > > > >>>>>>
>> > > > > > >>>>>> On Wed, Jun 8, 2022 at 11:01 AM Xianxun Ye <
>> > yxx_c...@163.com>
>> > > > > > wrote:
>> > > > > > >>>>>>
>> > > > > > >>>>>>> Hey Alexander,
>> > > > > > >>>>>>>
>> > > > > > >>>>>>> Making datagen source connector easier to use is really
>> > > helpful
>> > > > > > >>> during
>> > > > > > >>>>>>> doing some PoC/Demo.
>> > > > > > >>>>>>> And I thought about is it possible to produce a
>> changelog
>> > > > stream
>> > > > > by
>> > > > > > >>>>>>> datagen source, so a new flink developer can practice
>> flink
>> > > sql
>> > > > > > >>> with
>> > > > > > >>>>> cdc
>> > > > > > >>>>>>> data using Flink SQL Client CLI.
>> > > > > > >>>>>>> In the flink-examples-table module, a
>> > ChangelogSocketExample
>> > > > > > >>> class[1]
>> > > > > > >>>>>>> describes how to ingest delete or insert data by 'nc'
>> > > command.
>> > > > > Can
>> > > > > > >>> we
>> > > > > > >>>>>>> support producing a changelog stream by the new datagen
>> > > source?
>> > > > > > >>>>>>>
>> > > > > > >>>>>>> [1]
>> > > > > > >>>>>>>
>> > > > > > >>>>>>
>> > > > > > >>>>>
>> > > > > > >>>
>> > > > > >
>> > > > >
>> > > >
>> > >
>> >
>> https://github.com/apache/flink/blob/master/flink-examples/flink-examples-table/src/main/java/org/apache/flink/table/examples/java/connectors/ChangelogSocketExample.java#L79
>> > > > > > >>>>>>>
>> > > > > > >>>>>>> Best regards,
>> > > > > > >>>>>>>
>> > > > > > >>>>>>> Xianxun
>> > > > > > >>>>>>>
>> > > > > > >>>>>>> On 06/8/2022 08:10,Alexander Fedulov<
>> > alexan...@ververica.com
>> > > >
>> > > > > > >>>>>>> <alexan...@ververica.com> wrote:
>> > > > > > >>>>>>>
>> > > > > > >>>>>>> I looked a bit further and it seems it should actually
>> be
>> > > > easier
>> > > > > > >>> than
>> > > > > > >>>>> I
>> > > > > > >>>>>>> initially thought:  SourceReader extends
>> CheckpointListener
>> > > > > > >>> interface
>> > > > > > >>>>> and
>> > > > > > >>>>>>> with its custom implementation it should be possible to
>> > > achieve
>> > > > > > >>>>> similar
>> > > > > > >>>>>>> results. A prototype that I have for the generator uses
>> an
>> > > > > > >>>>>>> IteratorSourceReader
>> > > > > > >>>>>>> under the hood by default but we could consider adding
>> the
>> > > > > ability
>> > > > > > >>> to
>> > > > > > >>>>>>> supply something like a DataGeneratorSourceReaderFactory
>> > that
>> > > > > would
>> > > > > > >>>>> allow
>> > > > > > >>>>>>> provisioning the DataGeneratorSource with customized
>> > > > > > >>> implementations
>> > > > > > >>>>> for
>> > > > > > >>>>>>> cases like this.
>> > > > > > >>>>>>>
>> > > > > > >>>>>>> Best,
>> > > > > > >>>>>>> Alexander Fedulov
>> > > > > > >>>>>>>
>> > > > > > >>>>>>> On Wed, Jun 8, 2022 at 12:58 AM Alexander Fedulov <
>> > > > > > >>>>>> alexan...@ververica.com
>> > > > > > >>>>>>>>
>> > > > > > >>>>>>> wrote:
>> > > > > > >>>>>>>
>> > > > > > >>>>>>> Hi Steven,
>> > > > > > >>>>>>>
>> > > > > > >>>>>>> This is going to be tricky since in the new Source API
>> the
>> > > > > > >>>>> checkpointing
>> > > > > > >>>>>>> aspects that you based your logic on are pushed further
>> > away
>> > > > from
>> > > > > > >>> the
>> > > > > > >>>>>>> low-level interfaces responsible for handling data and
>> > splits
>> > > > > [1].
>> > > > > > >>> At
>> > > > > > >>>>> the
>> > > > > > >>>>>>> same time, the SourceCoordinatorProvider is hardwired
>> into
>> > > the
>> > > > > > >>>>> internals
>> > > > > > >>>>>>> of the framework, so I don't think it will be possible
>> to
>> > > > > provide a
>> > > > > > >>>>>>> customized implementation for testing purposes.
>> > > > > > >>>>>>>
>> > > > > > >>>>>>> The only chance to tie data generation to checkpointing
>> in
>> > > the
>> > > > > new
>> > > > > > >>>>> Source
>> > > > > > >>>>>>> API that I see at the moment is via the SplitEnumerator
>> > > > > serializer
>> > > > > > >>> (
>> > > > > > >>>>>>> getEnumeratorCheckpointSerializer() method) [2]. In
>> theory,
>> > > it
>> > > > > > >>> should
>> > > > > > >>>>> be
>> > > > > > >>>>>>> possible to share a variable visible both to the
>> generator
>> > > > > function
>> > > > > > >>>>> and
>> > > > > > >>>>>> to
>> > > > > > >>>>>>> the serializer and manipulate it whenever the
>> serialize()
>> > > > method
>> > > > > > >>> gets
>> > > > > > >>>>>>> called upon a checkpoint request. That said, you still
>> > won't
>> > > > get
>> > > > > > >>>>>>> notifications of successful checkpoints that you
>> currently
>> > > use
>> > > > > > >>> (this
>> > > > > > >>>>> info
>> > > > > > >>>>>>> is only available to the SourceCoordinator).
>> > > > > > >>>>>>>
>> > > > > > >>>>>>> In general, regardless of the generator implementation
>> > > itself,
>> > > > > the
>> > > > > > >>> new
>> > > > > > >>>>>>> Source
>> > > > > > >>>>>>> API does not seem to support the use case of verifying
>> > > > > checkpoints
>> > > > > > >>>>>>> contents in lockstep with produced data, at least I do
>> not
>> > > see
>> > > > an
>> > > > > > >>>>>> immediate
>> > > > > > >>>>>>> solution for this. Can you think of a different way of
>> > > checking
>> > > > > the
>> > > > > > >>>>>>> correctness of the Iceberg Sink implementation that does
>> > not
>> > > > rely
>> > > > > > >>> on
>> > > > > > >>>>> this
>> > > > > > >>>>>>> approach?
>> > > > > > >>>>>>>
>> > > > > > >>>>>>> Best,
>> > > > > > >>>>>>> Alexander Fedulov
>> > > > > > >>>>>>>
>> > > > > > >>>>>>> [1]
>> > > > > > >>>>>>>
>> > > > > > >>>>>>>
>> > > > > > >>>>>>
>> > > > > > >>>>>
>> > > > > > >>>
>> > > > > >
>> > > > >
>> > > >
>> > >
>> >
>> https://github.com/apache/flink/blob/0f19c2472c54aac97e4067f5398731ab90036d1a/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java#L337
>> > > > > > >>>>>>>
>> > > > > > >>>>>>> [2]
>> > > > > > >>>>>>>
>> > > > > > >>>>>>>
>> > > > > > >>>>>>
>> > > > > > >>>>>
>> > > > > > >>>
>> > > > > >
>> > > > >
>> > > >
>> > >
>> >
>> https://github.com/apache/flink/blob/e4b000818c15b5b781c4e5262ba83bfc9d65121a/flink-core/src/main/java/org/apache/flink/api/connector/source/Source.java#L97
>> > > > > > >>>>>>>
>> > > > > > >>>>>>> On Tue, Jun 7, 2022 at 6:03 PM Steven Wu <
>> > > stevenz...@gmail.com
>> > > > >
>> > > > > > >>>>> wrote:
>> > > > > > >>>>>>>
>> > > > > > >>>>>>> In Iceberg source, we have a data generator source that
>> can
>> > > > > control
>> > > > > > >>>>> the
>> > > > > > >>>>>>> records per checkpoint cycle. Can we support sth like
>> this
>> > in
>> > > > the
>> > > > > > >>>>>>> DataGeneratorSource?
>> > > > > > >>>>>>>
>> > > > > > >>>>>>>
>> > > > > > >>>>>>>
>> > > > > > >>>>>>>
>> > > > > > >>>>>>
>> > > > > > >>>>>
>> > > > > > >>>
>> > > > > >
>> > > > >
>> > > >
>> > >
>> >
>> https://github.com/apache/iceberg/blob/master/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/BoundedTestSource.java
>> > > > > > >>>>>>> public BoundedTestSource(List<List<T>>
>> > elementsPerCheckpoint,
>> > > > > > >>> boolean
>> > > > > > >>>>>>> checkpointEnabled)
>> > > > > > >>>>>>>
>> > > > > > >>>>>>> Thanks,
>> > > > > > >>>>>>> Steven
>> > > > > > >>>>>>>
>> > > > > > >>>>>>> On Tue, Jun 7, 2022 at 8:48 AM Alexander Fedulov <
>> > > > > > >>>>>> alexan...@ververica.com
>> > > > > > >>>>>>>
>> > > > > > >>>>>>>
>> > > > > > >>>>>>> wrote:
>> > > > > > >>>>>>>
>> > > > > > >>>>>>> Hi everyone,
>> > > > > > >>>>>>>
>> > > > > > >>>>>>> I would like to open a discussion on FLIP-238: Introduce
>> > > > > > >>> FLIP-27-based
>> > > > > > >>>>>>>
>> > > > > > >>>>>>> Data
>> > > > > > >>>>>>>
>> > > > > > >>>>>>> Generator Source [1]. During the discussion about
>> > deprecating
>> > > > the
>> > > > > > >>>>>>> SourceFunction API [2] it became evident that an
>> > easy-to-use
>> > > > > > >>>>>>> FLIP-27-compatible data generator source is needed so
>> that
>> > > the
>> > > > > > >>> current
>> > > > > > >>>>>>> SourceFunction-based data generator implementations
>> could
>> > be
>> > > > > phased
>> > > > > > >>>>> out
>> > > > > > >>>>>>>
>> > > > > > >>>>>>> for
>> > > > > > >>>>>>>
>> > > > > > >>>>>>> both Flink demo/PoC applications and for the internal
>> Flink
>> > > > > tests.
>> > > > > > >>>>> This
>> > > > > > >>>>>>> FLIP proposes to introduce a generic DataGeneratorSource
>> > > > capable
>> > > > > of
>> > > > > > >>>>>>> producing events of an arbitrary type based on a
>> > > user-supplied
>> > > > > > >>>>>>>
>> > > > > > >>>>>>> MapFunction.
>> > > > > > >>>>>>>
>> > > > > > >>>>>>>
>> > > > > > >>>>>>> Looking forward to your feedback.
>> > > > > > >>>>>>>
>> > > > > > >>>>>>> [1] https://cwiki.apache.org/confluence/x/9Av1D
>> > > > > > >>>>>>> [2]
>> > > > > > >>>
>> > https://lists.apache.org/thread/d6cwqw9b3105wcpdkwq7rr4s7x4ywqr9
>> > > > > > >>>>>>>
>> > > > > > >>>>>>> Best,
>> > > > > > >>>>>>> Alexander Fedulov
>> > > > > > >>>>>>>
>> > > > > > >>>>>>>
>> > > > > > >>>>>>>
>> > > > > > >>>>>>>
>> > > > > > >>>>>>
>> > > > > > >>>>>
>> > > > > > >>>>
>> > > > > > >>>
>> > > > > > >>
>> > > > > >
>> > > > > >
>> > > > >
>> > > >
>> > >
>> >
>>
>

Reply via email to