Hi Ken,

Thanks Ken. I guess the problem I had was, as a complete newbie to Flink,
navigating the type system and being still confused about differences
between Source, SourceFunction, DataStream, DataStreamOperator, etc.

I think the DataStream<> type is what I'm looking for? That is, then I can
use:

DataStream<EventData> source = env.fromSource(getKafkaSource(params),
watermarkStrategy, "Kafka");
when using KafkaSource in the normal setup

and
DataStream<EventData> s = env.addSource(new ParallelTestSource<>(...));
when using the testing source [1]

Does that sound right?

[1]
https://github.com/apache/flink-training/blob/master/common/src/test/java/org/apache/flink/training/exercises/testing/ParallelTestSource.java#L26

On Tue, May 24, 2022 at 7:57 PM Ken Krugler <kkrugler_li...@transpac.com>
wrote:

> Hi Piotr,
>
> The way I handle this is via a workflow class that uses a builder approach
> to specifying inputs, outputs, and any other configuration settings.
>
> The inputs are typically DataStream<xxx>.
>
> This way I can separate out the Kafka inputs, and use testing sources that
> give me very precise control over the inputs (e.g. I can hold up on right
> side data to ensure my stateful left join junction is handling deferred
> joins properly). I can also use Kafka unit test support (either kafka-junit
> or Spring embedded Kafka) if needed.
>
> Then in the actual tool class (with a main method) I’ll wire up the real
> Kafka sources, with whatever logic is required to convert the consumer
> records to what the workflow is expecting.
>
> — Ken
>
> On May 24, 2022, at 8:34 AM, Piotr Domagalski <pi...@domagalski.com>
> wrote:
>
> Hi,
>
> I'm wondering: what ithe recommended way to structure the job which one
> would like to test later on with `MiniCluster`.
>
> I've looked at the flink-training repository examples [1] and they tend to
> expose the main job as a class that accepts a `SourceFunction` and a
> `SinkFunction`, which make sense. But then, my job is normally constructed
> with `KafkaSource` which is then passed to `env.fromSource(...`.
>
> Is there any recommended way of handling these discrepancies, ie. having
> to use `env.addSource(sourceFunction)` vs `env.fromSource(source)`?
>
> [1]
> https://github.com/apache/flink-training/blob/05791e55ad7ff0358b5c57ea8f40eada4a1f626a/ride-cleansing/src/test/java/org/apache/flink/training/exercises/ridecleansing/RideCleansingIntegrationTest.java#L61
>
> --
> Piotr Domagalski
>
>
> --------------------------
> Ken Krugler
> http://www.scaleunlimited.com
> Custom big data solutions
> Flink, Pinot, Solr, Elasticsearch
>
>
>
>

-- 
Piotr Domagalski

Reply via email to