I am happy to announce that the blocker has been resolved and SourceFunction
is now marked as @Deprecated [1].
The work continues to remove the dependencies on the SourceFunction API in
Flink internals in order to prepare for dropping it completely in Flink 2.0.

I'd like to get some opinions on an open question I currently have:
StreamExecutionEnvironment#fromCollection() methods need to be modified to
use the new FLIP-27 DataGeneratorSource [2]. This presents an issue because
ITCases in DataGeneratorSource rely on StreamExecutionEnvironment, so we end
up with a circular dependency.

I see two main options here:

1. Split the tests from the DataGeneratorSource into a separate module called
   flink-connector-datagen-tests.
   This is a rather straightforward solution that breaks the cycle, but so far
   we have managed to avoid such workarounds, and I'd like to know if anyone
   has a strong opinion against it.

2. Move the #fromCollection() methods into flink-connector-datagen, so
   StreamExecutionEnvironment#fromCollection() becomes
   DataGeneratorSource#fromCollection().
   While this deviates from the familiar pattern, it should be acceptable given
   the major version change. The key question here is whether we should also
   introduce a dependency from flink-connector-datagen to flink-streaming-java.
   This dependency does not exist in other connectors, but it would enhance
   usability. Without it, the user code would look somewhat like this:

   Collection<Integer> data = ...;
   DataGeneratorSource<Integer> collectionSource =
       DataGeneratorSource.fromCollection(data);
   DataStreamSource<Integer> source = env.fromSource(collectionSource,
       WatermarkStrategy.forMonotonousTimestamps(), "Collection source")
       .forceNonParallel();

   The need for the forceNonParallel()/setParallelism(1) call is especially
   concerning, because it is easy to forget.

   With the dependency, we can hide the internal details and achieve an API
   closer to the current #fromCollection() implementation:

   Collection<Integer> data = ...;
   DataStreamSource<Integer> source =
       DataGeneratorSource.fromCollection(env, data);
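To make option 2 a bit more concrete, here is a minimal plain-Java sketch of
how a fromCollection() helper could back a collection with an index-based
generator, in the style of DataGeneratorSource (which produces elements by
mapping a sequence of Long indices through a generator function). It
deliberately avoids any Flink dependency: IndexedGenerator,
CollectionGenerator and drain() are hypothetical stand-ins, not the API
proposed in [2].

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

/**
 * Plain-Java sketch: back a "fromCollection" source with an index-based
 * generator. IndexedGenerator and CollectionGenerator are hypothetical
 * stand-ins, not Flink classes.
 */
public class CollectionGeneratorSketch {

    /** Hypothetical stand-in for a generator that maps an index to an element. */
    @FunctionalInterface
    interface IndexedGenerator<OUT> {
        OUT map(Long index) throws Exception;
    }

    /** Pairs a generator with the number of elements it should produce. */
    static final class CollectionGenerator<OUT> {
        final IndexedGenerator<OUT> generator;
        final long count;

        CollectionGenerator(IndexedGenerator<OUT> generator, long count) {
            this.generator = generator;
            this.count = count;
        }
    }

    /** Snapshots the collection so that index i yields the i-th element. */
    static <OUT> CollectionGenerator<OUT> fromCollection(Collection<OUT> data) {
        // Defensive copy: later changes to the caller's collection are not seen.
        List<OUT> snapshot = new ArrayList<>(data);
        return new CollectionGenerator<>(
                index -> snapshot.get(Math.toIntExact(index)), snapshot.size());
    }

    /** Walks indices 0..count-1 in order, as a single reader would. */
    static <OUT> List<OUT> drain(CollectionGenerator<OUT> source) throws Exception {
        List<OUT> out = new ArrayList<>();
        for (long i = 0; i < source.count; i++) {
            out.add(source.generator.map(i));
        }
        return out;
    }

    public static void main(String[] args) throws Exception {
        CollectionGenerator<Integer> source = fromCollection(List.of(3, 1, 2));
        System.out.println(drain(source)); // prints [3, 1, 2]
    }
}
```

drain() walks the indices in order with a single loop, which corresponds to
running the source with parallelism 1; in the real source, that is what the
forceNonParallel()/setParallelism(1) call has to ensure.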

I would appreciate hearing your thoughts and suggestions on this matter.

[1] https://github.com/apache/flink/pull/20049
[2] https://github.com/apache/flink/pull/22850

Best,
Alex

On Wed, 21 Jun 2023 at 19:27, Alexander Fedulov <alexander.fedu...@gmail.com> wrote:

> I'd like to revive the efforts to deprecate the SourceFunction API.
>
> It would be great to get a review for this PR:
> https://github.com/apache/flink/pull/21774
>
> It immediately unblocks marking the actual SourceFunction as deprecated.
> https://github.com/apache/flink/pull/20049
>
> There is also this work thread related
> to StreamExecutionEnvironment#fromCollection() methods.
> The discussion seems to have stalled:
> https://github.com/apache/flink/pull/21028
>
> Thanks,
> Alex
>
> On 2022/06/15 19:30:31 Alexander Fedulov wrote:
> > Thank you all for your valuable input and participation in the discussion.
> >
> > The vote is open now [1]
> >
> > [1] https://lists.apache.org/thread/kv9rj3w2rmkb8jtss5bqffhw57or7v8v
> >
> > Best,
> > Alexander Fedulov
>
>
