GitHub user aljoscha opened a pull request:

    https://github.com/apache/flink/pull/1562

    Enhance Partitioned State and use it in WindowOperator

    The commits in this PR are self-contained. The first one enhances the
partitioned state such that it can be used by the WindowOperator. The second
commit adds the required changes in WindowOperator. The third commit adds a
State Backend based on RocksDB. This means that the window operator can now run
on different types of state backends transparently.
    
    @StephanEwen has some things that he wants to change; those will be added
here (I think). I'm opening this now so that people can have a look at it.
    
    ## [FLINK-3201] Enhance Partitioned State Interface with State Types
    Add new state types ValueState, ListState and ReducingState, where
    ListState and ReducingState derive from interface MergingState.
    
    ValueState behaves exactly the same as OperatorState. MergingState is a
    stateful list to which elements can be added and from which the
    contained elements can be obtained. With a ListState the list of
    elements is actually kept; with a ReducingState a reduce function
    combines all added elements into one. To create a ValueState the user
    passes a ValueStateIdentifier to
    StreamingRuntimeContext.getPartitionedState(); for the other state
    types they pass a ListStateIdentifier or ReducingStateIdentifier.
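
    To make the hierarchy above concrete, here is a minimal, self-contained
    sketch in plain Java. It is an assumption-laden illustration, not the
    code in this PR: the two type parameters on MergingState and the
    HeapListState and HeapReducingState classes are hypothetical, and the
    identifier classes are left out entirely.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BinaryOperator;

// Hypothetical, simplified interfaces; not the signatures from this PR.

/** State to which elements can be added and whose contents can be read back. */
interface MergingState<IN, OUT> {
    void add(IN element);
    OUT get();
}

/** Keeps every added element. */
interface ListState<T> extends MergingState<T, Iterable<T>> {}

/** Folds every added element into a single value. */
interface ReducingState<T> extends MergingState<T, T> {}

final class HeapListState<T> implements ListState<T> {
    private final List<T> elements = new ArrayList<>();

    public void add(T element) { elements.add(element); }
    public Iterable<T> get() { return elements; }
}

final class HeapReducingState<T> implements ReducingState<T> {
    private final BinaryOperator<T> reduce;
    private T current; // null until the first element arrives

    HeapReducingState(BinaryOperator<T> reduce) { this.reduce = reduce; }

    public void add(T element) {
        current = (current == null) ? element : reduce.apply(current, element);
    }
    public T get() { return current; }
}

final class StateTypesSketch {
    public static void main(String[] args) {
        ListState<Integer> list = new HeapListState<>();
        ReducingState<Integer> sum = new HeapReducingState<>(Integer::sum);
        for (int i : new int[] {1, 2, 3}) {
            list.add(i);
            sum.add(i);
        }
        System.out.println(list.get()); // [1, 2, 3]
        System.out.println(sum.get());  // 6
    }
}
```

    Both implementations accept the same add() calls; only what get()
    returns differs, which is the property the WindowOperator change relies
    on.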
    
    This change is necessary to give the system more information about the
    nature of the operator state. We need this information in order to do
    incremental snapshots. They would not be possible, for example, if the
    user kept a List as a state: inside OperatorState this list would be
    opaque and Flink could not create good incremental snapshots.
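
    As a rough illustration of why knowing the state type helps, the sketch
    below shows a list-shaped state that can hand out only the elements
    appended since the previous snapshot. The class and method names are
    hypothetical and this is not how any backend in this PR works; it only
    shows what becomes possible once the system knows the state is a list
    rather than an opaque value.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration; not taken from any Flink state backend.
final class IncrementalListState<T> {
    private final List<T> elements = new ArrayList<>();
    private int snapshotted = 0; // number of elements covered by earlier snapshots

    void add(T element) { elements.add(element); }

    /** Returns only the elements appended since the previous call. */
    List<T> snapshotDelta() {
        List<T> delta = new ArrayList<>(elements.subList(snapshotted, elements.size()));
        snapshotted = elements.size();
        return delta;
    }
}

final class IncrementalSnapshotSketch {
    public static void main(String[] args) {
        IncrementalListState<String> state = new IncrementalListState<>();
        state.add("a");
        state.add("b");
        System.out.println(state.snapshotDelta()); // [a, b]
        state.add("c");
        System.out.println(state.snapshotDelta()); // [c]
    }
}
```

    An opaque value holding the same list would have to be written out in
    full on every snapshot, because the system could not tell which part of
    it changed.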
    
    This also refactors the StateBackend. Before, the logic for partitioned
    state was spread out over StreamingRuntimeContext,
    AbstractStreamOperator and StateBackend. Now it is consolidated in
    StateBackend.
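
    A sketch of what "consolidated in StateBackend" could look like, under
    the assumption that the backend tracks the current key and resolves an
    identifier to the state for that key. StateIdentifier with a name and a
    factory, HeapStateBackend and setCurrentKey() are hypothetical names
    chosen for this example; only getPartitionedState() echoes a name from
    the description above, and its real signature may differ.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;

// Hypothetical: an identifier carries a name and knows how to create empty state.
final class StateIdentifier<S> {
    final String name;
    final Supplier<S> factory;

    StateIdentifier(String name, Supplier<S> factory) {
        this.name = name;
        this.factory = factory;
    }
}

// Hypothetical heap-based backend owning all per-key state lookup logic.
final class HeapStateBackend<K> {
    private final Map<K, Map<String, Object>> statePerKey = new HashMap<>();
    private K currentKey;

    /** Called before each element is processed, with that element's key. */
    void setCurrentKey(K key) { currentKey = key; }

    /** Returns the state registered under the identifier for the current key. */
    @SuppressWarnings("unchecked")
    <S> S getPartitionedState(StateIdentifier<S> id) {
        Map<String, Object> states =
            statePerKey.computeIfAbsent(currentKey, k -> new HashMap<>());
        return (S) states.computeIfAbsent(id.name, n -> id.factory.get());
    }
}

final class StateBackendSketch {
    public static void main(String[] args) {
        StateIdentifier<List<String>> seen = new StateIdentifier<>("seen", ArrayList::new);
        HeapStateBackend<String> backend = new HeapStateBackend<>();

        backend.setCurrentKey("user-a");
        backend.getPartitionedState(seen).add("click");
        backend.setCurrentKey("user-b");
        backend.getPartitionedState(seen).add("view");

        backend.setCurrentKey("user-a");
        System.out.println(backend.getPartitionedState(seen)); // [click]
    }
}
```

    Keeping this lookup in one place means an operator or runtime context
    only forwards the identifier, and a different backend can swap in its
    own storage without the callers changing.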
    
    This also adds support for partitioned state in two-input operators.
    
    ## [FLINK-3200] Use Partitioned State in WindowOperator
    
    This changes window operator to use the new partitioned state
    abstraction for keeping window contents instead of custom internal
    state and the checkpointed interface.
    
    For now, timers are still kept as custom checkpointed state, however.
    
    WindowOperator now expects a StateIdentifier for MergingState. This can
    be for either ReducingState or ListState, but WindowOperator is agnostic
    to the type of state. The signature of WindowFunction also changes to
    include the type of the intermediate input. For example, if a
    ReducingState is used, the input of the WindowFunction is T (where T is
    the input type). If a ListState is used, the input of the WindowFunction
    is of type Iterable[T].
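
    The relationship between the state type and the WindowFunction input
    can be sketched as follows. This is a simplified stand-in, not the
    operator from this PR: MiniWindowOperator, the single-method
    WindowFunction, and the listState() and reducingState() helpers are
    hypothetical, and keys, windows, triggers and timers are all omitted.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BinaryOperator;

// Hypothetical, simplified interfaces for illustration only.
interface MergingState<IN, ACC> {
    void add(IN element);
    ACC get();
}

interface WindowFunction<ACC, OUT> {
    OUT apply(ACC windowContents);
}

/** Only adds to and reads from the state; it never inspects the state's kind. */
final class MiniWindowOperator<IN, ACC, OUT> {
    private final MergingState<IN, ACC> state;
    private final WindowFunction<ACC, OUT> function;

    MiniWindowOperator(MergingState<IN, ACC> state, WindowFunction<ACC, OUT> function) {
        this.state = state;
        this.function = function;
    }

    void processElement(IN element) { state.add(element); }

    OUT fire() { return function.apply(state.get()); }
}

final class WindowStateSketch {
    static <T> MergingState<T, Iterable<T>> listState() {
        List<T> elements = new ArrayList<>();
        return new MergingState<T, Iterable<T>>() {
            public void add(T element) { elements.add(element); }
            public Iterable<T> get() { return elements; }
        };
    }

    static <T> MergingState<T, T> reducingState(BinaryOperator<T> reduce) {
        return new MergingState<T, T>() {
            private T current; // null until the first element arrives
            public void add(T element) {
                current = (current == null) ? element : reduce.apply(current, element);
            }
            public T get() { return current; }
        };
    }

    public static void main(String[] args) {
        // ReducingState: the window function receives a single T.
        MergingState<Integer, Integer> reducing = reducingState(Integer::sum);
        WindowFunction<Integer, String> onSum = sum -> "sum=" + sum;
        MiniWindowOperator<Integer, Integer, String> summing =
            new MiniWindowOperator<>(reducing, onSum);

        // ListState: the window function receives an Iterable<T>.
        MergingState<Integer, Iterable<Integer>> list = listState();
        WindowFunction<Iterable<Integer>, String> onAll = all -> {
            int n = 0;
            for (Integer ignored : all) { n++; }
            return "count=" + n;
        };
        MiniWindowOperator<Integer, Iterable<Integer>, String> counting =
            new MiniWindowOperator<>(list, onAll);

        for (int i : new int[] {1, 2, 3}) {
            summing.processElement(i);
            counting.processElement(i);
        }
        System.out.println(summing.fire());  // sum=6
        System.out.println(counting.fire()); // count=3
    }
}
```

    The operator code is identical in both cases; the choice of state
    decides whether elements are pre-aggregated on arrival or buffered
    until the window fires.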

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/aljoscha/flink window-on-state

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/1562.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #1562
    
----
commit dc440ba4d97cecd297e432f14342dba5382cab50
Author: Aljoscha Krettek <aljoscha.kret...@gmail.com>
Date:   2016-01-25T11:33:51Z

    [FLINK-3201] Enhance Partitioned State Interface with State Types
    

commit 865723e3ff0133ba9d921907c298c3545fdfe32c
Author: Aljoscha Krettek <aljoscha.kret...@gmail.com>
Date:   2016-01-25T11:34:05Z

    [FLINK-3200] Use Partitioned State in WindowOperator
    

commit 1034a02380652ef5184e0094a41ef073c7c9b4fd
Author: Aljoscha Krettek <aljoscha.kret...@gmail.com>
Date:   2016-01-21T09:56:47Z

    [FLINK-3278] Add Partitioned State Backend Based on RocksDB

commit 13d1f541a48f82b1a10854addbdd5ea9bde7b079
Author: Aljoscha Krettek <aljoscha.kret...@gmail.com>
Date:   2016-01-28T17:18:26Z

    Move RocksDB backup to external processes

----

