GitHub user aljoscha opened a pull request: https://github.com/apache/flink/pull/1562
Enhance Partitioned State and use it in WindowOperator

The commits in this pull request are self-contained. The first one enhances the partitioned state so that it can be used by the WindowOperator. The second commit adds the required changes in WindowOperator. The third commit adds a state backend based on RocksDB. This means that the window operator can now run on different types of state backends transparently.

@StephanEwen has some things that he wants to change; those will be added here (I think). I'm opening this now so that people can have a look at it.

## [FLINK-3201] Enhance Partitioned State Interface with State Types

Add new state types ValueState, ListState and ReducingState, where ListState and ReducingState derive from the interface MergingState. ValueState behaves exactly the same as OperatorState. MergingState is a stateful list to which elements can be added and from which the elements that it contains can be obtained. With a ListState the list of elements is actually kept; with a ReducingState a reduce function is used to combine all added elements into one.

To create a ValueState the user passes a ValueStateIdentifier to StreamingRuntimeContext.getPartitionedState(), while they would pass a ListStateIdentifier or ReducingStateIdentifier for the other state types.

This change is necessary to give the system more information about the nature of the operator state. We want this in order to be able to do incremental snapshots. This would not be possible, for example, if the user had a List as a state: inside OperatorState this list would be opaque and Flink could not create good incremental snapshots.

This also refactors the StateBackend. Before, the logic for partitioned state was spread out over StreamingRuntimeContext, AbstractStreamOperator and StateBackend. Now it is consolidated in StateBackend.

This also adds support for partitioned state in two-input operators.
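To make the difference between the two MergingState flavours concrete, here is a minimal, self-contained sketch. It does not use the Flink classes from this pull request: the interface and the two in-memory classes below are hypothetical stand-ins, and the second type parameter on `MergingState` is an assumption of this sketch, not necessarily the signature in the PR. It only shows that a list-style state keeps every added element while a reducing-style state folds each element into a single value.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BinaryOperator;

public class MergingStateSketch {

    /** Hypothetical stand-in: state that elements can be added to and read back from. */
    interface MergingState<IN, OUT> {
        void add(IN value);
        OUT get();
    }

    /** Keeps every added element, analogous to the ListState described above. */
    static final class InMemoryListState<T> implements MergingState<T, Iterable<T>> {
        private final List<T> elements = new ArrayList<>();

        @Override
        public void add(T value) {
            elements.add(value);
        }

        @Override
        public Iterable<T> get() {
            return elements;
        }
    }

    /** Combines all added elements into one value, analogous to ReducingState. */
    static final class InMemoryReducingState<T> implements MergingState<T, T> {
        private final BinaryOperator<T> reduceFunction;
        private T current; // stays null until the first element is added

        InMemoryReducingState(BinaryOperator<T> reduceFunction) {
            this.reduceFunction = reduceFunction;
        }

        @Override
        public void add(T value) {
            current = (current == null) ? value : reduceFunction.apply(current, value);
        }

        @Override
        public T get() {
            return current;
        }
    }

    public static void main(String[] args) {
        InMemoryListState<Integer> list = new InMemoryListState<Integer>();
        InMemoryReducingState<Integer> sum = new InMemoryReducingState<Integer>(Integer::sum);

        for (int v : new int[] {1, 2, 3}) {
            list.add(v);
            sum.add(v);
        }

        System.out.println(list.get()); // prints [1, 2, 3]
        System.out.println(sum.get());  // prints 6
    }
}
```

Because the system can see which kind of state it is dealing with, a backend is free to store the reducing variant as a single value and the list variant as individual entries, which is the kind of information the description says is needed for incremental snapshots.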
## [FLINK-3200] Use Partitioned State in WindowOperator

This changes the window operator to use the new partitioned state abstraction for keeping window contents instead of custom internal state and the checkpointed interface. For now, however, timers are still kept as custom checkpointed state.

WindowOperator now expects a StateIdentifier for MergingState. This can be for either ReducingState or ListState, but WindowOperator is agnostic to the type of state. The signature of WindowFunction is also changed to include the type of intermediate input. For example, if a ReducingState is used, the input of the WindowFunction is T (where T is the input type). If a ListState is used, the input of the WindowFunction is of type Iterable[T].

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/aljoscha/flink window-on-state

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/1562.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #1562

----
commit dc440ba4d97cecd297e432f14342dba5382cab50
Author: Aljoscha Krettek <aljoscha.kret...@gmail.com>
Date:   2016-01-25T11:33:51Z

    [FLINK-3201] Enhance Partitioned State Interface with State Types

    Add new state types ValueState, ListState and ReducingState, where ListState and ReducingState derive from interface MergingState. ValueState behaves exactly the same as OperatorState. MergingState is a stateful list to which elements can be added and for which the elements that it contains can be obtained. If using a ListState the list of elements is actually kept, for a ReducingState a reduce function is used to combine all added elements into one.

    To create a ValueState the user passes a ValueStateIdentifier to StreamingRuntimeContext.getPartitionedState() while they would pass a ListStateIdentifier or ReducingStateIdentifier for the other state types.
    This change is necessary to give the system more information about the nature of the operator state. We want this to be able to do incremental snapshots. This would not be possible, for example, if the user had a List as a state. Inside OperatorState this list would be opaque and Flink could not create good incremental snapshots.

    This also refactors the StateBackend. Before, the logic for partitioned state was spread out over StreamingRuntimeContext, AbstractStreamOperator and StateBackend. Now it is consolidated in StateBackend.

    This also adds support for partitioned state in two-input operators.

commit 865723e3ff0133ba9d921907c298c3545fdfe32c
Author: Aljoscha Krettek <aljoscha.kret...@gmail.com>
Date:   2016-01-25T11:34:05Z

    [FLINK-3200] Use Partitioned State in WindowOperator

    This changes window operator to use the new partitioned state abstraction for keeping window contents instead of custom internal state and the checkpointed interface. For now, timers are still kept as custom checkpointed state, however.

    WindowOperator now expects a StateIdentifier for MergingState, this can either be for ReducingState or ListState but WindowOperator is agnostic to the type of State. Also the signature of WindowFunction is changed to include the type of intermediate input. For example, if a ReducingState is used the input of the WindowFunction is T (where T is the input type). If using a ListState the input of the WindowFunction would be of type Iterable[T].

commit 1034a02380652ef5184e0094a41ef073c7c9b4fd
Author: Aljoscha Krettek <aljoscha.kret...@gmail.com>
Date:   2016-01-21T09:56:47Z

    [FLINK-3278] Add Partitioned State Backend Based on RocksDB

commit 13d1f541a48f82b1a10854addbdd5ea9bde7b079
Author: Aljoscha Krettek <aljoscha.kret...@gmail.com>
Date:   2016-01-28T17:18:26Z

    Move RocksDB backup to external processes

----