[ https://issues.apache.org/jira/browse/FLINK-12653?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16849943#comment-16849943 ]
Maximilian Michels commented on FLINK-12653:
--------------------------------------------

Some more information, these are the involved Beam serializers:

{noformat}
[VarIntCoder,
KvCoder(org.apache.beam.sdk.coders.SerializableCoder,org.apache.beam.sdk.coders.AvroCoder),
WindowedValue$FullWindowedValueCoder(KvCoder(StringUtf8Coder,VarLongCoder),GlobalWindow$Coder),
TimerInternals$TimerDataCoder(GlobalWindow$Coder),
FlinkKeyUtils$ByteBufferCoder]
{noformat}

> Keyed state backend fails to restore during rescaling
> -----------------------------------------------------
>
>                 Key: FLINK-12653
>                 URL: https://issues.apache.org/jira/browse/FLINK-12653
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / State Backends
>    Affects Versions: 1.6.4, 1.7.2, 1.8.0
>         Environment: Beam 2.12.0 or any other Beam version
> Flink >= 1.6
> Heap/Filesystem state backend (RocksDB works fine)
>            Reporter: Maximilian Michels
>            Priority: Critical
>
> The Flink Runner includes a test which verifies checkpoints/savepoints work
> correctly with Beam on Flink. When adding additional tests for
> scaleup/scaledown [1], I came across a bug with restoring the keyed state
> backend. After a fair amount of debugging Beam code and checking any
> potential issues with serializers, I think this could be a Flink issue.
> Steps to reproduce:
> 1. {{git clone https://github.com/mxm/beam}}
> 2. {{cd beam && git checkout savepoint-problem}}
> 3. {{./gradlew :runners:flink:1.6:test --tests "**.FlinkSavepointTest.testSavepointRestoreLegacy"}}
> Error:
> {noformat}
> java.lang.Exception: Exception while creating StreamOperatorStateContext.
>     at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:192)
>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:250)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:738)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:289)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
>     at java.lang.Thread.run(Thread.java:745)
> Caused by: org.apache.flink.util.FlinkException: Could not restore keyed state backend for DoFnOperator_76375152c4a81d5df72cf49e32c4ecb9_(4/4) from any of the 1 provided restore options.
>     at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:137)
>     at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:279)
>     at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:133)
>     ... 5 more
> Caused by: java.lang.RuntimeException: Invalid namespace string: ''
>     at org.apache.beam.runners.core.StateNamespaces.fromString(StateNamespaces.java:245)
>     at org.apache.beam.runners.core.TimerInternals$TimerDataCoder.decode(TimerInternals.java:246)
>     at org.apache.beam.runners.core.TimerInternals$TimerDataCoder.decode(TimerInternals.java:221)
>     at org.apache.beam.runners.flink.translation.types.CoderTypeSerializer.deserialize(CoderTypeSerializer.java:92)
>     at org.apache.flink.streaming.api.operators.TimerSerializer.deserialize(TimerSerializer.java:169)
>     at org.apache.flink.streaming.api.operators.TimerSerializer.deserialize(TimerSerializer.java:45)
>     at org.apache.flink.runtime.state.KeyGroupPartitioner$PartitioningResultKeyGroupReader.readMappingsInKeyGroup(KeyGroupPartitioner.java:297)
>     at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.readKeyGroupStateData(HeapKeyedStateBackend.java:513)
>     at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.readStateHandleStateData(HeapKeyedStateBackend.java:474)
>     at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.restorePartitionedState(HeapKeyedStateBackend.java:431)
>     at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.restore(HeapKeyedStateBackend.java:370)
>     at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.restore(HeapKeyedStateBackend.java:105)
>     at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:151)
>     at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:123)
>     ... 7 more
> {noformat}
> It is possible to change the {{maxParallelism}} to other values.
> The following lead to failure:
> {noformat}
> options.setMaxParallelism(128); // default value
> options.setMaxParallelism(64);
> options.setMaxParallelism(118);
> {noformat}
> The following work fine:
> {noformat}
> options.setMaxParallelism(110);
> options.setMaxParallelism(63);
> options.setMaxParallelism(24);
> {noformat}
> [1] https://github.com/apache/beam/commit/52d7291144f64eaa417862558d71a443fae3d690
> Everything works fine with RocksDB.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
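
For readers comparing the {{maxParallelism}} values listed in the report, the sketch below prints which key groups each subtask would own for each value. It is only an illustration: the two formulas are meant to mirror the arithmetic of Flink's {{KeyGroupRangeAssignment.computeKeyGroupRangeForOperatorIndex}} in the 1.x line as best understood here, which is an assumption and should be checked against the Flink version in use. The class name {{KeyGroupRangeSketch}} and the parallelism of 4 (read off the "(4/4)" suffix in the stack trace) are likewise assumptions. The output shows how the ranges shift with {{maxParallelism}}; it does not by itself explain why some values fail and others succeed.

```java
public class KeyGroupRangeSketch {

    // First key group (inclusive) owned by the subtask at operatorIndex.
    // Assumed to match Flink 1.x key-group range arithmetic.
    static int rangeStart(int maxParallelism, int parallelism, int operatorIndex) {
        return (operatorIndex * maxParallelism + parallelism - 1) / parallelism;
    }

    // Last key group (inclusive) owned by the subtask at operatorIndex.
    static int rangeEnd(int maxParallelism, int parallelism, int operatorIndex) {
        return ((operatorIndex + 1) * maxParallelism - 1) / parallelism;
    }

    public static void main(String[] args) {
        // Assumption: parallelism 4, taken from "(4/4)" in the reported stack trace.
        int parallelism = 4;
        // maxParallelism values from the report (first three fail, last three work).
        int[] reported = {128, 64, 118, 110, 63, 24};

        for (int maxParallelism : reported) {
            StringBuilder line = new StringBuilder("maxParallelism=" + maxParallelism + ":");
            for (int i = 0; i < parallelism; i++) {
                line.append(" [")
                    .append(rangeStart(maxParallelism, parallelism, i))
                    .append(", ")
                    .append(rangeEnd(maxParallelism, parallelism, i))
                    .append("]");
            }
            System.out.println(line);
        }
    }
}
```

Running the same loop with the parallelism used before the savepoint was taken would show which key groups move between subtasks during the rescale.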