Thanks again for the explanation.

On Fri, 6 Aug 2021, 15:39 Arvid Heise, <ar...@apache.org> wrote:
> User function from the point of view of Flink is any function that you as
> a user write. In this case, MetadataEnrichment.AsyncFlowLookup is a
> user-function of the Async Operator.
>
> I don't recommend the harness for several reasons:
> - It's not public API; we will adjust it and introduce breaking changes
>   with each minor release.
> - You need to know internal implementation details, especially the
>   threading model, to invoke the harness methods in the correct order. We
>   will change internals and your test will stop working in newer Flink
>   versions.
> - The harness is meant as a way for Flink devs to perform unit tests of
>   operators or parts thereof. A unit test for a user-defined function
>   should not use any Flink classes (e.g. you really just test the logic of
>   some methods of your AsyncFlowLookup). If you want to test how it
>   interacts with Flink, you get an integration test by definition.
>
> On Fri, Aug 6, 2021 at 10:11 AM Debraj Manna <subharaj.ma...@gmail.com> wrote:
>
>> Thanks, it worked.
>>
>> Can you please let me know what you are referring to as a user function,
>> and why it is not recommended to use the harness with it?
>>
>> On Wed, 4 Aug 2021, 22:43 Arvid Heise, <ar...@apache.org> wrote:
>>
>>> I would strongly recommend not using the harness for testing user
>>> functions.
>>>
>>> Instead I'd just create an ITCase like this:
>>>
>>> @ClassRule
>>> public static final MiniClusterWithClientResource MINI_CLUSTER =
>>>         new MiniClusterWithClientResource(
>>>                 new MiniClusterResourceConfiguration.Builder()
>>>                         .setNumberTaskManagers(1)
>>>                         .setNumberSlotsPerTaskManager(PARALLELISM)
>>>                         .build());
>>>
>>> // ------------------------------------------------------------------------
>>>
>>> @Test
>>> public void testAsyncFunction() throws Exception {
>>>     final StreamExecutionEnvironment env =
>>>             StreamExecutionEnvironment.getExecutionEnvironment();
>>>     env.setParallelism(PARALLELISM);
>>>
>>>     DataStream<Long> stream = env.fromSequence(1L, 1_000L); // or fromElements
>>>     stream = AsyncDataStream.orderedWait(stream, ...);
>>>
>>>     final List<Long> result = stream.executeAndCollect(10000);
>>>     assertThat(
>>>             result,
>>>             containsInAnyOrder(LongStream.rangeClosed(1, 1000).boxed().toArray()));
>>> }
>>>
>>> See also
>>> https://github.com/apache/flink/blob/master/flink-tests/src/test/java/org/apache/flink/api/connector/source/lib/NumberSequenceSourceITCase.java#L62-L62
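To complement the ITCase above with the other recommendation in this thread (testing the logic of the user function itself, without the harness), a minimal sketch of such a unit test could look like the following. It assumes AsyncFlowLookup exposes the usual asyncInvoke(GenericMetric, ResultFuture<Tuple2<GenericMetric, EnrichedMinTuple>>) of a RichAsyncFunction; the CollectingResultFuture test double and the createTestMetric() helper are hypothetical and exist only for illustration.

import static org.junit.Assert.assertEquals;

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.junit.Test;

public class AsyncFlowLookupTest {

    // Minimal test double that captures whatever the async function completes with.
    private static final class CollectingResultFuture<T> implements ResultFuture<T> {
        private final CompletableFuture<Collection<T>> future = new CompletableFuture<>();

        @Override
        public void complete(Collection<T> result) {
            future.complete(result);
        }

        @Override
        public void completeExceptionally(Throwable error) {
            future.completeExceptionally(error);
        }

        Collection<T> get(long timeout, TimeUnit unit) throws Exception {
            return future.get(timeout, unit);
        }
    }

    @Test
    public void asyncInvokeEnrichesMetric() throws Exception {
        final MetadataEnrichment.AsyncFlowLookup fn = new MetadataEnrichment.AsyncFlowLookup();
        // If the function sets up clients in open(...), call that (or its own setters) here.

        final GenericMetric input = createTestMetric(); // hypothetical helper building a sample metric
        final CollectingResultFuture<Tuple2<GenericMetric, EnrichedMinTuple>> resultFuture =
                new CollectingResultFuture<>();

        fn.asyncInvoke(input, resultFuture);

        final List<Tuple2<GenericMetric, EnrichedMinTuple>> results =
                new ArrayList<>(resultFuture.get(5, TimeUnit.SECONDS));
        assertEquals(1, results.size());
        // Further assertions on results.get(0).f1 (the EnrichedMinTuple) go here.
    }
}

ResultFuture and Tuple2 are still Flink API types, but no Flink runtime or test-harness classes are involved, which is the point of the advice above: the test exercises only the lookup logic of the function.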
>>> On Wed, Aug 4, 2021 at 7:00 PM Debraj Manna <subharaj.ma...@gmail.com> wrote:
>>>
>>>> Hi,
>>>>
>>>> I am trying to use RichAsyncFunction with Flink's test harness. My code
>>>> looks like below:
>>>>
>>>> final MetadataEnrichment.AsyncFlowLookup fn =
>>>>         new MetadataEnrichment.AsyncFlowLookup();
>>>> final AsyncWaitOperatorFactory<GenericMetric, Tuple2<GenericMetric, EnrichedMinTuple>> operator =
>>>>         new AsyncWaitOperatorFactory<>(fn, 2000, 1, AsyncDataStream.OutputMode.ORDERED);
>>>> final OneInputStreamOperatorTestHarness<GenericMetric, Tuple2<GenericMetric, EnrichedMinTuple>> harness =
>>>>         new OneInputStreamOperatorTestHarness<>(operator);
>>>> Configuration config = new Configuration();
>>>> config.set(StoreOptions.CONFIG_STORE_TYPE, ConfigStoreFactory.StoreType.MEMORY.name());
>>>> harness.getExecutionConfig().setGlobalJobParameters(config);
>>>> harness.getExecutionConfig().registerKryoType(GenericMetric.class);
>>>> harness.getExecutionConfig().registerKryoType(EnrichedMinTuple.class);
>>>> harness.open();
>>>>
>>>> But harness.open() is throwing the below exception:
>>>>
>>>> java.lang.IllegalStateException
>>>>     at org.apache.flink.util.Preconditions.checkState(Preconditions.java:177)
>>>>     at org.apache.flink.streaming.api.graph.StreamConfig.getTypeSerializerIn(StreamConfig.java:290)
>>>>     at org.apache.flink.streaming.api.graph.StreamConfig.getTypeSerializerIn1(StreamConfig.java:280)
>>>>     at org.apache.flink.streaming.api.operators.async.AsyncWaitOperator.setup(AsyncWaitOperator.java:144)
>>>>     at org.apache.flink.streaming.api.operators.async.AsyncWaitOperatorFactory.createStreamOperator(AsyncWaitOperatorFactory.java:72)
>>>>     at org.apache.flink.streaming.api.operators.StreamOperatorFactoryUtil.createOperator(StreamOperatorFactoryUtil.java:80)
>>>>     at org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness.setup(AbstractStreamOperatorTestHarness.java:398)
>>>>     at org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness.setup(OneInputStreamOperatorTestHarness.java:180)
>>>>     at org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness.setup(AbstractStreamOperatorTestHarness.java:385)
>>>>     at org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness.initializeState(AbstractStreamOperatorTestHarness.java:540)
>>>>     at org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness.initializeState(AbstractStreamOperatorTestHarness.java:428)
>>>>     at org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness.initializeEmptyState(AbstractStreamOperatorTestHarness.java:436)
>>>>     at org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness.open(AbstractStreamOperatorTestHarness.java:634)
>>>>
>>>> Can someone suggest what could be going wrong?
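As for the IllegalStateException itself: the failing checkState sits in StreamConfig.getTypeSerializerIn, which AsyncWaitOperator.setup calls, so the harness appears to have been built without an input TypeSerializer. If the harness route is kept despite the caveats above, one possible fix is sketched below. It assumes the Flink version in use has the OneInputStreamOperatorTestHarness constructor that takes an operator factory plus an input serializer; TypeInformation, TypeSerializer and ExecutionConfig come from org.apache.flink.api.common, and the remaining types are those from the snippet above.

// Build a serializer for the input type so AsyncWaitOperator.setup can look it up.
final TypeSerializer<GenericMetric> inputSerializer =
        TypeInformation.of(GenericMetric.class).createSerializer(new ExecutionConfig());

final MetadataEnrichment.AsyncFlowLookup fn = new MetadataEnrichment.AsyncFlowLookup();
final AsyncWaitOperatorFactory<GenericMetric, Tuple2<GenericMetric, EnrichedMinTuple>> factory =
        new AsyncWaitOperatorFactory<>(fn, 2000, 1, AsyncDataStream.OutputMode.ORDERED);

// Constructor overload that registers the input serializer with the harness's StreamConfig.
final OneInputStreamOperatorTestHarness<GenericMetric, Tuple2<GenericMetric, EnrichedMinTuple>> harness =
        new OneInputStreamOperatorTestHarness<>(factory, inputSerializer);

// The rest of the original setup (global job parameters, Kryo registrations, harness.open())
// would follow unchanged.

Even with that change, the MiniCluster-based ITCase shown earlier in the thread remains the recommended way to test how the function behaves inside Flink.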