[ https://issues.apache.org/jira/browse/FLINK-2324?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14642013#comment-14642013 ]
ASF GitHub Bot commented on FLINK-2324:
---------------------------------------

GitHub user gyfora opened a pull request:

https://github.com/apache/flink/pull/937

[FLINK-2324] [streaming] Partitioned state checkpointing rework + test update

This commit reworks the way partitioned operator states are checkpointed. It eliminates per-key checkpointing to improve performance. It also updates the StreamCheckpointingITCase to use partitioned state when checking exactly-once semantics, and it eliminates a double checkpoint commit issue at the head operators of chains.

**Rework of partitioned state checkpointing**

Previously, each OperatorState returned a PartitionedStateHandle object upon snapshotting. That object contained a Map of Key -> StateHandle, storing the partitioned states by key. This had two negative implications:

* At every checkpoint, the state for each key was stored independently, which slowed down checkpoints and overloaded the storage layer.
* The keys had to be kept in memory, which might cause memory issues with large keys.

This commit changes that: the map of states at each operator is now stored in a single StateHandle. This minimizes both the time it takes to take a snapshot and the load on the storage layer. This might change slightly once we start thinking about state repartitioning for operator scaling.

**Update StreamCheckpointingITCase with partitioned states**

In the StreamCheckpointingITCase, I replaced the RichReduceFunction with a RichFlatMapFunction. The RichReduceFunction did not behave correctly because its state is not checkpointed. The new function implements the stateful group-reduce functionality using partitioned state, counting the number of strings received for a given prefix. With this, the test now checks that processing happens exactly once.
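The checkpointing change described above can be sketched in plain Java. The sketch contrasts the old scheme (one StateHandle per key) with the new one (one handle holding the whole key -> state map). This is a minimal illustration, not Flink code. `BytesHandle`, `CountingStore` and `PartitionedSnapshots` are hypothetical names. Java serialization into a byte array stands in for the real storage layer. It only shows how the number of storage writes differs between the two schemes.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical stand-in for a state handle: it only wraps the serialized bytes. */
final class BytesHandle implements Serializable {
    final byte[] bytes;

    BytesHandle(byte[] bytes) {
        this.bytes = bytes;
    }
}

/** Hypothetical storage layer that counts how many objects are written to it. */
final class CountingStore {
    int writes = 0;

    BytesHandle put(Serializable value) {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
            oos.writeObject(value);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        writes++;
        return new BytesHandle(bos.toByteArray());
    }
}

final class PartitionedSnapshots {
    /** Old scheme (sketch): one write and one handle per key, so the handle map grows with the key space. */
    static Map<String, BytesHandle> snapshotPerKey(Map<String, Long> states, CountingStore store) {
        Map<String, BytesHandle> handles = new HashMap<>();
        for (Map.Entry<String, Long> e : states.entrySet()) {
            handles.put(e.getKey(), store.put(e.getValue()));
        }
        return handles;
    }

    /** New scheme (sketch): the whole key -> state map is written once, into a single handle. */
    static BytesHandle snapshotWhole(Map<String, Long> states, CountingStore store) {
        HashMap<String, Long> copy = new HashMap<>(states);
        return store.put(copy);
    }

    /** Restores the map written by snapshotWhole. */
    @SuppressWarnings("unchecked")
    static HashMap<String, Long> restoreWhole(BytesHandle handle) {
        try (ObjectInputStream ois =
                new ObjectInputStream(new ByteArrayInputStream(handle.bytes))) {
            return (HashMap<String, Long>) ois.readObject();
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }
}
```

With three keys, `snapshotPerKey` performs three writes and returns three handles, while `snapshotWhole` performs one write. The trade-off is that restoring or moving any single key now means reading the whole map back.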
You can merge this pull request into a Git repository by running:

$ git pull https://github.com/gyfora/flink partitioned-state-cleanup

Alternatively, you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/937.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

This closes #937

----
commit fd0f77a5523b5819724a7152e27d23dad03d02f3
Author: Gyula Fora <gyf...@apache.org>
Date: 2015-07-26T16:02:56Z

[FLINK-2324] [streaming] Partitioned state checkpointing rework + test update

----

> Rework partitioned state storage
> --------------------------------
>
> Key: FLINK-2324
> URL: https://issues.apache.org/jira/browse/FLINK-2324
> Project: Flink
> Issue Type: Improvement
> Reporter: Gyula Fora
> Assignee: Gyula Fora
>
> Partitioned states are currently stored per-key in statehandles. This is
> alright for in-memory storage but is very inefficient for HDFS.
> The logic behind the current mechanism is that this approach provides a way
> to repartition a state without fetching the data from the external storage
> and only manipulating handles.
> We should come up with a solution that can achieve both.
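The issue text notes that per-key handles let a state be repartitioned by manipulating handles only, without fetching the data. The sketch below illustrates that idea. It assumes keys are assigned to subtasks by a simple `hashCode()`-modulo rule, which is not necessarily what Flink uses. `HandleRepartitioner` is a hypothetical name, and `H` is an opaque handle type whose contents are never read.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class HandleRepartitioner {
    /**
     * Hypothetical sketch: reassigns per-key handles from the old subtasks to
     * newParallelism subtasks by key hash. Only handle references move; no
     * state bytes are read from storage.
     */
    static <H> List<Map<String, H>> repartition(List<Map<String, H>> oldPartitions, int newParallelism) {
        if (newParallelism <= 0) {
            throw new IllegalArgumentException("newParallelism must be positive");
        }
        List<Map<String, H>> result = new ArrayList<>();
        for (int i = 0; i < newParallelism; i++) {
            result.add(new HashMap<>());
        }
        for (Map<String, H> partition : oldPartitions) {
            for (Map.Entry<String, H> e : partition.entrySet()) {
                // floorMod keeps the target index non-negative for negative hash codes.
                int target = Math.floorMod(e.getKey().hashCode(), newParallelism);
                result.get(target).put(e.getKey(), e.getValue());
            }
        }
        return result;
    }
}
```

Because each key has its own handle, scaling from two to three subtasks only regroups references. If each operator's whole map sits in a single handle, the equivalent step would first have to fetch and deserialize every operator's map before splitting it. That is the tension the issue asks to resolve.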