Hello Buvana,

Can you share a few more details on your operator and how you are using it? For example, are you calling keyBy before applying your custom operator?
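For context on why I am asking: the exception says "This operation cannot use partitioned state". Partitioned state is scoped to the key of the record being processed, so getRuntimeContext().getState(...) only works on a keyed stream, typically one produced by calling keyBy before the flatMap. The sketch below is a plain-Java toy model of that idea, not Flink code. The class KeyedStateSketch and all of its methods are hypothetical names made up for illustration, and the only assumption is that state is looked up under the current key.

```java
import java.util.HashMap;
import java.util.Map;

/** Toy model of keyed (partitioned) state. NOT Flink code; all names are hypothetical. */
class KeyedStateSketch {
    // One {count, sum} pair per key, loosely analogous to a ValueState<Tuple2<Long, Long>>.
    private final Map<Long, long[]> statePerKey = new HashMap<>();

    // Key of the record currently being processed; null models a non-keyed stream.
    private Long currentKey;

    public void setCurrentKey(Long key) {
        currentKey = key;
    }

    // Returns the state for the current key, or fails when no key has been set.
    public long[] getState() {
        if (currentKey == null) {
            throw new IllegalStateException(
                "No key set: this operation cannot use partitioned state");
        }
        // Default value {0, 0} if nothing was stored for this key yet.
        return statePerKey.computeIfAbsent(currentKey, k -> new long[] {0L, 0L});
    }

    // Adds a value under the given key and returns the running average for that key.
    public double add(long key, long value) {
        setCurrentKey(key);
        long[] state = getState();
        state[0] += 1;      // count
        state[1] += value;  // sum
        return (double) state[1] / state[0];
    }

    public static void main(String[] args) {
        KeyedStateSketch keyed = new KeyedStateSketch();
        System.out.println(keyed.add(1L, 3L)); // 3.0
        System.out.println(keyed.add(1L, 5L)); // 4.0
        System.out.println(keyed.add(2L, 7L)); // 7.0 (key 2 has its own state)

        // Without a key there is no state to look up, so this throws.
        KeyedStateSketch unkeyed = new KeyedStateSketch();
        try {
            unkeyed.getState();
        } catch (IllegalStateException e) {
            System.out.println("Failed as expected: " + e.getMessage());
        }
    }
}
```

In this model the lookup fails as soon as no key is set, which is roughly the situation the stack trace describes. If your function is applied directly to a non-keyed stream, adding a keyBy in front of it is the first thing I would try.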
Thanks a lot,
Kostas

> On Aug 10, 2016, at 10:03 PM, Ramanan, Buvana (Nokia - US) <buvana.rama...@nokia-bell-labs.com> wrote:
>
> Hello,
>
> I am utilizing the code snippet in:
> https://ci.apache.org/projects/flink/flink-docs-master/apis/streaming/state.html
> and particularly ‘open’ function in my code:
>
> @Override
> public void open(Configuration config) {
>     ValueStateDescriptor<Tuple2<Long, Long>> descriptor =
>             new ValueStateDescriptor<>(
>                     "average", // the state name
>                     TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() {}), // type information
>                     Tuple2.of(0L, 0L)); // default value of the state, if nothing was set
>     sum = getRuntimeContext().getState(descriptor);
> }
>
> When I run, I get the following error:
>
> Caused by: java.lang.RuntimeException: Error while getting state
>     at org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getState(StreamingRuntimeContext.java:120)
>     at wikiedits.stateful$Rec2Tuple2.open(stateful.java:103)
>     at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:38)
>     at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:91)
>     at org.apache.flink.streaming.api.operators.StreamFlatMap.open(StreamFlatMap.java:41)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:314)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:214)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>     at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.Exception: State key serializer has not been configured in the config. This operation cannot use partitioned state.
>     at org.apache.flink.runtime.state.AbstractStateBackend.getPartitionedState(AbstractStateBackend.java:199)
>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator.getPartitionedState(AbstractStreamOperator.java:260)
>     at org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getState(StreamingRuntimeContext.java:118)
>     ... 8 more
>
> Where do I define the key & value serializer for state?
>
> Thanks,
> Buvana