[ https://issues.apache.org/jira/browse/FLINK-11083?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Tzu-Li (Gordon) Tai resolved FLINK-11083. ----------------------------------------- Resolution: Fixed > CRowSerializerConfigSnapshot is not instantiable > ------------------------------------------------ > > Key: FLINK-11083 > URL: https://issues.apache.org/jira/browse/FLINK-11083 > Project: Flink > Issue Type: Bug > Components: Table API & SQL, Type Serialization System > Reporter: boshu Zheng > Assignee: boshu Zheng > Priority: Major > Labels: pull-request-available > Fix For: 1.6.3, 1.7.2, 1.8.0 > > Time Spent: 10m > Remaining Estimate: 0h > > An exception was encountered when restarting a job with savepoint in our > production env, > {code:java} > 2018-12-04 20:28:25,091 INFO 10595 org.apache.flink.runtime.taskmanager.Task > :917 - _OurCustomOperator_ -> select: () -> to: Tuple2 -> > Sink: Unnamed (3/20) (61c4fa7339bf152157e8e1dd0f8fd97b) switched from RUNNING > to FAILED. > java.lang.Exception: Exception while creating StreamOperatorStateContext. > at > org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:192) > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:227) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:738) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:289) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711) > at java.lang.Thread.run(Thread.java:745) > Caused by: org.apache.flink.util.FlinkException: Could not restore operator > state backend for AsyncWaitOperator_90bea66de1c231edf33913ecd54406c1_(3/20) > from any of the 1 provided restore options. 
> at > org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:137) > at > org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.operatorStateBackend(StreamTaskStateInitializerImpl.java:242) > at > org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:140) > ... 5 more > Caused by: java.lang.RuntimeException: The class > 'org.apache.flink.table.runtime.types.CRowSerializer$CRowSerializerConfigSnapshot' > is not instantiable: The class has no (implicit) public nullary constructor, > i.e. a constructor without arguments. > at > org.apache.flink.util.InstantiationUtil.checkForInstantiation(InstantiationUtil.java:412) > at > org.apache.flink.util.InstantiationUtil.instantiate(InstantiationUtil.java:337) > at > org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil$TypeSerializerConfigSnapshotSerializationProxy.read(TypeSerializerSerializationUtil.java:433) > at > org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil.readSerializerConfigSnapshot(TypeSerializerSerializationUtil.java:255) > at > org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil.readSerializersAndConfigsWithResilience(TypeSerializerSerializationUtil.java:211) > at > org.apache.flink.api.common.typeutils.CompositeTypeSerializerConfigSnapshot.read(CompositeTypeSerializerConfigSnapshot.java:71) > at > org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil$TypeSerializerConfigSnapshotSerializationProxy.read(TypeSerializerSerializationUtil.java:435) > at > org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil.readSerializerConfigSnapshot(TypeSerializerSerializationUtil.java:255) > at > org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil.readSerializersAndConfigsWithResilience(TypeSerializerSerializationUtil.java:211) > at > 
org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshotReadersWriters$CurrentReaderImpl.readStateMetaInfoSnapshot(StateMetaInfoSnapshotReadersWriters.java:218) > at > org.apache.flink.runtime.state.OperatorBackendSerializationProxy.read(OperatorBackendSerializationProxy.java:105) > at > org.apache.flink.runtime.state.DefaultOperatorStateBackend.restore(DefaultOperatorStateBackend.java:505) > at > org.apache.flink.runtime.state.DefaultOperatorStateBackend.restore(DefaultOperatorStateBackend.java:64) > at > org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:151) > at > org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:123) > ... 7 more > {code} > I added tests to CRowSerializerTest to confirm that this is definitely a bug: > {code:java} > @Test > def testDefaultConstructor(): Unit = { > new CRowSerializer.CRowSerializerConfigSnapshot() > /////// This would fail the test > val serializerConfigSnapshotClass = > > Class.forName("org.apache.flink.table.runtime.types.CRowSerializer$CRowSerializerConfigSnapshot") > InstantiationUtil.instantiate(serializerConfigSnapshotClass) > } > @Test > def testStateRestore(): Unit = { > class IKeyedProcessFunction extends KeyedProcessFunction[Integer, > Integer, Integer] { > var state: ListState[CRow] = _ > override def open(parameters: Configuration): Unit = { > val stateDesc = new ListStateDescriptor[CRow]("CRow", > new CRowTypeInfo(new RowTypeInfo(Types.INT))) > state = getRuntimeContext.getListState(stateDesc) > } > override def processElement(value: Integer, > ctx: KeyedProcessFunction[Integer, Integer, Integer]#Context, > out: Collector[Integer]): Unit = { > state.add(new CRow(Row.of(value), true)) > } > } > val operator = new KeyedProcessOperator[Integer, Integer, Integer](new > IKeyedProcessFunction) > var testHarness = new KeyedOneInputStreamOperatorTestHarness[Integer, > Integer, Integer]( > 
operator, > new KeySelector[Integer, Integer] { > override def getKey(value: Integer): Integer= -1 > }, > Types.INT, 1, 1, 0) > testHarness.setup() > testHarness.open() > testHarness.processElement(new StreamRecord[Integer](1, 1L)) > testHarness.processElement(new StreamRecord[Integer](2, 1L)) > testHarness.processElement(new StreamRecord[Integer](3, 1L)) > assertEquals(1, numKeyedStateEntries(operator)) > val snapshot = testHarness.snapshot(0L, 0L) > testHarness.close() > testHarness = new KeyedOneInputStreamOperatorTestHarness[Integer, > Integer, Integer]( > operator, > new KeySelector[Integer, Integer] { > override def getKey(value: Integer): Integer= -1 > }, > Types.INT, 1, 1, 0) > testHarness.setup() > /////// This would throw the same exception as our production app does. > testHarness.initializeState(snapshot) > testHarness.open() > assertEquals(1, numKeyedStateEntries(operator)) > testHarness.close() > } > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)