Hi Ken,

Thanks, Ken. I guess the problem I had, as a complete newbie to Flink, was
navigating the type system; I'm still confused about the differences
between Source, SourceFunction, DataStream, DataStreamOperator, etc.

I think the DataStream<> type is what I'm looking for? That is, I can use

    DataStream<EventData> source = env.fromSource(getKafkaSource(params),
            watermarkStrategy, "Kafka");

when using KafkaSource in the normal setup, and

    DataStream<EventData> s = env.addSource(new ParallelTestSource<>(...));

when using the testing source [1].

Does that sound right?
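That matches the Flink API as far as the types go: both `env.fromSource(...)` and `env.addSource(...)` return a `DataStreamSource<EventData>`, which is a subclass of `DataStream<EventData>`, so job logic that accepts a `DataStream<EventData>` works with either source. A minimal, dependency-free sketch of the idea follows. It is an illustration under assumptions, not Flink code: `java.util.stream.Stream` stands in for `DataStream` so it runs with only the JDK, and `EventData`, `buildPipeline`, and the sample values are hypothetical.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Dependency-free sketch: java.util.stream.Stream stands in for Flink's DataStream.
class PipelineSketch {

    // Hypothetical stand-in for the EventData type from the email.
    static final class EventData {
        final String key;
        final int value;
        EventData(String key, int value) { this.key = key; this.value = value; }
    }

    // The job logic depends only on the stream type, not on the source behind it.
    // In Flink this parameter would be a DataStream<EventData>.
    static List<String> buildPipeline(Stream<EventData> input) {
        return input
                .filter(e -> e.value > 0)          // drop non-positive values
                .map(e -> e.key + "=" + e.value)   // format each event
                .collect(Collectors.toList());
    }

    // Stand-in for env.fromSource(getKafkaSource(params), watermarkStrategy, "Kafka").
    static Stream<EventData> productionInput() {
        return Stream.of(new EventData("a", 1), new EventData("b", -1));
    }

    // Stand-in for env.addSource(new ParallelTestSource<>(...)) in a test.
    static Stream<EventData> testInput() {
        return Stream.of(new EventData("x", 5), new EventData("y", 0), new EventData("z", 2));
    }

    public static void main(String[] args) {
        System.out.println(buildPipeline(productionInput()));
        System.out.println(buildPipeline(testInput()));
    }
}
```

The point of the design is that `buildPipeline` never learns which source produced its input, so a test can swap in tightly controlled data without touching the job logic.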


On Tue, May 24, 2022 at 7:57 PM Ken Krugler <kkrugler_li...@transpac.com> wrote:

> Hi Piotr,
> The way I handle this is via a workflow class that uses a builder approach
> to specifying inputs, outputs, and any other configuration settings.
> The inputs are typically DataStream<xxx>.
> This way I can separate out the Kafka inputs and use testing sources that
> give me very precise control over the inputs (e.g. I can hold back
> right-side data to ensure my stateful left join function is handling
> deferred joins properly). I can also use Kafka unit test support (either
> kafka-junit or Spring embedded Kafka) if needed.
> Then in the actual tool class (with a main method) I’ll wire up the real
> Kafka sources, with whatever logic is required to convert the consumer
> records to what the workflow is expecting.
> — Ken
> On May 24, 2022, at 8:34 AM, Piotr Domagalski <pi...@domagalski.com> wrote:
> Hi,
> I'm wondering: what is the recommended way to structure a job that one
> would like to test later on with `MiniCluster`?
> I've looked at the flink-training repository examples [1] and they tend to
> expose the main job as a class that accepts a `SourceFunction` and a
> `SinkFunction`, which makes sense. But then, my job is normally constructed
> with `KafkaSource` which is then passed to `env.fromSource(...`.
> Is there any recommended way of handling these discrepancies, i.e. having
> to use `env.addSource(sourceFunction)` vs `env.fromSource(source)`?
> [1]
> https://github.com/apache/flink-training/blob/05791e55ad7ff0358b5c57ea8f40eada4a1f626a/ride-cleansing/src/test/java/org/apache/flink/training/exercises/ridecleansing/RideCleansingIntegrationTest.java#L61
> --
> Piotr Domagalski
> --------------------------
> Ken Krugler
> http://www.scaleunlimited.com
> Custom big data solutions
> Flink, Pinot, Solr, Elasticsearch
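Ken's workflow-plus-builder approach can be sketched roughly as follows. This is a dependency-free approximation, not his actual code: `java.util.stream.Stream` and `java.util.function.Consumer` stand in for Flink's `DataStream` and a sink, and the class name `EnrichmentWorkflow`, its builder methods, and the `"key,value"` raw-record format are all hypothetical. The workflow only sees injected inputs and outputs, while the conversion from raw records to `EventData` happens in `main`, mirroring the tool class.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.function.Consumer;
import java.util.stream.Stream;

// Hypothetical workflow class configured via a builder. Inputs and outputs are
// injected, so tests can supply precise in-memory data instead of Kafka.
// Stream and Consumer stand in for DataStream and a sink.
class EnrichmentWorkflow {

    static final class EventData {
        final String key;
        final int value;
        EventData(String key, int value) { this.key = key; this.value = value; }
    }

    private final Stream<EventData> input;
    private final Consumer<String> output;

    private EnrichmentWorkflow(Builder b) {
        // Fail fast if a required setting is missing.
        this.input = Objects.requireNonNull(b.input, "input not set");
        this.output = Objects.requireNonNull(b.output, "output not set");
    }

    static Builder builder() { return new Builder(); }

    static final class Builder {
        private Stream<EventData> input;
        private Consumer<String> output;

        Builder setInput(Stream<EventData> input) { this.input = input; return this; }
        Builder setOutput(Consumer<String> output) { this.output = output; return this; }
        EnrichmentWorkflow build() { return new EnrichmentWorkflow(this); }
    }

    // The job logic: it only sees the injected input and output.
    void run() {
        input.filter(e -> e.value > 0)
             .map(e -> e.key.toUpperCase() + ":" + e.value)
             .forEach(output);
    }

    // Tool-class conversion from raw records to what the workflow expects.
    // The "key,value" string format is a hypothetical stand-in for a Kafka record.
    static EventData fromRawRecord(String raw) {
        String[] parts = raw.split(",", 2);
        return new EventData(parts[0].trim(), Integer.parseInt(parts[1].trim()));
    }

    public static void main(String[] args) {
        // Production-style wiring: convert raw records, then hand them to the workflow.
        List<String> rawRecords = List.of("a,1", "b,-3", "c,7");
        List<String> results = new ArrayList<>();
        EnrichmentWorkflow.builder()
                .setInput(rawRecords.stream().map(EnrichmentWorkflow::fromRawRecord))
                .setOutput(results::add)
                .build()
                .run();
        System.out.println(results);
    }
}
```

A test would call the same builder with hand-crafted `EventData` and a collecting output, which is where the fine-grained control over input timing and ordering comes from.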

Piotr Domagalski
