[ https://issues.apache.org/jira/browse/FLINK-6018?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15932357#comment-15932357 ]
ASF GitHub Bot commented on FLINK-6018: --------------------------------------- Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/3562#discussion_r106854612 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java --- @@ -162,22 +176,422 @@ protected CheckpointStreamFactory createStreamFactory() throws Exception { } @Test + public void testBackendUsesRegisteredKryoDefaultSerializer() throws Exception { + CheckpointStreamFactory streamFactory = createStreamFactory(); + Environment env = new DummyEnvironment("test", 1, 0); + AbstractKeyedStateBackend<Integer> backend = createKeyedBackend(IntSerializer.INSTANCE, env); + + // cast because our test serializer is not typed to TestPojo + env.getExecutionConfig() + .addDefaultKryoSerializer(TestPojo.class, (Class) ExceptionThrowingTestSerializer.class); + + TypeInformation<TestPojo> pojoType = new GenericTypeInfo<>(TestPojo.class); + + // make sure that we are in fact using the KryoSerializer + assertTrue(pojoType.createSerializer(env.getExecutionConfig()) instanceof KryoSerializer); + + pojoType.createSerializer(env.getExecutionConfig()); --- End diff -- Must have missed that when I cleaned up the code after experimenting ... 😓 Fixing. > Properly initialise StateDescriptor in > AbstractStateBackend.getPartitionedState() > --------------------------------------------------------------------------------- > > Key: FLINK-6018 > URL: https://issues.apache.org/jira/browse/FLINK-6018 > Project: Flink > Issue Type: Improvement > Components: DataStream API, State Backends, Checkpointing > Reporter: sunjincheng > Assignee: sunjincheng > Fix For: 1.3.0 > > > The code snippet currently in the `AbstractKeyedStateBackend # > getPartitionedState` method, as follows: > {code} > line 352: // TODO: This is wrong, it should throw an exception that the > initialization has not properly happened > line 353: if (!stateDescriptor.isSerializerInitialized()) { > line 354: stateDescriptor.initializeSerializerUnlessSet(new > ExecutionConfig()); > line 354 } > {code} > Method `isSerializerInitialized`: > {code} > public boolean isSerializerInitialized() { > return serializer != null; > } > {code} > Method `initializeSerializerUnlessSet`: > {code} > public void initializeSerializerUnlessSet(ExecutionConfig executionConfig) { > if (serializer == null) { > if (typeInfo != null) { > serializer = > typeInfo.createSerializer(executionConfig); > } else { > throw new IllegalStateException( > "Cannot initialize serializer > after TypeInformation was dropped during serialization"); > } > } > } > {code} > that is, in the `initializeSerializerUnlessSet` method, The `serializer` has > been checked by `serializer == null`.So I hope this code has a little > improvement to the following: > approach 1: > According to the `TODO` information we throw an exception > {code} > if (!stateDescriptor.isSerializerInitialized()) { > throw new IllegalStateException("The serializer of the > descriptor has not been initialized!"); > } > {code} > approach 2: > Try to initialize and remove `if (!stateDescriptor.isSerializerInitialized()) > {` logic. > {code} > stateDescriptor.initializeSerializerUnlessSet(new ExecutionConfig()); > {code} > Meanwhile, If we use the approach 2, I suggest that > `AbstractKeyedStateBackend` add a `private final ExecutionConfig > executionConfig` property. then we can change the code like this: > {code} > stateDescriptor.initializeSerializerUnlessSet(executionConfig); > {code} > Are the above suggestions reasonable for you? > Welcome anybody's feedback and corrections. -- This message was sent by Atlassian JIRA (v6.3.15#6346)