[ https://issues.apache.org/jira/browse/FLINK-6018?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15930051#comment-15930051 ]
ASF GitHub Bot commented on FLINK-6018: --------------------------------------- GitHub user aljoscha opened a pull request: https://github.com/apache/flink/pull/3562 [FLINK-6018] Add tests for KryoSerializer restore with registered types This is the result of the discussion in #3534. I changed `TypeSerializer.isCompatibleWith()` to `TypeSerializer.canRestoreFrom` because the relation is not necessarily symmetric. I added a `KryoSerializer.canRestoreFrom()` that only allows restoring when we previously didn't have registered types/serializers. I added a whole bunch of tests in `StateBackendTestBase`, this should be review most thoroughly. R: @StephanEwen and @tzulitai because this probably is interesting with the serialiser update story that you're working on. You can merge this pull request into a Git repository by running: $ git pull https://github.com/aljoscha/flink jira-6018-state-init-fixups Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/3562.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #3562 ---- commit f0c3af53d24a3eac914cf1ceb3b1761a40553dfe Author: Aljoscha Krettek <aljoscha.kret...@gmail.com> Date: 2017-03-16T14:17:05Z [FLINK-6018] Add tests for KryoSerializer restore with registered types commit b90cf5cad5176d8edcbd189a9b65cc4999cddd53 Author: Aljoscha Krettek <aljoscha.kret...@gmail.com> Date: 2017-03-17T09:56:13Z [FLINK-6018] Rename isCompatibleWith() to canRestoreFrom() This make the method asymetric because in the case of KryoSerializer we can restore from state that was stored using no registed types/serializers while the other way around is not possible. ---- > Properly initialise StateDescriptor in > AbstractStateBackend.getPartitionedState() > --------------------------------------------------------------------------------- > > Key: FLINK-6018 > URL: https://issues.apache.org/jira/browse/FLINK-6018 > Project: Flink > Issue Type: Improvement > Components: DataStream API, State Backends, Checkpointing > Reporter: sunjincheng > Assignee: sunjincheng > Fix For: 1.3.0 > > > The code snippet currently in the `AbstractKeyedStateBackend # > getPartitionedState` method, as follows: > {code} > line 352: // TODO: This is wrong, it should throw an exception that the > initialization has not properly happened > line 353: if (!stateDescriptor.isSerializerInitialized()) { > line 354: stateDescriptor.initializeSerializerUnlessSet(new > ExecutionConfig()); > line 354 } > {code} > Method `isSerializerInitialized`: > {code} > public boolean isSerializerInitialized() { > return serializer != null; > } > {code} > Method `initializeSerializerUnlessSet`: > {code} > public void initializeSerializerUnlessSet(ExecutionConfig executionConfig) { > if (serializer == null) { > if (typeInfo != null) { > serializer = > typeInfo.createSerializer(executionConfig); > } else { > throw new IllegalStateException( > "Cannot initialize serializer > after TypeInformation was dropped during serialization"); > } > } > } > {code} > that is, in the `initializeSerializerUnlessSet` method, The `serializer` has > been checked by `serializer == null`.So I hope this code has a little > improvement to the following: > approach 1: > According to the `TODO` information we throw an exception > {code} > if (!stateDescriptor.isSerializerInitialized()) { > throw new IllegalStateException("The serializer of the > descriptor has not been initialized!"); > } > {code} > approach 2: > Try to initialize and remove `if (!stateDescriptor.isSerializerInitialized()) > {` logic. > {code} > stateDescriptor.initializeSerializerUnlessSet(new ExecutionConfig()); > {code} > Meanwhile, If we use the approach 2, I suggest that > `AbstractKeyedStateBackend` add a `private final ExecutionConfig > executionConfig` property. then we can change the code like this: > {code} > stateDescriptor.initializeSerializerUnlessSet(executionConfig); > {code} > Are the above suggestions reasonable for you? > Welcome anybody's feedback and corrections. -- This message was sent by Atlassian JIRA (v6.3.15#6346)