Ying Z created FLINK-15719: ------------------------------ Summary: Exceptions when using scala types directly with the State Process API Key: FLINK-15719 URL: https://issues.apache.org/jira/browse/FLINK-15719 Project: Flink Issue Type: Bug Components: API / State Processor Affects Versions: 1.9.1 Reporter: Ying Z
I followed these steps to generate and read states: # implements the example[1] `CountWindowAverage` in Scala(exactly same), and run jobA => that makes good. # execute `flink cancel -s ${JobID}` => savepoints was generated as expected. # implements the example[2] `StatefulFunctionWithTime` in Scala(code below), and run jobB => failed, exceptions shows that "Caused by: org.apache.flink.util.StateMigrationException: The new key serializer must be compatible." ReaderFunction code as below: {code:java} // code placeholder class ReaderFunction extends KeyedStateReaderFunction[Long, (Long, Long)] { var countState: ValueState[(Long, Long)] = _ override def open(parameters: Configuration): Unit = { val stateDescriptor = new ValueStateDescriptor("average", createTypeInformation[(Long, Long)]) countState = getRuntimeContext().getState(stateDescriptor) } override def readKey(key: Long, ctx: KeyedStateReaderFunction.Context, out: Collector[(Long, Long)]): Unit = { out.collect(countState.value()) } } {code} 1: [https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/stream/state/state.html#using-managed-keyed-state] 2: [https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/libs/state_processor_api.html#keyed-state] -- This message was sent by Atlassian Jira (v8.3.4#803005)