Sorry for the wrong post.

> This can probably be confirmed by looking at the exception stack trace.
> Can you post a full copy of that?

I no longer have the job history, but I think you are right. When I debugged the program to find the cause, I came across this code snippet:
```
TypeSerializerSchemaCompatibility<T> result =
    previousSerializerSnapshot.resolveSchemaCompatibility(registeredSerializer);

if (result.isIncompatible()) {
    invalidateCurrentSchemaSerializerAccess();
}
```

I remember that one serializer is `org.apache.flink.api.common.typeutils.base.LongSerializer$LongSerializerSnapshot` and the other is just `Kryo`.

> Can you open a JIRA for this? I think it'll be a reasonable extension to
> the API.

I'll do that, thank you.

> I'm not sure what you mean here. Where is this keyBy happening? In the
> Scala DataStream job, or the State Processor API?

In the Scala DataStream job, the same as the example at link-1 in the original post. When I change `keyBy(_._1)` to `keyBy(0)`, the program throws an exception. Here is the full copy from the job exceptions:

```
java.io.IOException: Failed to restore state backend
	at org.apache.flink.state.api.input.KeyedStateInputFormat.getStreamOperatorStateContext(KeyedStateInputFormat.java:231)
	at org.apache.flink.state.api.input.KeyedStateInputFormat.open(KeyedStateInputFormat.java:177)
	at org.apache.flink.state.api.input.KeyedStateInputFormat.open(KeyedStateInputFormat.java:79)
	at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:173)
	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.Exception: Exception while creating StreamOperatorStateContext.
	at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:195)
	at org.apache.flink.state.api.input.KeyedStateInputFormat.getStreamOperatorStateContext(KeyedStateInputFormat.java:223)
	... 6 more
Caused by: org.apache.flink.util.FlinkException: Could not restore keyed state backend for 8f89af64b0cf95ff20b8dda264c66f81_8f89af64b0cf95ff20b8dda264c66f81_(1/1) from any of the 1 provided restore options.
	at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135)
	at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:307)
	at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:135)
	... 7 more
Caused by: org.apache.flink.runtime.state.BackendBuildingException: Failed when trying to restore heap backend
	at org.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder.build(HeapKeyedStateBackendBuilder.java:116)
	at org.apache.flink.runtime.state.memory.MemoryStateBackend.createKeyedStateBackend(MemoryStateBackend.java:347)
	at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:291)
	at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:142)
	at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:121)
	... 9 more
Caused by: org.apache.flink.util.StateMigrationException: The new key serializer must be compatible.
	at org.apache.flink.runtime.state.heap.HeapRestoreOperation.restore(HeapRestoreOperation.java:142)
	at org.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder.build(HeapKeyedStateBackendBuilder.java:114)
	... 13 more
```

Maybe this is explained by "inferred and serialized as their Java counterparts", but I am not sure. I am a beginner three times over: with Java, Scala, and Flink.

Thanks a lot.

--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/