[ https://issues.apache.org/jira/browse/FLINK-6018?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15931129#comment-15931129 ]
ASF GitHub Bot commented on FLINK-6018: --------------------------------------- Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/3562#discussion_r106776658 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java --- @@ -162,22 +176,422 @@ protected CheckpointStreamFactory createStreamFactory() throws Exception { } @Test + public void testBackendUsesRegisteredKryoDefaultSerializer() throws Exception { + CheckpointStreamFactory streamFactory = createStreamFactory(); + Environment env = new DummyEnvironment("test", 1, 0); + AbstractKeyedStateBackend<Integer> backend = createKeyedBackend(IntSerializer.INSTANCE, env); + + // cast because our test serializer is not typed to TestPojo + env.getExecutionConfig() + .addDefaultKryoSerializer(TestPojo.class, (Class) ExceptionThrowingTestSerializer.class); + + TypeInformation<TestPojo> pojoType = new GenericTypeInfo<>(TestPojo.class); + + // make sure that we are in fact using the KryoSerializer + assertTrue(pojoType.createSerializer(env.getExecutionConfig()) instanceof KryoSerializer); + + pojoType.createSerializer(env.getExecutionConfig()); + + ValueStateDescriptor<TestPojo> kvId = new ValueStateDescriptor<>("id", pojoType); + + ValueState<TestPojo> state = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId); + + // backends that eagerly serializer will fail when updating state, others will + // fail only when performing the snapshot + int numExceptions = 0; + + backend.setCurrentKey(1); + + try { + state.update(new TestPojo("u1", 1)); + } catch (ExpectedKryoTestException e) { + numExceptions++; + } catch (RuntimeException e) { + if (e.getCause() instanceof ExpectedKryoTestException) { + numExceptions++; + } else { + throw e; + } + } + + try { + runSnapshot(backend.snapshot(682375462378L, 2, streamFactory, CheckpointOptions.forFullCheckpoint())); + } catch (ExpectedKryoTestException e) { + 
numExceptions++; + } + + assertEquals("Didn't see the expected Kryo exception.", 1, numExceptions); + } + + @Test + public void testBackendUsesRegisteredKryoDefaultSerializerUsingGetOrCreate() throws Exception { + CheckpointStreamFactory streamFactory = createStreamFactory(); + Environment env = new DummyEnvironment("test", 1, 0); + AbstractKeyedStateBackend<Integer> backend = createKeyedBackend(IntSerializer.INSTANCE, env); + + // cast because our test serializer is not typed to TestPojo + env.getExecutionConfig() + .addDefaultKryoSerializer(TestPojo.class, (Class) ExceptionThrowingTestSerializer.class); + + TypeInformation<TestPojo> pojoType = new GenericTypeInfo<>(TestPojo.class); + + // make sure that we are in fact using the KryoSerializer + assertTrue(pojoType.createSerializer(env.getExecutionConfig()) instanceof KryoSerializer); + + pojoType.createSerializer(env.getExecutionConfig()); + + ValueStateDescriptor<TestPojo> kvId = new ValueStateDescriptor<>("id", pojoType); + + ValueState<TestPojo> state = backend.getOrCreateKeyedState(VoidNamespaceSerializer.INSTANCE, kvId); + assert state instanceof InternalValueState; + ((InternalValueState) state).setCurrentNamespace(VoidNamespace.INSTANCE); + + // backends that eagerly serializer will fail when updating state, others will + // fail only when performing the snapshot + int numExceptions = 0; + + backend.setCurrentKey(1); + + try { + state.update(new TestPojo("u1", 1)); + } catch (ExpectedKryoTestException e) { + numExceptions++; + } catch (RuntimeException e) { + if (e.getCause() instanceof ExpectedKryoTestException) { + numExceptions++; + } else { + throw e; + } + } + + try { + runSnapshot(backend.snapshot(682375462378L, 2, streamFactory, CheckpointOptions.forFullCheckpoint())); + } catch (ExpectedKryoTestException e) { + numExceptions++; + } + + assertEquals("Didn't see the expected Kryo exception.", 1, numExceptions); + } + + @Test + public void testBackendUsesRegisteredKryoSerializer() throws Exception { + 
CheckpointStreamFactory streamFactory = createStreamFactory(); + Environment env = new DummyEnvironment("test", 1, 0); + AbstractKeyedStateBackend<Integer> backend = createKeyedBackend(IntSerializer.INSTANCE, env); + + env.getExecutionConfig() + .registerTypeWithKryoSerializer(TestPojo.class, ExceptionThrowingTestSerializer.class); + + TypeInformation<TestPojo> pojoType = new GenericTypeInfo<>(TestPojo.class); + + // make sure that we are in fact using the KryoSerializer + assertTrue(pojoType.createSerializer(env.getExecutionConfig()) instanceof KryoSerializer); + + pojoType.createSerializer(env.getExecutionConfig()); + + ValueStateDescriptor<TestPojo> kvId = new ValueStateDescriptor<>("id", pojoType); + + ValueState<TestPojo> state = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId); + + // backends that eagerly serializer will fail when updating state, others will + // fail only when performing the snapshot + int numExceptions = 0; + + backend.setCurrentKey(1); + + try { + state.update(new TestPojo("u1", 1)); + } catch (ExpectedKryoTestException e) { + numExceptions++; + } catch (RuntimeException e) { + if (e.getCause() instanceof ExpectedKryoTestException) { + numExceptions++; + } else { + throw e; + } + } + + try { + runSnapshot(backend.snapshot(682375462378L, 2, streamFactory, CheckpointOptions.forFullCheckpoint())); + } catch (ExpectedKryoTestException e) { + numExceptions++; + } + + assertEquals("Didn't see the expected Kryo exception.", 1, numExceptions); + } + + @Test + public void testBackendUsesRegisteredKryoSerializerUsingGetOrCreate() throws Exception { + CheckpointStreamFactory streamFactory = createStreamFactory(); + Environment env = new DummyEnvironment("test", 1, 0); + AbstractKeyedStateBackend<Integer> backend = createKeyedBackend(IntSerializer.INSTANCE, env); + + env.getExecutionConfig() + .registerTypeWithKryoSerializer(TestPojo.class, ExceptionThrowingTestSerializer.class); + + 
TypeInformation<TestPojo> pojoType = new GenericTypeInfo<>(TestPojo.class); + + // make sure that we are in fact using the KryoSerializer + assertTrue(pojoType.createSerializer(env.getExecutionConfig()) instanceof KryoSerializer); + + pojoType.createSerializer(env.getExecutionConfig()); + + ValueStateDescriptor<TestPojo> kvId = new ValueStateDescriptor<>("id", pojoType); + + ValueState<TestPojo> state = backend.getOrCreateKeyedState(VoidNamespaceSerializer.INSTANCE, kvId); + assert state instanceof InternalValueState; + ((InternalValueState) state).setCurrentNamespace(VoidNamespace.INSTANCE); + + // backends that eagerly serializer will fail when updating state, others will + // fail only when performing the snapshot + int numExceptions = 0; + + backend.setCurrentKey(1); + + try { + state.update(new TestPojo("u1", 1)); + } catch (ExpectedKryoTestException e) { + numExceptions++; + } catch (RuntimeException e) { + if (e.getCause() instanceof ExpectedKryoTestException) { + numExceptions++; + } else { + throw e; + } + } + + try { + runSnapshot(backend.snapshot(682375462378L, 2, streamFactory, CheckpointOptions.forFullCheckpoint())); + } catch (ExpectedKryoTestException e) { + numExceptions++; + } + + assertEquals("Didn't see the expected Kryo exception.", 1, numExceptions); + } + + + /** + * Verify that we can restore a snapshot that was done with without registered types + * after registering types. 
+ */ + @Test + public void testKryoRegisteringRestoreResilienceWithRegisteredType() throws Exception { + CheckpointStreamFactory streamFactory = createStreamFactory(); + Environment env = new DummyEnvironment("test", 1, 0); + AbstractKeyedStateBackend<Integer> backend = createKeyedBackend(IntSerializer.INSTANCE, env); + + TypeInformation<TestPojo> pojoType = new GenericTypeInfo<>(TestPojo.class); + + // make sure that we are in fact using the KryoSerializer + assertTrue(pojoType.createSerializer(env.getExecutionConfig()) instanceof KryoSerializer); + + pojoType.createSerializer(env.getExecutionConfig()); + + ValueStateDescriptor<TestPojo> kvId = new ValueStateDescriptor<>("id", pojoType); + + ValueState<TestPojo> state = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId); + + // make some more modifications + backend.setCurrentKey(1); + state.update(new TestPojo("u1", 1)); + + backend.setCurrentKey(2); + state.update(new TestPojo("u2", 2)); + + KeyGroupsStateHandle snapshot = runSnapshot(backend.snapshot( + 682375462378L, + 2, + streamFactory, + CheckpointOptions.forFullCheckpoint())); + + backend.dispose(); + + env.getExecutionConfig().registerKryoType(TestPojo.class); + + backend = restoreKeyedBackend(IntSerializer.INSTANCE, snapshot, env); + + snapshot.discardState(); + + state = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId); + backend.setCurrentKey(1); + assertEquals(state.value(), new TestPojo("u1", 1)); + + backend.setCurrentKey(2); + assertEquals(state.value(), new TestPojo("u2", 2)); + + backend.dispose(); + } + + /** + * Verify that we can restore a snapshot that was done with without registered default --- End diff -- "with without" redundancy --> "without", I think ;-)? 
> Properly initialise StateDescriptor in > AbstractStateBackend.getPartitionedState() > --------------------------------------------------------------------------------- > > Key: FLINK-6018 > URL: https://issues.apache.org/jira/browse/FLINK-6018 > Project: Flink > Issue Type: Improvement > Components: DataStream API, State Backends, Checkpointing > Reporter: sunjincheng > Assignee: sunjincheng > Fix For: 1.3.0 > > > The code currently in the `AbstractKeyedStateBackend#getPartitionedState` method is as follows: > {code} > line 352: // TODO: This is wrong, it should throw an exception that the > initialization has not properly happened > line 353: if (!stateDescriptor.isSerializerInitialized()) { > line 354: stateDescriptor.initializeSerializerUnlessSet(new > ExecutionConfig()); > line 355: } > {code} > Method `isSerializerInitialized`: > {code} > public boolean isSerializerInitialized() { > return serializer != null; > } > {code} > Method `initializeSerializerUnlessSet`: > {code} > public void initializeSerializerUnlessSet(ExecutionConfig executionConfig) { > if (serializer == null) { > if (typeInfo != null) { > serializer = > typeInfo.createSerializer(executionConfig); > } else { > throw new IllegalStateException( > "Cannot initialize serializer > after TypeInformation was dropped during serialization"); > } > } > } > {code} > That is, the `initializeSerializerUnlessSet` method already checks `serializer == null` itself, so I propose improving this code in one of the following ways: > approach 1: > As the `TODO` suggests, throw an exception: > {code} > if (!stateDescriptor.isSerializerInitialized()) { > throw new IllegalStateException("The serializer of the > descriptor has not been initialized!"); > } > {code} > approach 2: > Always try to initialize, and remove the `if (!stateDescriptor.isSerializerInitialized()) {` check:
> {code} > stateDescriptor.initializeSerializerUnlessSet(new ExecutionConfig()); > {code} > Meanwhile, if we use approach 2, I suggest adding a `private final ExecutionConfig > executionConfig` field to `AbstractKeyedStateBackend`. Then we can change the code like this: > {code} > stateDescriptor.initializeSerializerUnlessSet(executionConfig); > {code} > Do the above suggestions seem reasonable to you? > Feedback and corrections from anyone are welcome. -- This message was sent by Atlassian JIRA (v6.3.15#6346)