A user function, from the point of view of Flink, is any function that you as a user write. In this case, MetadataEnrichment.AsyncFlowLookup is the user function of the async operator.
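
For illustration, here is a minimal sketch of such a user function, using placeholder Long/String types instead of your GenericMetric/EnrichedMinTuple; the lookup logic is assumed, not your actual AsyncFlowLookup:

import java.util.Collections;
import java.util.concurrent.CompletableFuture;

import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;

// A "user function" for the async operator: the class you hand to
// AsyncDataStream.orderedWait/unorderedWait (and which the AsyncWaitOperator
// then invokes). Placeholder types and logic, for illustration only.
public class FlowLookupSketch extends RichAsyncFunction<Long, String> {

    @Override
    public void asyncInvoke(Long key, ResultFuture<String> resultFuture) {
        // Run the lookup asynchronously and complete the result future when done.
        CompletableFuture
                .supplyAsync(() -> lookup(key))
                .thenAccept(value -> resultFuture.complete(Collections.singleton(value)));
    }

    // The enrichment logic itself lives in a plain method like this,
    // which a unit test can call directly without any Flink machinery.
    String lookup(Long key) {
        return "flow-" + key;
    }
}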
I don't recommend harness for several reasons:
- It's not public API; we will adjust it and introduce breaking changes with each minor release.
- You need to know internal implementation details, especially the threading model, to invoke the harness methods in the correct order. We will change internals and your test will stop working in newer Flink versions.
- The harness is meant as a way for Flink devs to perform unit tests of operators or parts thereof. A unit test for a user-defined function should not use any Flink classes (e.g. you really just test the logic of some methods of your AsyncFlowLookup; see the sketch at the end of this thread). If you want to test how it interacts with Flink, you get an integration test by definition.

On Fri, Aug 6, 2021 at 10:11 AM Debraj Manna <subharaj.ma...@gmail.com> wrote:

> Thanks, it worked.
>
> Can you please let me know what you are referring to as a user function
> and why it is not recommended to use the harness with it?
>
> On Wed, 4 Aug 2021, 22:43 Arvid Heise, <ar...@apache.org> wrote:
>
>> I would strongly recommend not using the harness for testing user
>> functions.
>>
>> Instead I'd just create an ITCase like this:
>>
>> @ClassRule
>> public static final MiniClusterWithClientResource MINI_CLUSTER =
>>         new MiniClusterWithClientResource(
>>                 new MiniClusterResourceConfiguration.Builder()
>>                         .setNumberTaskManagers(1)
>>                         .setNumberSlotsPerTaskManager(PARALLELISM)
>>                         .build());
>>
>> // ------------------------------------------------------------------------
>>
>> @Test
>> public void testAsyncFunction() throws Exception {
>>     final StreamExecutionEnvironment env =
>>             StreamExecutionEnvironment.getExecutionEnvironment();
>>     env.setParallelism(PARALLELISM);
>>
>>     DataStream<Long> stream = env.fromSequence(1L, 1_000L); // or fromElements
>>     stream = AsyncDataStream.orderedWait(stream, ...);
>>
>>     final List<Long> result = stream.executeAndCollect(10000);
>>     assertThat(
>>             result,
>>             containsInAnyOrder(LongStream.rangeClosed(1, 1000).boxed().toArray()));
>> }
>>
>> See also
>> https://github.com/apache/flink/blob/master/flink-tests/src/test/java/org/apache/flink/api/connector/source/lib/NumberSequenceSourceITCase.java#L62-L62
>>
>> On Wed, Aug 4, 2021 at 7:00 PM Debraj Manna <subharaj.ma...@gmail.com>
>> wrote:
>>
>>> Hi,
>>>
>>> I am trying to use RichAsyncFunction with Flink's test harness.
>>> My code looks like below:
>>>
>>> final MetadataEnrichment.AsyncFlowLookup fn =
>>>         new MetadataEnrichment.AsyncFlowLookup();
>>> final AsyncWaitOperatorFactory<GenericMetric, Tuple2<GenericMetric, EnrichedMinTuple>> operator =
>>>         new AsyncWaitOperatorFactory<>(fn, 2000, 1, AsyncDataStream.OutputMode.ORDERED);
>>> final OneInputStreamOperatorTestHarness<GenericMetric, Tuple2<GenericMetric, EnrichedMinTuple>> harness =
>>>         new OneInputStreamOperatorTestHarness<>(operator);
>>> Configuration config = new Configuration();
>>> config.set(StoreOptions.CONFIG_STORE_TYPE, ConfigStoreFactory.StoreType.MEMORY.name());
>>> harness.getExecutionConfig().setGlobalJobParameters(config);
>>> harness.getExecutionConfig().registerKryoType(GenericMetric.class);
>>> harness.getExecutionConfig().registerKryoType(EnrichedMinTuple.class);
>>> harness.open();
>>>
>>> But harness.open() is throwing the below exception:
>>>
>>> java.lang.IllegalStateException
>>>     at org.apache.flink.util.Preconditions.checkState(Preconditions.java:177)
>>>     at org.apache.flink.streaming.api.graph.StreamConfig.getTypeSerializerIn(StreamConfig.java:290)
>>>     at org.apache.flink.streaming.api.graph.StreamConfig.getTypeSerializerIn1(StreamConfig.java:280)
>>>     at org.apache.flink.streaming.api.operators.async.AsyncWaitOperator.setup(AsyncWaitOperator.java:144)
>>>     at org.apache.flink.streaming.api.operators.async.AsyncWaitOperatorFactory.createStreamOperator(AsyncWaitOperatorFactory.java:72)
>>>     at org.apache.flink.streaming.api.operators.StreamOperatorFactoryUtil.createOperator(StreamOperatorFactoryUtil.java:80)
>>>     at org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness.setup(AbstractStreamOperatorTestHarness.java:398)
>>>     at org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness.setup(OneInputStreamOperatorTestHarness.java:180)
>>>     at org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness.setup(AbstractStreamOperatorTestHarness.java:385)
>>>     at org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness.initializeState(AbstractStreamOperatorTestHarness.java:540)
>>>     at org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness.initializeState(AbstractStreamOperatorTestHarness.java:428)
>>>     at org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness.initializeEmptyState(AbstractStreamOperatorTestHarness.java:436)
>>>     at org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness.open(AbstractStreamOperatorTestHarness.java:634)
>>>
>>> Can someone suggest what could be going wrong?
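
Following the advice at the top of the thread, a unit test of the function's logic needs neither the harness nor a MiniCluster; it can call the function's own methods directly. A minimal sketch against the hypothetical FlowLookupSketch shown above (class and method names are illustrative, not the actual MetadataEnrichment code):

import static org.junit.Assert.assertEquals;

import org.junit.Test;

// Plain JUnit test of the user function's own logic: no test harness,
// no MiniCluster, no Flink runtime classes.
public class FlowLookupSketchTest {

    @Test
    public void lookupEnrichesKey() {
        FlowLookupSketch fn = new FlowLookupSketch();

        // Exercise the lookup method directly; the async plumbing
        // (AsyncWaitOperator, ordering, timeouts) is covered by the
        // ITCase with MiniClusterWithClientResource shown earlier.
        assertEquals("flow-42", fn.lookup(42L));
    }
}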