I would strongly recommend not using the harness for testing user functions.
Instead, I'd just create an ITCase like this:

    @ClassRule
    public static final MiniClusterWithClientResource MINI_CLUSTER =
            new MiniClusterWithClientResource(
                    new MiniClusterResourceConfiguration.Builder()
                            .setNumberTaskManagers(1)
                            .setNumberSlotsPerTaskManager(PARALLELISM)
                            .build());

    // ------------------------------------------------------------------------

    @Test
    public void testAsyncFunction() throws Exception {
        final StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(PARALLELISM);

        // or fromElements
        final DataStream<Long> input = env.fromSequence(1L, 1_000L);

        // a separate variable, since a final variable cannot be reassigned
        final DataStream<Long> stream = AsyncDataStream.orderedWait(input, ...);

        final List<Long> result = stream.executeAndCollect(10000);
        assertThat(result, containsInAnyOrder(LongStream.rangeClosed(1, 1000).boxed().toArray()));
    }

See also
https://github.com/apache/flink/blob/master/flink-tests/src/test/java/org/apache/flink/api/connector/source/lib/NumberSequenceSourceITCase.java#L62-L62

On Wed, Aug 4, 2021 at 7:00 PM Debraj Manna <subharaj.ma...@gmail.com> wrote:

> Hi
>
> I am trying to use RichAsyncFunction with Flink's test harness.
> My code looks like below
>
> final MetadataEnrichment.AsyncFlowLookup fn = new MetadataEnrichment.AsyncFlowLookup();
> final AsyncWaitOperatorFactory<GenericMetric, Tuple2<GenericMetric, EnrichedMinTuple>> operator =
>         new AsyncWaitOperatorFactory<>(fn, 2000, 1, AsyncDataStream.OutputMode.ORDERED);
> final OneInputStreamOperatorTestHarness<GenericMetric, Tuple2<GenericMetric, EnrichedMinTuple>> harness =
>         new OneInputStreamOperatorTestHarness<>(operator);
> Configuration config = new Configuration();
> config.set(StoreOptions.CONFIG_STORE_TYPE, ConfigStoreFactory.StoreType.MEMORY.name());
> harness.getExecutionConfig().setGlobalJobParameters(config);
> harness.getExecutionConfig().registerKryoType(GenericMetric.class);
> harness.getExecutionConfig().registerKryoType(EnrichedMinTuple.class);
> harness.open();
>
> But harness.open() is throwing the below exception
>
> java.lang.IllegalStateException
>     at org.apache.flink.util.Preconditions.checkState(Preconditions.java:177)
>     at org.apache.flink.streaming.api.graph.StreamConfig.getTypeSerializerIn(StreamConfig.java:290)
>     at org.apache.flink.streaming.api.graph.StreamConfig.getTypeSerializerIn1(StreamConfig.java:280)
>     at org.apache.flink.streaming.api.operators.async.AsyncWaitOperator.setup(AsyncWaitOperator.java:144)
>     at org.apache.flink.streaming.api.operators.async.AsyncWaitOperatorFactory.createStreamOperator(AsyncWaitOperatorFactory.java:72)
>     at org.apache.flink.streaming.api.operators.StreamOperatorFactoryUtil.createOperator(StreamOperatorFactoryUtil.java:80)
>     at org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness.setup(AbstractStreamOperatorTestHarness.java:398)
>     at org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness.setup(OneInputStreamOperatorTestHarness.java:180)
>     at org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness.setup(AbstractStreamOperatorTestHarness.java:385)
>     at org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness.initializeState(AbstractStreamOperatorTestHarness.java:540)
>     at org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness.initializeState(AbstractStreamOperatorTestHarness.java:428)
>     at org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness.initializeEmptyState(AbstractStreamOperatorTestHarness.java:436)
>     at org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness.open(AbstractStreamOperatorTestHarness.java:634)
>
> Can someone suggest what could be going wrong?