[ https://issues.apache.org/jira/browse/FLINK-3755?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15449027#comment-15449027 ]

ASF GitHub Bot commented on FLINK-3755:
---------------------------------------

GitHub user StefanRRichter opened a pull request:

    https://github.com/apache/flink/pull/2440

    Keyed backend refactor

    This pull request is a follow-up to the preliminary pull request #2376 and addresses all issues subsumed under [FLINK-3755].
    
    In addition to the changes from PR #2376, this PR adds:
    
    # 1) Refactoring of Key-Value State
    
    Previously, `AbstractStateBackend` was responsible both for checkpointing to streams and
    for keyed state. Now, this functionality is split between `CheckpointStreamFactory` and
    `KeyedStateBackend`. The former provides streams for writing checkpoints, while the latter
    is only responsible for keeping keyed state. A `KeyedStateBackend` can write its content
    to a checkpoint; to do so, it uses a `CheckpointStreamFactory`.
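    To illustrate the split, here is a minimal Java sketch. `SimpleCheckpointStreamFactory`,
    `SimpleKeyedBackend` and `SplitDemo` are hypothetical, heavily simplified stand-ins, not
    Flink's actual `CheckpointStreamFactory` or `KeyedStateBackend` APIs; the point is only that
    the backend owns the state while the factory decides where the snapshot bytes go.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical stand-in for CheckpointStreamFactory: it only decides where
// checkpoint bytes go and knows nothing about keyed state.
interface SimpleCheckpointStreamFactory {
    OutputStream createStream(long checkpointId) throws IOException;
}

// Hypothetical stand-in for KeyedStateBackend: it only keeps keyed state and
// writes it to whatever stream the factory hands out.
class SimpleKeyedBackend {
    private final Map<String, Long> state = new TreeMap<>();

    void put(String key, long value) {
        state.put(key, value);
    }

    Long get(String key) {
        return state.get(key);
    }

    void snapshot(long checkpointId, SimpleCheckpointStreamFactory factory) {
        try (DataOutputStream out = new DataOutputStream(factory.createStream(checkpointId))) {
            out.writeInt(state.size());
            for (Map.Entry<String, Long> e : state.entrySet()) {
                out.writeUTF(e.getKey());
                out.writeLong(e.getValue());
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}

class SplitDemo {
    public static void main(String[] args) {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        SimpleKeyedBackend backend = new SimpleKeyedBackend();
        backend.put("user-1", 42L);
        // The same backend could just as well be handed a file-based factory.
        backend.snapshot(1L, checkpointId -> buffer);
        System.out.println("snapshot bytes: " + buffer.size());
    }
}
```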
    
    # 2) Introduction of key-group aware `KeyedStateBackend`s
    
    ## a) HeapKeyedStateBackend
    `HeapKeyedStateBackend` subsumes the keyed-state part of both `MemoryStateBackend` and
    `FsStateBackend`. The only remaining difference between the two is the
    `CheckpointStreamFactory` they produce: `FsStateBackend` provides streams that write to
    files, while `MemoryStateBackend` provides streams that write to in-memory byte arrays.
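    A minimal sketch of why one heap backend can serve both cases, assuming a simplified,
    hypothetical `StreamFactory` interface (not Flink's real signature): the snapshot code is
    shared, and only the factory differs between the in-memory and the file-based variant.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;

// Hypothetical, simplified factory interface (not Flink's actual API).
interface StreamFactory {
    OutputStream createStream(long checkpointId) throws IOException;
}

// MemoryStateBackend-style: checkpoint bytes end up in an in-memory byte array.
class MemoryStreamFactory implements StreamFactory {
    private ByteArrayOutputStream last;

    @Override
    public OutputStream createStream(long checkpointId) {
        last = new ByteArrayOutputStream();
        return last;
    }

    byte[] lastCheckpointBytes() {
        return last.toByteArray();
    }
}

// FsStateBackend-style: checkpoint bytes end up in one file per checkpoint.
class FileStreamFactory implements StreamFactory {
    private final Path directory;

    FileStreamFactory(Path directory) {
        this.directory = directory;
    }

    Path fileFor(long checkpointId) {
        return directory.resolve("chk-" + checkpointId);
    }

    @Override
    public OutputStream createStream(long checkpointId) throws IOException {
        return Files.newOutputStream(fileFor(checkpointId));
    }
}

// The shared keyed-state part: identical regardless of which factory is passed in.
final class HeapBackendSketch {
    private HeapBackendSketch() {}

    static void writeSnapshot(byte[] serializedState, long checkpointId, StreamFactory factory) {
        try (OutputStream out = factory.createStream(checkpointId)) {
            out.write(serializedState);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```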
    
    This also introduces an additional lookup layer in `HeapKeyedStateBackend` to accommodate
    storing state per key group. Upon checkpointing, the data is written out in a format that
    is very similar to that of the new RocksDB backend. We should make the two formats fully
    compatible in a follow-up.
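    The extra lookup layer can be pictured as one map per key group. This is a hypothetical
    sketch: the class name, the hash-modulo key-group assignment and the serialization format
    are assumptions, not the backend's actual code. It also records the start offset of each
    key group in the snapshot, which is what lets a restore read only the key groups it owns.

```java
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical heap state with one extra lookup level: keyGroup -> (key -> value).
class KeyGroupedHeapState {
    private final int maxParallelism;
    private final List<Map<String, Long>> stateByKeyGroup = new ArrayList<>();

    KeyGroupedHeapState(int maxParallelism) {
        this.maxParallelism = maxParallelism;
        for (int i = 0; i < maxParallelism; i++) {
            stateByKeyGroup.add(new HashMap<>());
        }
    }

    // Assumption: plain hash-modulo assignment; Flink's actual function may differ.
    int keyGroupFor(String key) {
        return Math.floorMod(key.hashCode(), maxParallelism);
    }

    void put(String key, long value) {
        stateByKeyGroup.get(keyGroupFor(key)).put(key, value);
    }

    Long get(String key) {
        return stateByKeyGroup.get(keyGroupFor(key)).get(key);
    }

    // Writes key groups one after another and returns the start offset of each one
    // (relative to the start of `out`), i.e. a key-group index for restoring.
    long[] snapshot(DataOutputStream out) {
        long[] offsets = new long[maxParallelism];
        try {
            for (int kg = 0; kg < maxParallelism; kg++) {
                offsets[kg] = out.size();
                Map<String, Long> group = stateByKeyGroup.get(kg);
                out.writeInt(group.size());
                for (Map.Entry<String, Long> e : group.entrySet()) {
                    out.writeUTF(e.getKey());
                    out.writeLong(e.getValue());
                }
            }
            out.flush();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return offsets;
    }
}
```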
    
    ## b) RocksDBKeyedStateBackend
    
    The RocksDB state backend is now also key-group aware. This is achieved by an additional
    1-2 byte key-group prefix that is added to each key. On snapshots, the key-value states
    for different key groups are combined through `RocksDBMergeIterator`. All snapshots from
    this backend now run fully asynchronously, using an implementation of
    `AbstractAsyncIOCallable`.
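    A sketch of the key-group prefix and of merging per-state iterators by key group, in the
    spirit of what `RocksDBMergeIterator` is described to do above. All names are hypothetical,
    plain lists stand in for RocksDB iterators, and the rule of using one prefix byte for up to
    128 key groups (two otherwise) is an assumption.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.PriorityQueue;

// Hypothetical helpers illustrating a key-group prefix on RocksDB keys.
final class KeyGroupPrefix {
    private KeyGroupPrefix() {}

    // Assumption: one prefix byte for up to 128 key groups, two bytes otherwise.
    static int prefixBytes(int maxParallelism) {
        return maxParallelism > 128 ? 2 : 1;
    }

    // Big-endian prefix, so an unsigned bytewise comparison orders keys by key group first.
    static byte[] prefixed(int keyGroup, byte[] key, int prefixBytes) {
        byte[] out = new byte[prefixBytes + key.length];
        if (prefixBytes == 2) {
            out[0] = (byte) (keyGroup >>> 8);
            out[1] = (byte) keyGroup;
        } else {
            out[0] = (byte) keyGroup;
        }
        System.arraycopy(key, 0, out, prefixBytes, key.length);
        return out;
    }

    static int keyGroupOf(byte[] prefixedKey, int prefixBytes) {
        return prefixBytes == 2
                ? ((prefixedKey[0] & 0xFF) << 8) | (prefixedKey[1] & 0xFF)
                : prefixedKey[0] & 0xFF;
    }
}

// Hypothetical k-way merge over per-state key lists (each sorted by prefixed key),
// emitting all entries of key group 0 across states, then key group 1, and so on.
final class KeyGroupMergeSketch {
    private KeyGroupMergeSketch() {}

    private static final class Head {
        final int stateId;
        final Iterator<byte[]> rest;
        byte[] key;

        Head(int stateId, Iterator<byte[]> rest) {
            this.stateId = stateId;
            this.rest = rest;
            this.key = rest.next();
        }
    }

    static List<String> merge(List<List<byte[]>> sortedKeysPerState, int prefixBytes) {
        PriorityQueue<Head> heap = new PriorityQueue<>((a, b) -> {
            int byGroup = Integer.compare(
                    KeyGroupPrefix.keyGroupOf(a.key, prefixBytes),
                    KeyGroupPrefix.keyGroupOf(b.key, prefixBytes));
            return byGroup != 0 ? byGroup : Integer.compare(a.stateId, b.stateId);
        });
        for (int i = 0; i < sortedKeysPerState.size(); i++) {
            Iterator<byte[]> it = sortedKeysPerState.get(i).iterator();
            if (it.hasNext()) {
                heap.add(new Head(i, it));
            }
        }
        List<String> out = new ArrayList<>();
        while (!heap.isEmpty()) {
            Head head = heap.poll();
            int keyGroup = KeyGroupPrefix.keyGroupOf(head.key, prefixBytes);
            String userKey = new String(head.key, prefixBytes, head.key.length - prefixBytes, StandardCharsets.UTF_8);
            out.add(keyGroup + ":s" + head.stateId + ":" + userKey);
            if (head.rest.hasNext()) {
                head.key = head.rest.next();
                heap.add(head);
            }
        }
        return out;
    }
}
```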
    
    # 3) Refactoring of asynchronous snapshot facilities
    
    Snapshots are now driven by `AsyncCheckpointRunnable`s in `StreamTask`, which are executed
    through a thread pool. An `AsyncCheckpointRunnable` is created with a
    `RunnableFuture<KeyGroupsStateHandle>` that is obtained from the `KeyedStateBackend` through:
    
    ```
    public abstract RunnableFuture<KeyGroupsStateHandle> snapshot(
                long checkpointId,
                long timestamp,
                CheckpointStreamFactory streamFactory) throws Exception;
    ``` 
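    The following hypothetical Java sketch shows the general pattern: the backend's
    `snapshot(...)` does the synchronous part and returns a `RunnableFuture`, and a runnable
    executed on a thread pool completes it. `SnapshotHandle`, `AsyncBackendSketch`,
    `AsyncCheckpointRunnableSketch` and `AsyncSnapshotDemo` are illustrative stand-ins, not
    Flink classes; only `RunnableFuture`, `FutureTask` and the executor come from
    `java.util.concurrent`.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.FutureTask;
import java.util.concurrent.RunnableFuture;

// Hypothetical stand-in for KeyGroupsStateHandle: just records what was written.
final class SnapshotHandle {
    final long checkpointId;
    final int numBytes;

    SnapshotHandle(long checkpointId, int numBytes) {
        this.checkpointId = checkpointId;
        this.numBytes = numBytes;
    }
}

// Hypothetical backend: snapshot() does only the cheap synchronous part and returns a
// RunnableFuture whose run() performs the (potentially slow) write on another thread.
class AsyncBackendSketch {
    private final byte[] state = {1, 2, 3};

    RunnableFuture<SnapshotHandle> snapshot(long checkpointId, long timestamp) {
        final byte[] copy = state.clone(); // synchronous part: capture a consistent view
        return new FutureTask<SnapshotHandle>(() -> new SnapshotHandle(checkpointId, copy.length));
    }
}

// Hypothetical analogue of AsyncCheckpointRunnable: drives the future on a pool thread.
class AsyncCheckpointRunnableSketch implements Runnable {
    private final RunnableFuture<SnapshotHandle> future;
    volatile SnapshotHandle result;

    AsyncCheckpointRunnableSketch(RunnableFuture<SnapshotHandle> future) {
        this.future = future;
    }

    @Override
    public void run() {
        future.run(); // performs the asynchronous part of the snapshot
        try {
            result = future.get(); // already completed here, so this does not block
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } catch (ExecutionException e) {
            throw new IllegalStateException("snapshot failed", e.getCause());
        }
    }
}

final class AsyncSnapshotDemo {
    private AsyncSnapshotDemo() {}

    static SnapshotHandle runCheckpoint(AsyncBackendSketch backend, long checkpointId) {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            AsyncCheckpointRunnableSketch runnable = new AsyncCheckpointRunnableSketch(
                    backend.snapshot(checkpointId, System.currentTimeMillis()));
            pool.submit(runnable).get(); // the demo waits only so it can inspect the result
            return runnable.result;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } finally {
            pool.shutdown();
        }
    }
}
```

    In the task itself, nothing would wait on the pool as the demo does; the runnable would
    hand its result to the checkpoint acknowledgement path instead.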
    
    # Review comments on #2376
    
    Based on the review comments on PR #2376, we introduced the following changes:
    
    ## In comparison to PR #2376, we dropped the KeyGroupAssigner interface in favor of static methods.
    The reason for this is that the code relies on a consistent key-group assignment in several places.
    ## By default, the max parallelism is chosen as 
    ```
    Math.min(128, roundToNextPowerOfTwo(parallelism + parallelism / 2))
    ```
    ## No blocking on the termination of async snapshot threads.
    ## Reduced logging.
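    A small sketch of what such static methods could look like, including the default max
    parallelism formula above. The class name, the `roundToNextPowerOfTwo` implementation and
    the hash-modulo key-group assignment are assumptions for illustration; the actual Flink
    methods may use a different hash function.

```java
// Hypothetical static helpers; not the actual Flink method names or hash function.
final class KeyGroupAssignmentSketch {
    private KeyGroupAssignmentSketch() {}

    // Smallest power of two >= n, for n >= 1.
    static int roundToNextPowerOfTwo(int n) {
        return n <= 1 ? 1 : Integer.highestOneBit(n - 1) << 1;
    }

    // Default max parallelism as given in the PR description.
    static int defaultMaxParallelism(int parallelism) {
        return Math.min(128, roundToNextPowerOfTwo(parallelism + parallelism / 2));
    }

    // Stateless key -> key group mapping. Assumption: plain hash modulo.
    static int assignToKeyGroup(Object key, int maxParallelism) {
        return Math.floorMod(key.hashCode(), maxParallelism);
    }
}
```

    Because the mapping is a pure function of the key and the max parallelism, every place
    that needs it computes the same key group without having to share an assigner instance.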
    
    # Limitations
    
    Currently, queryable state is not key-group aware and `QueryableStateITCase` is ignored.
    This will be addressed in follow-up work.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/StefanRRichter/flink keyed-backend-refactor

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/2440.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #2440
    
----
commit 40484f3a66558b40bcf5bcaae3e3dba28d73f8dd
Author: Till Rohrmann <trohrm...@apache.org>
Date:   2016-07-28T13:08:24Z

    [FLINK-4380] Introduce KeyGroupAssigner and Max-Parallelism Parameter
    
    This introduces a new KeySelector that assigns keys to key groups and
    also adds the max parallelism parameter throughout all API levels.
    
    This also adds tests for the newly introduced features.

commit 3609f29076dd504ab9790d874fe3e3d4828f6b77
Author: Aljoscha Krettek <aljoscha.kret...@gmail.com>
Date:   2016-08-10T16:44:50Z

    [FLINK-3761] Refactor State Backends/Make Keyed State Key-Group Aware
    
    The biggest change in this is that functionality that used to be in
    AbstractStateBackend is now moved to CheckpointStreamFactory and
    KeyedStateBackend. The former is responsible for providing streams that
    can be used to checkpoint data while the latter is responsible for
    keeping keyed state. A keyed backend can checkpoint the state that it
    keeps by using a CheckpointStreamFactory.
    
    This also refactors how asynchronous keyed state snapshots work. They
    are now implemented using a Future/RunnableFuture.
    
    Also, this changes the keyed state backends to be key-group aware and to
    snapshot the state in key-groups with an index for restoring.

commit 9d675ca0707b31923108ea78908b63fc46798c97
Author: Stefan Richter <s.rich...@data-artisans.com>
Date:   2016-08-11T09:59:07Z

    [FLINK-4381] Refactor State to Prepare For Key-Group State Backends

commit d99b75c70bb15fe6ee5c06968d92b075e9b6c772
Author: Till Rohrmann <trohrm...@apache.org>
Date:   2016-08-11T10:14:18Z

    [FLINK-4380] Add tests for new Key-Group/Max-Parallelism
    
    This tests the rescaling features in CheckpointCoordinator and
    SavepointCoordinator.

commit 1d514b8d5a8db663e1be293b9653c42d45787e36
Author: Stefan Richter <s.rich...@data-artisans.com>
Date:   2016-08-17T12:50:18Z

    [FLINK-3761] Refactor RocksDB Backend/Make Key-Group Aware
    
    This change makes the RocksDB backend key-group aware by building on the
    changes in the previous commit.

commit 4f791d17f727a2fead12224652747b73d98ffaa1
Author: Aljoscha Krettek <aljoscha.kret...@gmail.com>
Date:   2016-08-25T12:09:12Z

    Ignore QueryableStateITCase
    
    This doesn't work yet because the state query machinery is not yet
    properly aware of key-grouped state.

commit f47af43bd7b5e54086ab2ce87b9471cb99a38421
Author: Stefan Richter <s.rich...@data-artisans.com>
Date:   2016-08-29T08:02:31Z

    Introduced timeout of thread pool for testing. Removed legacy code path from HashKeyGroupAssigner

commit 52061346e39cbc591e79bad2b6a4d9ce272bb558
Author: Stefan Richter <s.rich...@data-artisans.com>
Date:   2016-08-29T09:53:22Z

    Stephan's feedback: remove KeyGroupAssigner in favor of a static method and have default max. parallelism at 128

commit d77475a69dfdfae092107562af5c96af8de370cb
Author: Stefan Richter <s.rich...@data-artisans.com>
Date:   2016-08-29T14:10:15Z

    Improved test stability

commit 1252a246bee6bc181b2def83966d121b1ca5688e
Author: Stefan Richter <s.rich...@data-artisans.com>
Date:   2016-08-30T10:26:48Z

    Improved HeapKeyedStateBackend for more compact snapshots.

commit 3f282619c6dbf48a246bf848be2e92a4cad8bd2d
Author: Stefan Richter <s.rich...@data-artisans.com>
Date:   2016-08-30T11:45:40Z

    Expose max parallelism through StreamExecutionEnvironment

commit 1fa0e02f33722029159b39625127758fcb3623d3
Author: Stefan Richter <s.rich...@data-artisans.com>
Date:   2016-08-30T12:47:34Z

    test fix

commit 257992bf3ead38d12775b077478b84f8690c7fb9
Author: Stefan Richter <s.rich...@data-artisans.com>
Date:   2016-08-30T12:59:12Z

    Extended EventTimeWindowCheckpointITCase to test the boundaries of maxParallelism.

commit ea26c0f2e9f1687b5ae89a1acaeaea681c19bd80
Author: Stefan Richter <s.rich...@data-artisans.com>
Date:   2016-08-30T13:27:25Z

    Reset WindowCheckpointingITCase to (sometimes) failing version.

----


> Introduce key groups for key-value state to support dynamic scaling
> -------------------------------------------------------------------
>
>                 Key: FLINK-3755
>                 URL: https://issues.apache.org/jira/browse/FLINK-3755
>             Project: Flink
>          Issue Type: New Feature
>    Affects Versions: 1.1.0
>            Reporter: Till Rohrmann
>            Assignee: Till Rohrmann
>
> In order to support dynamic scaling, it is necessary to sub-partition the
> key-value states of each operator. This sub-partitioning, which produces a
> set of key groups, makes it easy to scale Flink jobs in and out by simply
> reassigning the different key groups to the new set of subtasks. The idea of
> key groups is described in this design document [1].
> [1] https://docs.google.com/document/d/1G1OS1z3xEBOrYD4wSu-LuBCyPUWyFd9l3T9WyssQ63w/edit?usp=sharing
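
To make the rescaling idea concrete, here is a hypothetical Java sketch in which every subtask owns a contiguous range of key groups. The class name and the range formula `keyGroup * parallelism / maxParallelism` are assumptions for illustration; see the design document [1] for the actual scheme.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: key groups are fixed by maxParallelism; only their owners change.
final class KeyGroupRescaling {
    private KeyGroupRescaling() {}

    // Assumption: contiguous ranges computed with integer arithmetic.
    static int subtaskForKeyGroup(int keyGroup, int maxParallelism, int parallelism) {
        return keyGroup * parallelism / maxParallelism;
    }

    // For each subtask index, the list of key groups it owns (requires parallelism <= maxParallelism).
    static List<List<Integer>> assign(int maxParallelism, int parallelism) {
        List<List<Integer>> owners = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            owners.add(new ArrayList<>());
        }
        for (int kg = 0; kg < maxParallelism; kg++) {
            owners.get(subtaskForKeyGroup(kg, maxParallelism, parallelism)).add(kg);
        }
        return owners;
    }
}
```

With 8 key groups, scaling from 2 to 4 subtasks only hands whole key groups to new owners; since a key's key group depends only on the max parallelism, no individual key has to be re-hashed.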



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)