Thanks again for the explanation.

On Fri, 6 Aug 2021, 15:39 Arvid Heise, <ar...@apache.org> wrote:

> A user function, from Flink's point of view, is any function that you as
> a user write. In this case, MetadataEnrichment.AsyncFlowLookup is a
> user function of the async operator.
>
> I don't recommend the harness for several reasons:
> - It's not a public API; we will adjust it and introduce breaking changes
> with each minor release.
> - You need to know internal implementation details, especially of the
> threading model, to invoke the harness methods in the correct order. We
> will change internals, and your test will stop working in newer Flink
> versions.
> - The harness is meant as a way for Flink devs to unit test operators or
> parts thereof. A unit test for a user-defined function should not use any
> Flink classes (i.e., you really just test the logic of some methods of
> your AsyncFlowLookup). If you want to test how it interacts with Flink,
> you have an integration test by definition.
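
A minimal sketch of that advice, assuming the lookup logic is moved out of AsyncFlowLookup into a plain class. FlowLookupLogic, lookup() and the in-memory map are hypothetical names for illustration, not Flink API and not the original code. AsyncFlowLookup.asyncInvoke would then only delegate to lookup() and complete Flink's ResultFuture, so the logic itself can be checked without any Flink classes:

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical Flink-free core of an async metadata lookup. In the Flink job,
// AsyncFlowLookup.asyncInvoke(...) would just call lookup(...) and complete
// Flink's ResultFuture with the value, keeping all testable logic in here.
public class FlowLookupLogic {
    // Stand-in for a real metadata store (assumption: keyed by a numeric flow id).
    private final Map<Long, String> store;

    public FlowLookupLogic(Map<Long, String> store) {
        this.store = new ConcurrentHashMap<>(store);
    }

    // Resolves the metadata for flowId on another thread; unknown ids yield "unknown".
    public CompletableFuture<String> lookup(long flowId) {
        return CompletableFuture.supplyAsync(() -> store.getOrDefault(flowId, "unknown"));
    }

    // Plain unit-test style usage: no MiniCluster, no harness, just wait for the future.
    public static void main(String[] args) {
        FlowLookupLogic logic = new FlowLookupLogic(Map.of(1L, "flow-a", 2L, "flow-b"));
        System.out.println(logic.lookup(1L).join());  // prints flow-a
        System.out.println(logic.lookup(42L).join()); // prints unknown
    }
}
```

In a real project these checks would sit in a JUnit @Test method; how the function interacts with Flink is then left to an integration test, as described above.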
>
>
>
> On Fri, Aug 6, 2021 at 10:11 AM Debraj Manna <subharaj.ma...@gmail.com>
> wrote:
>
>> Thanks, it worked.
>>
>> Can you please let me know what you are referring to as a user function,
>> and why it is not recommended to use the harness with it?
>>
>> On Wed, 4 Aug 2021, 22:43 Arvid Heise, <ar...@apache.org> wrote:
>>
>>> I would strongly recommend not using the harness for testing user
>>> functions.
>>>
>>> Instead, I'd just create an ITCase like this:
>>>
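>>> import static org.hamcrest.MatcherAssert.assertThat;
>>> import static org.hamcrest.Matchers.containsInAnyOrder;
>>>
>>> import java.util.List;
>>> import java.util.stream.LongStream;
>>>
>>> import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
>>> import org.apache.flink.streaming.api.datastream.AsyncDataStream;
>>> import org.apache.flink.streaming.api.datastream.DataStream;
>>> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>>> import org.apache.flink.test.util.MiniClusterWithClientResource;
>>> import org.junit.ClassRule;
>>> import org.junit.Test;
>>>
>>> // One shared MiniCluster for all tests of the ITCase; PARALLELISM is
>>> // assumed to be a constant defined in the test class.
>>> @ClassRule
>>> public static final MiniClusterWithClientResource MINI_CLUSTER =
>>>         new MiniClusterWithClientResource(
>>>                 new MiniClusterResourceConfiguration.Builder()
>>>                         .setNumberTaskManagers(1)
>>>                         .setNumberSlotsPerTaskManager(PARALLELISM)
>>>                         .build());
>>>
>>> // ------------------------------------------------------------------------
>>>
>>> @Test
>>> public void testAsyncFunction() throws Exception {
>>>     final StreamExecutionEnvironment env =
>>>             StreamExecutionEnvironment.getExecutionEnvironment();
>>>     env.setParallelism(PARALLELISM);
>>>
>>>     final DataStream<Long> input = env.fromSequence(1L, 1_000L); // or fromElements
>>>     // "..." stands for your AsyncFunction, timeout and TimeUnit
>>>     final DataStream<Long> stream = AsyncDataStream.orderedWait(input, ...);
>>>
>>>     // executes the job on the MiniCluster and collects at most 10000 records;
>>>     // order across parallel subtasks is not guaranteed, hence containsInAnyOrder
>>>     final List<Long> result = stream.executeAndCollect(10000);
>>>     assertThat(
>>>             result,
>>>             containsInAnyOrder(LongStream.rangeClosed(1, 1000).boxed().toArray()));
>>> }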
>>>
>>> See also
>>> https://github.com/apache/flink/blob/master/flink-tests/src/test/java/org/apache/flink/api/connector/source/lib/NumberSequenceSourceITCase.java#L62-L62
>>>
>>> On Wed, Aug 4, 2021 at 7:00 PM Debraj Manna <subharaj.ma...@gmail.com>
>>> wrote:
>>>
>>>> Hi,
>>>>
>>>> I am trying to use a RichAsyncFunction with Flink's test harness. My code
>>>> looks like this:
>>>>
>>>> final MetadataEnrichment.AsyncFlowLookup fn = new MetadataEnrichment.AsyncFlowLookup();
>>>> final AsyncWaitOperatorFactory<GenericMetric, Tuple2<GenericMetric, EnrichedMinTuple>> operator =
>>>>         new AsyncWaitOperatorFactory<>(fn, 2000, 1, AsyncDataStream.OutputMode.ORDERED);
>>>> final OneInputStreamOperatorTestHarness<GenericMetric, Tuple2<GenericMetric, EnrichedMinTuple>> harness =
>>>>         new OneInputStreamOperatorTestHarness<>(operator);
>>>> Configuration config = new Configuration();
>>>> config.set(StoreOptions.CONFIG_STORE_TYPE, ConfigStoreFactory.StoreType.MEMORY.name());
>>>> harness.getExecutionConfig().setGlobalJobParameters(config);
>>>> harness.getExecutionConfig().registerKryoType(GenericMetric.class);
>>>> harness.getExecutionConfig().registerKryoType(EnrichedMinTuple.class);
>>>> harness.open();
>>>>
>>>> But harness.open() throws the following exception:
>>>>
>>>> java.lang.IllegalStateException
>>>>    at org.apache.flink.util.Preconditions.checkState(Preconditions.java:177)
>>>>    at org.apache.flink.streaming.api.graph.StreamConfig.getTypeSerializerIn(StreamConfig.java:290)
>>>>    at org.apache.flink.streaming.api.graph.StreamConfig.getTypeSerializerIn1(StreamConfig.java:280)
>>>>    at org.apache.flink.streaming.api.operators.async.AsyncWaitOperator.setup(AsyncWaitOperator.java:144)
>>>>    at org.apache.flink.streaming.api.operators.async.AsyncWaitOperatorFactory.createStreamOperator(AsyncWaitOperatorFactory.java:72)
>>>>    at org.apache.flink.streaming.api.operators.StreamOperatorFactoryUtil.createOperator(StreamOperatorFactoryUtil.java:80)
>>>>    at org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness.setup(AbstractStreamOperatorTestHarness.java:398)
>>>>    at org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness.setup(OneInputStreamOperatorTestHarness.java:180)
>>>>    at org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness.setup(AbstractStreamOperatorTestHarness.java:385)
>>>>    at org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness.initializeState(AbstractStreamOperatorTestHarness.java:540)
>>>>    at org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness.initializeState(AbstractStreamOperatorTestHarness.java:428)
>>>>    at org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness.initializeEmptyState(AbstractStreamOperatorTestHarness.java:436)
>>>>    at org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness.open(AbstractStreamOperatorTestHarness.java:634)
>>>>
>>>> Can someone suggest what could be going wrong?
>>>>
>>>>
>>>>
