Hi Xianxun,

Thanks for bringing it up. I do believe such a CDC data generator would be
useful, but I see the effort to provide one as somewhat orthogonal to the
DataGeneratorSource proposed in the FLIP. FLIP-238 focuses on the DataStream
API, and I see integration into the Table/SQL ecosystem as a next step that I
would prefer to keep separate (see how KafkaDynamicSource reuses
KafkaSource<RowData> under the hood [1]).
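
To make the separation concrete, here is a rough sketch of how changelog-style
events could be produced at the DataStream level by a plain, stateless
generator function, leaving any Table/SQL wrapping to a later layer. Everything
in it is hypothetical and not part of the FLIP: the ChangeEvent and Kind types,
the generate() method, and the assumption that the generator function receives
a running, non-negative index. Only the +I/-U/+U/-D short strings follow
Flink's RowKind notation. It is plain Java without Flink dependencies, so it
shows the shape of the function rather than the actual source wiring.

```java
import java.util.ArrayList;
import java.util.List;

class ChangelogGeneratorSketch {

    /** Hypothetical stand-in for a row kind; short strings follow the +I/-U/+U/-D notation. */
    enum Kind {
        INSERT("+I"), UPDATE_BEFORE("-U"), UPDATE_AFTER("+U"), DELETE("-D");

        final String shortString;

        Kind(String shortString) {
            this.shortString = shortString;
        }
    }

    /** Hypothetical event type; a real job would more likely emit RowData or a POJO. */
    static final class ChangeEvent {
        final Kind kind;
        final long key;
        final long value;

        ChangeEvent(Kind kind, long key, long value) {
            this.kind = kind;
            this.key = key;
            this.value = value;
        }

        @Override
        public String toString() {
            return kind.shortString + "[" + key + ", " + value + "]";
        }
    }

    /** Each key goes through insert, update-before, update-after, delete. */
    private static final int EVENTS_PER_KEY = 4;

    /**
     * Maps a running index to a change event. The function is stateless, so the
     * same index always yields the same event. Assumes index is non-negative.
     */
    static ChangeEvent generate(long index) {
        long key = index / EVENTS_PER_KEY;
        long before = key * 10;   // value written by the insert
        long after = before + 1;  // value written by the update
        switch ((int) (index % EVENTS_PER_KEY)) {
            case 0:
                return new ChangeEvent(Kind.INSERT, key, before);
            case 1:
                return new ChangeEvent(Kind.UPDATE_BEFORE, key, before);
            case 2:
                return new ChangeEvent(Kind.UPDATE_AFTER, key, after);
            default:
                return new ChangeEvent(Kind.DELETE, key, after);
        }
    }

    /** Renders the first count events as strings, mainly for printing and checks. */
    static List<String> firstEvents(int count) {
        List<String> events = new ArrayList<>();
        for (long i = 0; i < count; i++) {
            events.add(generate(i).toString());
        }
        return events;
    }

    public static void main(String[] args) {
        for (String event : firstEvents(8)) {
            System.out.println(event);
        }
    }
}
```

Because the function depends only on the index, it could in principle be
replayed from any position, which is the property a Table/SQL wrapper would
need from the underlying DataStream source.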

[1]
https://github.com/apache/flink/blob/3adca15859b36c61116fc56fabc24e9647f0ec5f/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicSource.java#L223

Best,
Alexander Fedulov




On Wed, Jun 8, 2022 at 11:01 AM Xianxun Ye <yxx_c...@163.com> wrote:

> Hey Alexander,
>
> Making the datagen source connector easier to use is really helpful when
> building a PoC or demo.
> I also wondered whether the datagen source could produce a changelog
> stream, so that a new Flink developer can practice Flink SQL with CDC data
> using the Flink SQL Client CLI.
> In the flink-examples-table module, the ChangelogSocketExample class [1]
> shows how to ingest insert and delete records via the 'nc' command. Can we
> support producing a changelog stream with the new datagen source?
>
> [1]
> https://github.com/apache/flink/blob/master/flink-examples/flink-examples-table/src/main/java/org/apache/flink/table/examples/java/connectors/ChangelogSocketExample.java#L79
>
> Best regards,
>
> Xianxun
>
> On 06/8/2022 08:10, Alexander Fedulov <alexan...@ververica.com> wrote:
>
> I looked a bit further and it seems it should actually be easier than I
> initially thought: SourceReader extends the CheckpointListener interface,
> and with a custom SourceReader implementation it should be possible to
> achieve similar results. The prototype that I have for the generator uses an
> IteratorSourceReader under the hood by default, but we could consider adding
> the ability to supply something like a DataGeneratorSourceReaderFactory that
> would allow provisioning the DataGeneratorSource with customized
> implementations for cases like this.
>
> Best,
> Alexander Fedulov
>
> On Wed, Jun 8, 2022 at 12:58 AM Alexander Fedulov <alexan...@ververica.com> wrote:
>
> Hi Steven,
>
> This is going to be tricky since in the new Source API the checkpointing
> aspects that you based your logic on are pushed further away from the
> low-level interfaces responsible for handling data and splits [1]. At the
> same time, the SourceCoordinatorProvider is hardwired into the internals
> of the framework, so I don't think it will be possible to provide a
> customized implementation for testing purposes.
>
> The only chance to tie data generation to checkpointing in the new Source
> API that I see at the moment is via the SplitEnumerator serializer
> (getEnumeratorCheckpointSerializer() method) [2]. In theory, it should be
> possible to share a variable visible both to the generator function and to
> the serializer and manipulate it whenever the serialize() method gets
> called upon a checkpoint request. That said, you still won't get
> notifications of successful checkpoints that you currently use (this info
> is only available to the SourceCoordinator).
>
> In general, regardless of the generator implementation itself, the new Source
> API does not seem to support the use case of verifying checkpoint contents
> in lockstep with the produced data; at least, I do not see an immediate
> solution for this. Can you think of a different way of checking the
> correctness of the Iceberg Sink implementation that does not rely on this
> approach?
>
> Best,
> Alexander Fedulov
>
> [1]
> https://github.com/apache/flink/blob/0f19c2472c54aac97e4067f5398731ab90036d1a/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java#L337
>
> [2]
> https://github.com/apache/flink/blob/e4b000818c15b5b781c4e5262ba83bfc9d65121a/flink-core/src/main/java/org/apache/flink/api/connector/source/Source.java#L97
>
> On Tue, Jun 7, 2022 at 6:03 PM Steven Wu <stevenz...@gmail.com> wrote:
>
> In the Iceberg source, we have a data generator source that can control the
> records per checkpoint cycle. Can we support something like this in the
> DataGeneratorSource?
>
> https://github.com/apache/iceberg/blob/master/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/BoundedTestSource.java
> public BoundedTestSource(List<List<T>> elementsPerCheckpoint, boolean checkpointEnabled)
>
> Thanks,
> Steven
>
> On Tue, Jun 7, 2022 at 8:48 AM Alexander Fedulov <alexan...@ververica.com> wrote:
>
> Hi everyone,
>
> I would like to open a discussion on FLIP-238: Introduce FLIP-27-based Data
> Generator Source [1]. During the discussion about deprecating the
> SourceFunction API [2] it became evident that an easy-to-use
> FLIP-27-compatible data generator source is needed so that the current
> SourceFunction-based data generator implementations could be phased out,
> both for Flink demo/PoC applications and for the internal Flink tests. This
> FLIP proposes to introduce a generic DataGeneratorSource capable of
> producing events of an arbitrary type based on a user-supplied MapFunction.
>
>
> Looking forward to your feedback.
>
> [1] https://cwiki.apache.org/confluence/x/9Av1D
> [2] https://lists.apache.org/thread/d6cwqw9b3105wcpdkwq7rr4s7x4ywqr9
>
> Best,
> Alexander Fedulov
