Hi Jacob, the `ListState` abstraction is a state which we call partitioned/key-value state. As such, it is only possible to use it with a keyed stream. This means that you have to call `keyBy` after the `connect` API call.
Cheers, Till On Wed, Jun 22, 2016 at 9:17 PM, Jacob Bay Larsen <m...@jacobbay.dk> wrote: > Hi, > > I am trying to use a ListState in a RichCoFlatMapFunction but when > calling: getRuntimeContext().getListState(descriptor) in the open-function > i am getting a "State key serializer has not .." exception. I am not sure > what i can do to avoid this exception - Are any of you able to provide some > help ? > > Best regards > Jacob > > > private ListState<Tuple2<String, Integer>> deltaPositions; > > @Override > public void open(org.apache.flink.configuration.Configuration > parameters) throws Exception { > // Create state variable > ListStateDescriptor<Tuple2<String, Integer>> descriptor = > new ListStateDescriptor<>( > "deltaPositions", // the state name > TypeInformation.of(new TypeHint<Tuple2<String, > Integer>>() { > })); > > deltaPositions = getRuntimeContext().getListState(descriptor); > }; > > > > 2016-06-22 20:41:38,813 INFO org.apache.flink.runtime.taskmanager.Task > - Stream of Items with collection of meadian times (1/1) > switched to FAILED with exception. > java.lang.RuntimeException: Error while getting state > at > org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getListState(StreamingRuntimeContext.java:131) > at > crisplant.bigdata.dataanalysis.baggagemonitor.streaming.liveitemeventsstoring.LiveItemEventsStoring$MergeMedianTimesFlatMapFunction.open(LiveItemEventsStoring.java:83) > at > org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:38) > at > org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:91) > at > org.apache.flink.streaming.api.operators.co.CoStreamFlatMap.open(CoStreamFlatMap.java:49) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:317) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:215) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559) > at java.lang.Thread.run(Thread.java:745) > Caused by: java.lang.Exception: State key serializer has not been > configured in the config. This operation cannot use partitioned state. > at > org.apache.flink.runtime.state.AbstractStateBackend.getPartitionedState(AbstractStateBackend.java:199) > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator.getPartitionedState(AbstractStreamOperator.java:260) > at > org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getListState(StreamingRuntimeContext.java:129) > ... 8 more > 2016-06-22 20:41:38,815 INFO org.apache.flink.runtime.taskmanager.Task > - Freeing ta > > -- > Jacob Bay Larsen > > Phone: +45 6133 1108 > E-mail: m...@jacobbay.dk >