HI I am trying to use RichAsyncFunction with flink's test harness. My code looks like below
final MetadataEnrichment.AsyncFlowLookup fn = new MetadataEnrichment.AsyncFlowLookup(); final AsyncWaitOperatorFactory<GenericMetric, Tuple2<GenericMetric, EnrichedMinTuple>> operator = new AsyncWaitOperatorFactory<>(fn, 2000, 1, AsyncDataStream.OutputMode.ORDERED); final OneInputStreamOperatorTestHarness<GenericMetric, Tuple2<GenericMetric, EnrichedMinTuple>> harness = new OneInputStreamOperatorTestHarness<>(operator); Configuration config = new Configuration(); config.set(StoreOptions.CONFIG_STORE_TYPE, ConfigStoreFactory.StoreType.MEMORY.name()); harness.getExecutionConfig().setGlobalJobParameters(config); harness.getExecutionConfig().registerKryoType(GenericMetric.class); harness.getExecutionConfig().registerKryoType(EnrichedMinTuple.class); harness.open(); But harness.open() is throwing the below exception java.lang.IllegalStateException at org.apache.flink.util.Preconditions.checkState(Preconditions.java:177) at org.apache.flink.streaming.api.graph.StreamConfig.getTypeSerializerIn(StreamConfig.java:290) at org.apache.flink.streaming.api.graph.StreamConfig.getTypeSerializerIn1(StreamConfig.java:280) at org.apache.flink.streaming.api.operators.async.AsyncWaitOperator.setup(AsyncWaitOperator.java:144) at org.apache.flink.streaming.api.operators.async.AsyncWaitOperatorFactory.createStreamOperator(AsyncWaitOperatorFactory.java:72) at org.apache.flink.streaming.api.operators.StreamOperatorFactoryUtil.createOperator(StreamOperatorFactoryUtil.java:80) at org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness.setup(AbstractStreamOperatorTestHarness.java:398) at org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness.setup(OneInputStreamOperatorTestHarness.java:180) at org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness.setup(AbstractStreamOperatorTestHarness.java:385) at org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness.initializeState(AbstractStreamOperatorTestHarness.java:540) at org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness.initializeState(AbstractStreamOperatorTestHarness.java:428) at org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness.initializeEmptyState(AbstractStreamOperatorTestHarness.java:436) at org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness.open(AbstractStreamOperatorTestHarness.java:634) Can someone suggest what could be going wrong?