Thanks, it worked. Could you please explain what you mean by "user function", and why using the harness with one is not recommended?

On Wed, 4 Aug 2021, 22:43 Arvid Heise, <ar...@apache.org> wrote:

> I would strongly recommend not using the harness for testing user
> functions.
>
> Instead I'd just create an ITCase like this:
>
>     @ClassRule
>     public static final MiniClusterWithClientResource MINI_CLUSTER =
>             new MiniClusterWithClientResource(
>                     new MiniClusterResourceConfiguration.Builder()
>                             .setNumberTaskManagers(1)
>                             .setNumberSlotsPerTaskManager(PARALLELISM)
>                             .build());
>
>     // ------------------------------------------------------------------------
>
>     @Test
>     public void testAsyncFunction() throws Exception {
>         final StreamExecutionEnvironment env =
>                 StreamExecutionEnvironment.getExecutionEnvironment();
>         env.setParallelism(PARALLELISM);
>
>         final DataStream<Long> input = env.fromSequence(1L, 1_000L); // or fromElements
>         final DataStream<Long> stream = AsyncDataStream.orderedWait(input, ...);
>
>         final List<Long> result = stream.executeAndCollect(10000);
>         assertThat(result, containsInAnyOrder(LongStream.rangeClosed(1, 1000).boxed().toArray()));
>     }
>
> See also
> https://github.com/apache/flink/blob/master/flink-tests/src/test/java/org/apache/flink/api/connector/source/lib/NumberSequenceSourceITCase.java#L62-L62
>
> On Wed, Aug 4, 2021 at 7:00 PM Debraj Manna <subharaj.ma...@gmail.com>
> wrote:
>
>> Hi,
>>
>> I am trying to use RichAsyncFunction with Flink's test harness.
>> My code looks like this:
>>
>>     final MetadataEnrichment.AsyncFlowLookup fn =
>>             new MetadataEnrichment.AsyncFlowLookup();
>>     final AsyncWaitOperatorFactory<GenericMetric, Tuple2<GenericMetric, EnrichedMinTuple>> operator =
>>             new AsyncWaitOperatorFactory<>(fn, 2000, 1, AsyncDataStream.OutputMode.ORDERED);
>>     final OneInputStreamOperatorTestHarness<GenericMetric, Tuple2<GenericMetric, EnrichedMinTuple>> harness =
>>             new OneInputStreamOperatorTestHarness<>(operator);
>>     Configuration config = new Configuration();
>>     config.set(StoreOptions.CONFIG_STORE_TYPE, ConfigStoreFactory.StoreType.MEMORY.name());
>>     harness.getExecutionConfig().setGlobalJobParameters(config);
>>     harness.getExecutionConfig().registerKryoType(GenericMetric.class);
>>     harness.getExecutionConfig().registerKryoType(EnrichedMinTuple.class);
>>     harness.open();
>>
>> But harness.open() throws the following exception:
>>
>>     java.lang.IllegalStateException
>>         at org.apache.flink.util.Preconditions.checkState(Preconditions.java:177)
>>         at org.apache.flink.streaming.api.graph.StreamConfig.getTypeSerializerIn(StreamConfig.java:290)
>>         at org.apache.flink.streaming.api.graph.StreamConfig.getTypeSerializerIn1(StreamConfig.java:280)
>>         at org.apache.flink.streaming.api.operators.async.AsyncWaitOperator.setup(AsyncWaitOperator.java:144)
>>         at org.apache.flink.streaming.api.operators.async.AsyncWaitOperatorFactory.createStreamOperator(AsyncWaitOperatorFactory.java:72)
>>         at org.apache.flink.streaming.api.operators.StreamOperatorFactoryUtil.createOperator(StreamOperatorFactoryUtil.java:80)
>>         at org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness.setup(AbstractStreamOperatorTestHarness.java:398)
>>         at org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness.setup(OneInputStreamOperatorTestHarness.java:180)
>>         at org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness.setup(AbstractStreamOperatorTestHarness.java:385)
>>         at org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness.initializeState(AbstractStreamOperatorTestHarness.java:540)
>>         at org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness.initializeState(AbstractStreamOperatorTestHarness.java:428)
>>         at org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness.initializeEmptyState(AbstractStreamOperatorTestHarness.java:436)
>>         at org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness.open(AbstractStreamOperatorTestHarness.java:634)
>>
>> Can someone suggest what could be going wrong?
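
Both snippets in this thread use ordered output mode (AsyncDataStream.OutputMode.ORDERED in the harness code, AsyncDataStream.orderedWait in the ITCase). As a rough illustration of what "ordered" means there, here is a minimal plain-Java sketch. It is not Flink code and does not reproduce Flink's implementation: the class OrderedAsyncSketch and the methods orderedWait and slowDouble are hypothetical names made up for this example, and slowDouble stands in for whatever lookup the real async function performs. The sketch starts all requests concurrently and then emits results in input order, even though the requests complete in a different order.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

public class OrderedAsyncSketch {

    // Hypothetical stand-in for an async lookup. Larger inputs get a shorter
    // delay, so completion order differs from input order.
    public static CompletableFuture<Long> slowDouble(long value) {
        long delayMillis = Math.max(0, 50 - value * 10);
        return CompletableFuture.supplyAsync(
                () -> value * 2,
                CompletableFuture.delayedExecutor(delayMillis, TimeUnit.MILLISECONDS));
    }

    // Illustrative only: fires all requests, then collects results in input order.
    public static <I, O> List<O> orderedWait(
            List<I> inputs, Function<I, CompletableFuture<O>> asyncFn) {
        // Start every request first so they run concurrently.
        List<CompletableFuture<O>> pending = new ArrayList<>();
        for (I input : inputs) {
            pending.add(asyncFn.apply(input));
        }
        // Wait on the futures in input order; a result that finished early
        // is held back until all earlier results have been emitted.
        List<O> results = new ArrayList<>();
        for (CompletableFuture<O> future : pending) {
            results.add(future.join());
        }
        return results;
    }

    public static void main(String[] args) {
        List<Long> out = orderedWait(List.of(1L, 2L, 3L, 4L), OrderedAsyncSketch::slowDouble);
        System.out.println(out); // prints [2, 4, 6, 8]
    }
}
```

In this sketch the result list always matches input order, so an exact-order assertion would pass. The ITCase quoted above uses containsInAnyOrder, which checks only that the expected elements are present and does not depend on their order.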