[ 
https://issues.apache.org/jira/browse/FLINK-2324?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14642013#comment-14642013
 ] 

ASF GitHub Bot commented on FLINK-2324:
---------------------------------------

GitHub user gyfora opened a pull request:

    https://github.com/apache/flink/pull/937

    [FLINK-2324] [streaming] Partitioned state checkpointing rework + test update

    This commit reworks how partitioned operator states are checkpointed, 
eliminating per-key checkpointing to improve performance. It also updates 
StreamCheckpointingITCase to use partitioned state when checking exactly-once 
semantics. 
    
    It also fixes a double checkpoint commit at the head operators of 
chains.
    
    **Rework of partitioned state checkpointing**
    Previously each OperatorState method would return a PartitionedStateHandle 
object upon snapshotting, which contained a Map of Key -> StateHandle 
storing the partitioned states by key. This had two drawbacks:
    * At every checkpoint we stored the state for each key independently 
(slowing down the checkpoints and overloading the storage layer)
    * We had to keep the keys in memory (this might cause memory issues with 
large keys)
    
    This commit changes that: the map of states at each operator is now stored 
in a single StateHandle, which minimizes both the time it takes to take a 
snapshot and the load on the storage layer. This might change slightly when 
we start thinking about state repartitioning for operator scaling.
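
    As a rough illustration of the difference (not the actual Flink code), the 
sketch below contrasts one handle per key with one handle for the whole map. 
BlobHandle and SnapshotSketch are hypothetical names, and plain Java 
serialization into a byte array stands in for the real storage layer.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for a state handle: an opaque blob written to storage.
final class BlobHandle {
    final byte[] bytes;

    BlobHandle(byte[] bytes) {
        this.bytes = bytes;
    }
}

final class SnapshotSketch {
    // Serializes a value into a byte array (stand-in for one write to storage).
    static byte[] serialize(Serializable value) {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(buffer)) {
            out.writeObject(value);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return buffer.toByteArray();
    }

    // Old approach: one handle (one write) per key, keys kept in memory.
    static <K, V extends Serializable> Map<K, BlobHandle> snapshotPerKey(Map<K, V> states) {
        Map<K, BlobHandle> handles = new HashMap<>();
        for (Map.Entry<K, V> entry : states.entrySet()) {
            handles.put(entry.getKey(), new BlobHandle(serialize(entry.getValue())));
        }
        return handles;
    }

    // New approach: the whole key -> state map goes into a single handle.
    static <K extends Serializable, V extends Serializable> BlobHandle snapshotWhole(Map<K, V> states) {
        HashMap<K, V> copy = new HashMap<>(states);
        return new BlobHandle(serialize(copy));
    }

    // Reads the full map back from a single handle.
    @SuppressWarnings("unchecked")
    static <K, V> Map<K, V> restore(BlobHandle handle) {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(handle.bytes))) {
            return (Map<K, V>) in.readObject();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        Map<String, Long> states = new HashMap<>();
        states.put("a", 1L);
        states.put("b", 2L);
        System.out.println("handles per key: " + snapshotPerKey(states).size());
        Map<String, Long> restored = restore(snapshotWhole(states));
        System.out.println("restored from one handle: " + restored.equals(states));
    }
}
```

    The per-key variant issues one write per key, while the single-handle 
variant issues one write per operator. The cost is that repartitioning would 
have to read the whole blob back, which is the trade-off mentioned above.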
    
    **Update StreamCheckpointingITCase with partitioned states**
    In the StreamCheckpointingITCase I replaced the RichReduceFunction (which 
was working incorrectly because it was not checkpointed) with a RichFlatMapFunction 
that implements the stateful group reduce using partitioned states, 
counting the number of strings received for each prefix.
    With this change the test now verifies exactly-once processing.
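
    The idea behind the check can be sketched in plain Java as below. This is 
not the test code from the PR: PrefixCounter is a hypothetical class, the 
prefix is assumed to be the first character of a non-empty string, and the 
failure is simulated by discarding the state and replaying from the checkpoint.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical per-prefix counter; the map plays the role of partitioned state.
final class PrefixCounter {
    private Map<Character, Long> counts = new HashMap<>();

    // Counts one string under its prefix (assumed: first character, non-empty input).
    void process(String value) {
        counts.merge(value.charAt(0), 1L, Long::sum);
    }

    Map<Character, Long> snapshot() {
        return new HashMap<>(counts);
    }

    void restore(Map<Character, Long> snapshot) {
        counts = new HashMap<>(snapshot);
    }

    Map<Character, Long> counts() {
        return counts;
    }

    // Simulates a checkpoint at offset checkpointAt and a failure at offset failAt,
    // assuming 0 <= checkpointAt <= failAt <= input.size().
    static Map<Character, Long> runWithFailure(List<String> input, int checkpointAt, int failAt) {
        PrefixCounter counter = new PrefixCounter();
        for (int i = 0; i < checkpointAt; i++) {
            counter.process(input.get(i));
        }
        Map<Character, Long> checkpoint = counter.snapshot();
        // Work done after the checkpoint is lost when the failure hits.
        for (int i = checkpointAt; i < failAt; i++) {
            counter.process(input.get(i));
        }
        // Recovery: restore the checkpointed state and replay from its offset.
        counter.restore(checkpoint);
        for (int i = checkpointAt; i < input.size(); i++) {
            counter.process(input.get(i));
        }
        return counter.counts();
    }
}
```

    If the state is restored correctly, the final counts equal the number of 
input strings per prefix regardless of where the failure happens; a missing or 
stale snapshot would show up as over- or under-counting.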

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/gyfora/flink partitioned-state-cleanup

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/937.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #937
    
----
commit fd0f77a5523b5819724a7152e27d23dad03d02f3
Author: Gyula Fora <gyf...@apache.org>
Date:   2015-07-26T16:02:56Z

    [FLINK-2324] [streaming] Partitioned state checkpointing rework + test update

----


> Rework partitioned state storage
> --------------------------------
>
>                 Key: FLINK-2324
>                 URL: https://issues.apache.org/jira/browse/FLINK-2324
>             Project: Flink
>          Issue Type: Improvement
>            Reporter: Gyula Fora
>            Assignee: Gyula Fora
>
> Partitioned states are currently stored per-key in statehandles. This is 
> alright for in-memory storage but is very inefficient for HDFS. 
> The logic behind the current mechanism is that this approach provides a way 
> to repartition a state without fetching the data from the external storage 
> and only manipulating handles.
> We should come up with a solution that can achieve both.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
