Hi,

I am trying to use a RichAsyncFunction with Flink's test harness. My code
looks like this:

final MetadataEnrichment.AsyncFlowLookup fn =
        new MetadataEnrichment.AsyncFlowLookup();
final AsyncWaitOperatorFactory<GenericMetric, Tuple2<GenericMetric, EnrichedMinTuple>> operator =
        new AsyncWaitOperatorFactory<>(fn, 2000, 1, AsyncDataStream.OutputMode.ORDERED);
final OneInputStreamOperatorTestHarness<GenericMetric, Tuple2<GenericMetric, EnrichedMinTuple>> harness =
        new OneInputStreamOperatorTestHarness<>(operator);
Configuration config = new Configuration();
config.set(StoreOptions.CONFIG_STORE_TYPE, ConfigStoreFactory.StoreType.MEMORY.name());
harness.getExecutionConfig().setGlobalJobParameters(config);
harness.getExecutionConfig().registerKryoType(GenericMetric.class);
harness.getExecutionConfig().registerKryoType(EnrichedMinTuple.class);
harness.open();

But harness.open() throws the following exception:

java.lang.IllegalStateException
        at org.apache.flink.util.Preconditions.checkState(Preconditions.java:177)
        at org.apache.flink.streaming.api.graph.StreamConfig.getTypeSerializerIn(StreamConfig.java:290)
        at org.apache.flink.streaming.api.graph.StreamConfig.getTypeSerializerIn1(StreamConfig.java:280)
        at org.apache.flink.streaming.api.operators.async.AsyncWaitOperator.setup(AsyncWaitOperator.java:144)
        at org.apache.flink.streaming.api.operators.async.AsyncWaitOperatorFactory.createStreamOperator(AsyncWaitOperatorFactory.java:72)
        at org.apache.flink.streaming.api.operators.StreamOperatorFactoryUtil.createOperator(StreamOperatorFactoryUtil.java:80)
        at org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness.setup(AbstractStreamOperatorTestHarness.java:398)
        at org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness.setup(OneInputStreamOperatorTestHarness.java:180)
        at org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness.setup(AbstractStreamOperatorTestHarness.java:385)
        at org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness.initializeState(AbstractStreamOperatorTestHarness.java:540)
        at org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness.initializeState(AbstractStreamOperatorTestHarness.java:428)
        at org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness.initializeEmptyState(AbstractStreamOperatorTestHarness.java:436)
        at org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness.open(AbstractStreamOperatorTestHarness.java:634)

Can someone suggest what could be going wrong?
