[ https://issues.apache.org/jira/browse/FLINK-3755?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15449027#comment-15449027 ]
ASF GitHub Bot commented on FLINK-3755:
---------------------------------------

GitHub user StefanRRichter opened a pull request:

https://github.com/apache/flink/pull/2440

Keyed backend refactor

This pull request is a follow-up to the preliminary pull request #2376 and addresses all issues subsumed under [FLINK-3755].

In addition to the changes from PR #2376, this PR adds:

# 1) Refactoring of Key Value State

Previously, `AbstractStateBackend` was responsible both for checkpointing to streams and for keyed state. This functionality is now split into `CheckpointStreamFactory` and `KeyedStateBackend`. The former provides streams for writing checkpoints, while the latter is only responsible for keeping keyed state. A `KeyedStateBackend` can write its content to a checkpoint, using a `CheckpointStreamFactory` to obtain the streams it writes to.

# 2) Introduction of key-group aware `KeyedStateBackend`s

## a) HeapKeyedStateBackend

`HeapKeyedStateBackend` subsumes the keyed state part of both `MemoryStateBackend` and `FsStateBackend`. The only remaining difference between the two is the `CheckpointStreamFactory` they produce: one provides streams that write to files, while the other provides streams that write to in-memory byte arrays.

This also introduces another layer of lookup in `HeapKeyedStateBackend` to accommodate storing state per key group. Upon checkpointing, the data is written out in a format that is very similar to that of the new RocksDB backend. We should make the two formats 100% compatible as a follow-up.

## b) RocksDBKeyedStateBackend

The RocksDB state backend is now also key-group aware. This is achieved through an additional 1-2 byte key-group prefix that is added to each key. On snapshots, the key-value states for different key groups are combined through `RocksDBMergeIterator`. All snapshots from this backend now run fully asynchronously, using an implementation of `AbstractAsyncIOCallable`.
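The effect of the key-group prefix can be illustrated with a minimal Java sketch. It rests on several assumptions that do not come from the PR:

- The class and method names are hypothetical.
- The prefix uses one byte for at most 128 key groups and two bytes otherwise.
- The prefix is written big-endian.

The sketch shows why such a prefix is useful. Under RocksDB's default bytewise (unsigned lexicographic) key order, prefixed keys sort by key group first. A merge over several states can therefore emit entries key group by key group.

```java
import java.nio.charset.StandardCharsets;

/** Hypothetical sketch of key-group prefixed RocksDB keys; not the actual Flink code. */
final class KeyGroupPrefixSketch {

    /** Assumption: 1 prefix byte for up to 128 key groups, 2 bytes otherwise. */
    static int prefixBytes(int maxParallelism) {
        return maxParallelism > 128 ? 2 : 1;
    }

    /** Writes the key group big-endian in front of the already serialized key. */
    static byte[] prefixedKey(int keyGroup, byte[] serializedKey, int maxParallelism) {
        int prefix = prefixBytes(maxParallelism);
        byte[] out = new byte[prefix + serializedKey.length];
        if (prefix == 2) {
            out[0] = (byte) (keyGroup >>> 8);
            out[1] = (byte) keyGroup;
        } else {
            out[0] = (byte) keyGroup;
        }
        System.arraycopy(serializedKey, 0, out, prefix, serializedKey.length);
        return out;
    }

    /** Reads the key group back from the prefix. */
    static int keyGroupOf(byte[] prefixedKey, int maxParallelism) {
        if (prefixBytes(maxParallelism) == 2) {
            return ((prefixedKey[0] & 0xFF) << 8) | (prefixedKey[1] & 0xFF);
        }
        return prefixedKey[0] & 0xFF;
    }

    /** Unsigned lexicographic comparison, like RocksDB's default bytewise comparator. */
    static int compareUnsigned(byte[] a, byte[] b) {
        int n = Math.min(a.length, b.length);
        for (int i = 0; i < n; i++) {
            int cmp = (a[i] & 0xFF) - (b[i] & 0xFF);
            if (cmp != 0) {
                return cmp;
            }
        }
        return a.length - b.length;
    }

    public static void main(String[] args) {
        byte[] key = "user-42".getBytes(StandardCharsets.UTF_8);
        byte[] k3 = prefixedKey(3, key, 128);
        byte[] k300 = prefixedKey(300, key, 1024);
        System.out.println(k3.length + " " + keyGroupOf(k3, 128));      // 8 3
        System.out.println(k300.length + " " + keyGroupOf(k300, 1024)); // 9 300

        // The prefix dominates the ordering: group 1 sorts before group 2 regardless of the key.
        byte[] a = prefixedKey(1, "zzz".getBytes(StandardCharsets.UTF_8), 128);
        byte[] b = prefixedKey(2, "aaa".getBytes(StandardCharsets.UTF_8), 128);
        System.out.println(compareUnsigned(a, b) < 0); // true
    }
}
```

Placing the key group in front of the key, rather than after it, is what keeps all entries of one key group contiguous in the sorted key space.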
# Refactoring of asynchronous snapshot facilities

Snapshots are now driven by `AsyncCheckpointRunnable`s in `StreamTask`, which are executed through a thread pool. An `AsyncCheckpointRunnable` is created with a `RunnableFuture<KeyGroupsStateHandle>` that is obtained from the `KeyedStateBackend` through

```
public abstract RunnableFuture<KeyGroupsStateHandle> snapshot(
        long checkpointId,
        long timestamp,
        CheckpointStreamFactory streamFactory) throws Exception;
```

# Review comments on #2376

Based on the comments on PR #2376, we introduced the following changes:

## In comparison to PR #2376, we dropped the KeyGroupAssigner interface in favor of static methods, because the code relies on a consistent key-group assignment in several places.

## By default, the max parallelism is chosen as

```
Math.min(128, roundToNextPowerOfTwo(parallelism + parallelism / 2))
```

## No blocking on the termination of async snapshot threads.

## Reduced logging.

# Limitations

Currently, queryable state is not key-group aware, and `QueryableStateITCase` is ignored. This will be solved in follow-up work.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/StefanRRichter/flink keyed-backend-refactor

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2440.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

This closes #2440

----

commit 40484f3a66558b40bcf5bcaae3e3dba28d73f8dd
Author: Till Rohrmann <trohrm...@apache.org>
Date: 2016-07-28T13:08:24Z

[FLINK-4380] Introduce KeyGroupAssigner and Max-Parallelism Parameter

This introduces a new KeySelector that assigns keys to key groups and also adds the max parallelism parameter throughout all API levels.

This also adds tests for the newly introduced features.
commit 3609f29076dd504ab9790d874fe3e3d4828f6b77
Author: Aljoscha Krettek <aljoscha.kret...@gmail.com>
Date: 2016-08-10T16:44:50Z

[FLINK-3761] Refactor State Backends/Make Keyed State Key-Group Aware

The biggest change is that functionality that used to be in AbstractStateBackend is now moved to CheckpointStreamFactory and KeyedStateBackend. The former is responsible for providing streams that can be used to checkpoint data, while the latter is responsible for keeping keyed state. A keyed backend can checkpoint the state that it keeps by using a CheckpointStreamFactory.

This also refactors how asynchronous keyed state snapshots work. They are now implemented using a Future/RunnableFuture.

Also, this changes the keyed state backends to be key-group aware and to snapshot the state in key groups with an index for restoring.

commit 9d675ca0707b31923108ea78908b63fc46798c97
Author: Stefan Richter <s.rich...@data-artisans.com>
Date: 2016-08-11T09:59:07Z

[FLINK-4381] Refactor State to Prepare For Key-Group State Backends

commit d99b75c70bb15fe6ee5c06968d92b075e9b6c772
Author: Till Rohrmann <trohrm...@apache.org>
Date: 2016-08-11T10:14:18Z

[FLINK-4380] Add tests for new Key-Group/Max-Parallelism

This tests the rescaling features in CheckpointCoordinator and SavepointCoordinator.

commit 1d514b8d5a8db663e1be293b9653c42d45787e36
Author: Stefan Richter <s.rich...@data-artisans.com>
Date: 2016-08-17T12:50:18Z

[FLINK-3761] Refactor RocksDB Backend/Make Key-Group Aware

This change makes the RocksDB backend key-group aware by building on the changes in the previous commit.

commit 4f791d17f727a2fead12224652747b73d98ffaa1
Author: Aljoscha Krettek <aljoscha.kret...@gmail.com>
Date: 2016-08-25T12:09:12Z

Ignore QueryableStateITCase

This doesn't work yet because the state query machinery is not yet properly aware of key-grouped state.
commit f47af43bd7b5e54086ab2ce87b9471cb99a38421
Author: Stefan Richter <s.rich...@data-artisans.com>
Date: 2016-08-29T08:02:31Z

Introduced timeout of thread pool for testing. Removed legacy code path from HashKeyGroupAssigner

commit 52061346e39cbc591e79bad2b6a4d9ce272bb558
Author: Stefan Richter <s.rich...@data-artisans.com>
Date: 2016-08-29T09:53:22Z

Stephan's feedback: remove KeyGroupAssigner in favor of a static method and have default max. parallelism at 128

commit d77475a69dfdfae092107562af5c96af8de370cb
Author: Stefan Richter <s.rich...@data-artisans.com>
Date: 2016-08-29T14:10:15Z

Improved test stability

commit 1252a246bee6bc181b2def83966d121b1ca5688e
Author: Stefan Richter <s.rich...@data-artisans.com>
Date: 2016-08-30T10:26:48Z

Improved HeapKeyedStateBackend for more compact snapshots.

commit 3f282619c6dbf48a246bf848be2e92a4cad8bd2d
Author: Stefan Richter <s.rich...@data-artisans.com>
Date: 2016-08-30T11:45:40Z

Expose max parallelism through StreamExecutionEnvironment

commit 1fa0e02f33722029159b39625127758fcb3623d3
Author: Stefan Richter <s.rich...@data-artisans.com>
Date: 2016-08-30T12:47:34Z

test fix

commit 257992bf3ead38d12775b077478b84f8690c7fb9
Author: Stefan Richter <s.rich...@data-artisans.com>
Date: 2016-08-30T12:59:12Z

Extended EventTimeWindowCheckpointITCase to test the boundaries of maxParallelism.

commit ea26c0f2e9f1687b5ae89a1acaeaea681c19bd80
Author: Stefan Richter <s.rich...@data-artisans.com>
Date: 2016-08-30T13:27:25Z

Reset WindowCheckpointingITCase to (sometimes) failing version.

----

> Introduce key groups for key-value state to support dynamic scaling
> -------------------------------------------------------------------
>
> Key: FLINK-3755
> URL: https://issues.apache.org/jira/browse/FLINK-3755
> Project: Flink
> Issue Type: New Feature
> Affects Versions: 1.1.0
> Reporter: Till Rohrmann
> Assignee: Till Rohrmann
>
> In order to support dynamic scaling, it is necessary to sub-partition the
> key-value states of each operator.
> This sub-partitioning, which produces a set of key groups, makes it easy to
> scale Flink jobs in and out by simply reassigning the different key groups to
> the new set of subtasks. The idea of key groups is described in this design
> document [1].
>
> [1] https://docs.google.com/document/d/1G1OS1z3xEBOrYD4wSu-LuBCyPUWyFd9l3T9WyssQ63w/edit?usp=sharing
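Rescaling by reassigning key groups can be made concrete with a minimal Java sketch. It makes the following assumptions:

- All names are hypothetical.
- The default max parallelism follows the formula quoted in the PR description.
- The MurmurHash3-style mixing of `hashCode()` is an assumption; the PR does not specify a hash.
- Each subtask owns one contiguous range of key groups.

A key's key group depends only on the key and the max parallelism. When the parallelism changes, only the mapping from key groups to subtasks changes.

```java
/**
 * Hypothetical sketch of key-group based rescaling.
 * Class and method names are illustrative only, not Flink's API.
 */
final class KeyGroupSketch {

    private KeyGroupSketch() {}

    /** Smallest power of two that is >= x, for x >= 1. */
    static int roundToNextPowerOfTwo(int x) {
        int highest = Integer.highestOneBit(x);
        return highest == x ? x : highest << 1;
    }

    /** Default max parallelism, following the formula quoted in the PR description. */
    static int defaultMaxParallelism(int parallelism) {
        return Math.min(128, roundToNextPowerOfTwo(parallelism + parallelism / 2));
    }

    /** Assumption: MurmurHash3-style finalizer to spread poorly distributed hashCode() values. */
    static int mix(int h) {
        h ^= h >>> 16;
        h *= 0x85ebca6b;
        h ^= h >>> 13;
        h *= 0xc2b2ae35;
        h ^= h >>> 16;
        return h;
    }

    /** Key group of a key; depends only on the key and maxParallelism, never on parallelism. */
    static int assignToKeyGroup(Object key, int maxParallelism) {
        return (mix(key.hashCode()) & 0x7fffffff) % maxParallelism;
    }

    /** Contiguous range [first, last] of key groups owned by one subtask. */
    static int[] keyGroupRange(int operatorIndex, int parallelism, int maxParallelism) {
        int first = (operatorIndex * maxParallelism + parallelism - 1) / parallelism;
        int last = ((operatorIndex + 1) * maxParallelism - 1) / parallelism;
        return new int[] {first, last};
    }

    /** Subtask that owns a key group; consistent with keyGroupRange. */
    static int operatorIndexForKeyGroup(int keyGroup, int parallelism, int maxParallelism) {
        return keyGroup * parallelism / maxParallelism;
    }

    public static void main(String[] args) {
        int maxParallelism = defaultMaxParallelism(4); // 4 + 4 / 2 = 6, rounded up to 8
        int keyGroup = assignToKeyGroup("user-42", maxParallelism);
        // Scaling from 2 to 4 subtasks: the key group stays fixed, only its owner may change.
        for (int parallelism : new int[] {2, 4}) {
            int owner = operatorIndexForKeyGroup(keyGroup, parallelism, maxParallelism);
            int[] range = keyGroupRange(owner, parallelism, maxParallelism);
            System.out.println("parallelism=" + parallelism + " keyGroup=" + keyGroup
                    + " subtask=" + owner + " range=[" + range[0] + ", " + range[1] + "]");
        }
    }
}
```

Whole key groups move between subtasks on rescaling; individual keys are never rehashed. As a consequence, the max parallelism is an upper bound on how far a job can be scaled out.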