Glad to see you have resolved the issue! If you want to learn more about the Source API, the Flink documentation [1] has a detailed description of it. The original proposal, FLIP-27 [2], is also a good reference.
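For a concrete picture of what the new Source API looks like in use, here is a minimal sketch (the broker address, topic, group id and String event type below are placeholders for illustration, not taken from this thread):

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class KafkaSourceExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // FLIP-27 style source: built with a builder, then handed to env.fromSource(...).
        // Broker address, topic and group id are placeholders.
        KafkaSource<String> source = KafkaSource.<String>builder()
                .setBootstrapServers("localhost:9092")
                .setTopics("events")
                .setGroupId("example")
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();

        // fromSource returns a plain DataStream, just like addSource does for a SourceFunction.
        DataStream<String> events =
                env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");

        events.print();
        env.execute("kafka-source-example");
    }
}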
[1] https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/dev/datastream/sources/
[2] https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface

Cheers,
Qingsheng

> On May 25, 2022, at 17:54, Piotr Domagalski <pi...@domagalski.com> wrote:
>
> Thank you Qingsheng, this context helps a lot!
>
> And once again thank you all for being such a helpful community!
>
> P.S. I actually struggled for a bit trying to understand why my refactored solution which accepts DataStream<> wouldn't work ("no operators defined in the streaming topology"). It turns out my assumption that I can call StreamExecutionEnvironment.getExecutionEnvironment() multiple times and get the same environment was wrong. I had env.addSource and env.fromSource calls using one instance of the environment, but then called env.execute() on another instance :facepalm:
>
> On Wed, May 25, 2022 at 6:04 AM Qingsheng Ren <renqs...@gmail.com> wrote:
> Hi Piotr,
>
> I'd like to share my understanding of this. Source and SourceFunction are both interfaces for data sources. SourceFunction was designed and introduced earlier, and as the project evolved, many shortcomings emerged. Therefore, the community re-designed the source interface and introduced the new Source API in FLIP-27 [1].
>
> Eventually we will deprecate SourceFunction and use Source as the only interface for all data sources, but considering the huge cost of migration you'll see SourceFunction and Source co-exist for some time. For example, the ParallelTestSource you mentioned is still based on SourceFunction, while KafkaSource, as a pioneer, has already migrated to the new Source API.
>
> I think the API for end users didn't change a lot: both env.addSource(SourceFunction) and env.fromSource(Source) return a DataStream, and you can apply downstream transformations onto it.
>
> [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface
>
> Cheers,
>
> Qingsheng
>
> > On May 25, 2022, at 03:19, Piotr Domagalski <pi...@domagalski.com> wrote:
> >
> > Hi Ken,
> >
> > Thanks Ken. I guess the problem I had was, as a complete newbie to Flink, navigating the type system and still being confused about the differences between Source, SourceFunction, DataStream, DataStreamOperator, etc.
> >
> > I think the DataStream<> type is what I'm looking for? That is, I can then use:
> >
> > DataStream<EventData> source = env.fromSource(getKafkaSource(params), watermarkStrategy, "Kafka");
> > when using KafkaSource in the normal setup
> >
> > and
> > DataStream<EventData> s = env.addSource(new ParallelTestSource<>(...));
> > when using the testing source [1]
> >
> > Does that sound right?
> >
> > [1] https://github.com/apache/flink-training/blob/master/common/src/test/java/org/apache/flink/training/exercises/testing/ParallelTestSource.java#L26
> >
> > On Tue, May 24, 2022 at 7:57 PM Ken Krugler <kkrugler_li...@transpac.com> wrote:
> > Hi Piotr,
> >
> > The way I handle this is via a workflow class that uses a builder approach to specifying inputs, outputs, and any other configuration settings.
> >
> > The inputs are typically DataStream<xxx>.
> >
> > This way I can separate out the Kafka inputs, and use testing sources that give me very precise control over the inputs (e.g. I can hold back right-side data to ensure my stateful left join function is handling deferred joins properly).
> > I can also use Kafka unit test support (either kafka-junit or Spring embedded Kafka) if needed.
> >
> > Then in the actual tool class (with a main method) I'll wire up the real Kafka sources, with whatever logic is required to convert the consumer records to what the workflow is expecting.
> >
> > -- Ken
> >
> >> On May 24, 2022, at 8:34 AM, Piotr Domagalski <pi...@domagalski.com> wrote:
> >>
> >> Hi,
> >>
> >> I'm wondering: what is the recommended way to structure a job that one would like to test later on with `MiniCluster`?
> >>
> >> I've looked at the flink-training repository examples [1] and they tend to expose the main job as a class that accepts a `SourceFunction` and a `SinkFunction`, which makes sense. But then, my job is normally constructed with `KafkaSource`, which is then passed to `env.fromSource(...`.
> >>
> >> Is there any recommended way of handling these discrepancies, i.e. having to use `env.addSource(sourceFunction)` vs `env.fromSource(source)`?
> >>
> >> [1] https://github.com/apache/flink-training/blob/05791e55ad7ff0358b5c57ea8f40eada4a1f626a/ride-cleansing/src/test/java/org/apache/flink/training/exercises/ridecleansing/RideCleansingIntegrationTest.java#L61
> >>
> >> --
> >> Piotr Domagalski
> >
> > --------------------------
> > Ken Krugler
> > http://www.scaleunlimited.com
> > Custom big data solutions
> > Flink, Pinot, Solr, Elasticsearch
> >
> > --
> > Piotr Domagalski
>
> --
> Piotr Domagalski
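To tie the thread together, here is a minimal sketch of the structure discussed above; all class and method names are illustrative rather than taken from the thread. The job logic is built against a DataStream, so a production entry point can feed it from env.fromSource(KafkaSource) as sketched earlier, while a MiniCluster-backed test feeds it from env.addSource() with a simple SourceFunction (a stand-in for something like flink-training's ParallelTestSource):

import static org.junit.Assert.assertEquals;

import java.util.Arrays;
import java.util.List;

import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.test.util.MiniClusterWithClientResource;
import org.junit.ClassRule;
import org.junit.Test;

/** Illustrative job class: the streaming logic only sees a DataStream, so the caller
 *  decides whether the data comes from Kafka (production) or a test source (tests). */
class UppercaseJob {
    private final DataStream<String> events;

    UppercaseJob(DataStream<String> events) {
        this.events = events;
    }

    /** Attaches the job's transformations and returns the output stream. */
    DataStream<String> build() {
        return events.map(s -> s.toUpperCase());
    }
}

/** MiniCluster-backed test: the same job code, fed from a SourceFunction instead of Kafka. */
public class UppercaseJobTest {

    @ClassRule
    public static final MiniClusterWithClientResource FLINK =
            new MiniClusterWithClientResource(
                    new MiniClusterResourceConfiguration.Builder()
                            .setNumberTaskManagers(1)
                            .setNumberSlotsPerTaskManager(2)
                            .build());

    /** Tiny stand-in for a SourceFunction-based test source such as ParallelTestSource. */
    private static class FixedElementsSource implements SourceFunction<String> {
        private volatile boolean running = true;

        @Override
        public void run(SourceContext<String> ctx) {
            for (String e : new String[] {"a", "b", "c"}) {
                if (!running) {
                    break;
                }
                ctx.collect(e);
            }
        }

        @Override
        public void cancel() {
            running = false;
        }
    }

    @Test
    public void uppercasesAllEvents() throws Exception {
        // The environment runs against the embedded MiniCluster started by the rule above.
        // Note that addSource/fromSource and execution must use the *same* instance
        // (the pitfall mentioned earlier in the thread).
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1); // keep output order deterministic for the assertion

        DataStream<String> events = env.addSource(new FixedElementsSource());

        // executeAndCollect(n) submits the job and gathers up to n results back to the test.
        List<String> results = new UppercaseJob(events).build().executeAndCollect(10);

        assertEquals(Arrays.asList("A", "B", "C"), results);
    }
}

With this split, the only Kafka-specific code lives in the production entry point, which is what makes the job testable without a broker.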