Hi Ken,

Thanks! I guess the problem I had was, as a complete newbie to Flink, navigating the type system and still being confused about the differences between Source, SourceFunction, DataStream, DataStreamOperator, etc.
I think the DataStream<> type is what I'm looking for? That is, I can then use:

DataStream<EventData> source = env.fromSource(getKafkaSource(params), watermarkStrategy, "Kafka");

when using KafkaSource in the normal setup, and

DataStream<EventData> s = env.addSource(new ParallelTestSource<>(...));

when using the testing source [1]. Does that sound right?

[1] https://github.com/apache/flink-training/blob/master/common/src/test/java/org/apache/flink/training/exercises/testing/ParallelTestSource.java#L26

On Tue, May 24, 2022 at 7:57 PM Ken Krugler <kkrugler_li...@transpac.com> wrote:

> Hi Piotr,
>
> The way I handle this is via a workflow class that uses a builder approach to specifying inputs, outputs, and any other configuration settings.
>
> The inputs are typically DataStream<xxx>.
>
> This way I can separate out the Kafka inputs, and use testing sources that give me very precise control over the inputs (e.g. I can hold up on right side data to ensure my stateful left join function is handling deferred joins properly). I can also use Kafka unit test support (either kafka-junit or Spring embedded Kafka) if needed.
>
> Then in the actual tool class (with a main method) I'll wire up the real Kafka sources, with whatever logic is required to convert the consumer records to what the workflow is expecting.
>
> — Ken
>
> On May 24, 2022, at 8:34 AM, Piotr Domagalski <pi...@domagalski.com> wrote:
>
> Hi,
>
> I'm wondering: what is the recommended way to structure a job which one would like to test later on with `MiniCluster`?
>
> I've looked at the flink-training repository examples [1] and they tend to expose the main job as a class that accepts a `SourceFunction` and a `SinkFunction`, which makes sense. But then, my job is normally constructed with `KafkaSource`, which is then passed to `env.fromSource(...)`.
>
> Is there any recommended way of handling these discrepancies, i.e. having to use `env.addSource(sourceFunction)` vs `env.fromSource(source)`?
>
> [1] https://github.com/apache/flink-training/blob/05791e55ad7ff0358b5c57ea8f40eada4a1f626a/ride-cleansing/src/test/java/org/apache/flink/training/exercises/ridecleansing/RideCleansingIntegrationTest.java#L61
>
> --
> Piotr Domagalski
>
> --------------------------
> Ken Krugler
> http://www.scaleunlimited.com
> Custom big data solutions
> Flink, Pinot, Solr, Elasticsearch

--
Piotr Domagalski
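[Editor's note: the workflow-class pattern Ken describes — a builder that accepts already-constructed DataStream inputs, so the Kafka vs. test source decision lives outside the workflow — could be sketched roughly as below. All names here (EventWorkflow, EventData, getKafkaSource, ParallelTestSource wiring details) are hypothetical illustrations, not code from either poster.]

```java
// Sketch only: a workflow class that takes DataStream inputs via a builder,
// keeping source construction (Kafka vs. test source) out of the job logic.
// Names and package layout are hypothetical.
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class EventWorkflow {

    private DataStream<EventData> input;

    // The workflow only sees a DataStream; it doesn't care how it was built.
    public EventWorkflow setInput(DataStream<EventData> input) {
        this.input = input;
        return this;
    }

    // Wire up the actual transformations (keyBy, joins, windows, sinks, ...).
    public DataStream<EventData> build() {
        // ... real job logic would go here; pass-through for the sketch.
        return input;
    }

    // Production wiring: the tool class with main() constructs the KafkaSource.
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<EventData> kafkaInput =
                env.fromSource(getKafkaSource(args), watermarkStrategy(), "Kafka");
        new EventWorkflow().setInput(kafkaInput).build();
        env.execute("event-workflow");
    }

    // In a MiniCluster test, the same workflow is fed a test source instead:
    //   DataStream<EventData> testInput =
    //       env.addSource(new ParallelTestSource<>(testEvents));
    //   new EventWorkflow().setInput(testInput).build();
}
```

(`getKafkaSource` and `watermarkStrategy` stand in for whatever builds your `KafkaSource<EventData>` and `WatermarkStrategy<EventData>`; the point is only that both `env.fromSource(...)` and `env.addSource(...)` yield a `DataStream<EventData>`, so the workflow class can accept either.)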