GitHub user StephanEwen opened a pull request:

    https://github.com/apache/flink/pull/1239

    [FLINK-2808] Rework state abstraction and clean up task / operator internals

    This pull request fixes many related/intermixed issues. It was hard to 
split this into individual issues.
    
    ### Crucial bug fixes
    
      - State snapshots for memory-backed state previously copied a reference 
into the StateHandle, after which the streaming program continued. If the state 
was mutated before Akka serialized it, the mutated state was checkpointed, 
rather than the state at the point of drawing the snapshot.
      
      - Key/value state is checkpointed as a whole, rather than individually 
per key.
      
      - Memory-backed state now has a maximum size that is checked upon 
checkpointing. Exceeding that size fails the checkpoint. Previously, overly 
large state simply resulted in an oversized Akka frame that was dropped, 
silently letting the program run without ever completing a checkpoint.
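
    A minimal sketch of the idea behind the first and third fixes, not the code 
in this pull request: serializing the state eagerly at snapshot time yields a 
private copy that later mutations cannot affect, and the resulting byte count 
can be checked against a limit. The names `SnapshotSketch`, `ByteHandle`, and 
`snapshot`, and the use of Java serialization, are assumptions made for 
illustration.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.HashMap;

public class SnapshotSketch {

    /** Hypothetical handle that owns a private byte copy of the state. */
    static final class ByteHandle {
        private final byte[] data;

        ByteHandle(byte[] data) {
            this.data = data;
        }

        /** Deserializes the state exactly as it was when the snapshot was drawn. */
        @SuppressWarnings("unchecked")
        <T> T restore() throws IOException, ClassNotFoundException {
            try (ObjectInputStream in =
                    new ObjectInputStream(new ByteArrayInputStream(data))) {
                return (T) in.readObject();
            }
        }
    }

    /**
     * Serializes the state immediately (instead of keeping a reference) and
     * fails the snapshot if the serialized form exceeds maxSizeBytes.
     */
    static ByteHandle snapshot(Serializable state, int maxSizeBytes) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(state);
        }
        if (bytes.size() > maxSizeBytes) {
            throw new IOException("State size " + bytes.size()
                    + " exceeds the maximum of " + maxSizeBytes + " bytes");
        }
        return new ByteHandle(bytes.toByteArray());
    }

    public static void main(String[] args) throws Exception {
        HashMap<String, Long> counts = new HashMap<>();
        counts.put("a", 1L);

        ByteHandle handle = snapshot(counts, 1024 * 1024);
        counts.put("a", 2L); // mutation after the snapshot was drawn

        HashMap<String, Long> restored = handle.restore();
        System.out.println(restored.get("a")); // the handle still holds the old value
    }
}
```

    Failing loudly on oversized state trades a dropped message for an explicit 
checkpoint failure, which is the behavior the third bullet describes.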
    
    
    ### User-facing changes
    
      - State backends are no longer only responsible for storing snapshots of 
the user state; they also define how exactly the key/value state is represented 
in the first place. This allows us to plug in external key/value stores to 
store the Flink key/value state. Default implementations store the state in 
memory / files.
      
      - State backend offers additional methods to checkpoint directly into 
streams.
      
      - One can configure arbitrary default state backends via a factory 
interface that creates them from the TaskManager configuration.
      
      - Key/value state supports arbitrary types without extra checkpointer 
logic, but the user needs to supply the type of the state (via class or 
TypeInformation).
      
      - Removed the non-partitioned `OperatorState`. The only type of state 
remaining through the `OperatorState` abstraction is partitioned key/value 
state in functions that are applied on a KeyedStream. Consequently, the 
`mapWithState()` and related methods are only available on the `KeyedStream`.
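
    The following sketch illustrates how the pieces above could fit together: 
a backend that decides how key/value state is represented, a factory that 
creates the backend from configuration, and state that is requested with an 
explicit value type. All names here (`BackendSketch`, `KvState`, 
`StateBackend`, `StateBackendFactory`, `HeapBackend`, and the 
`example.backend` key) are hypothetical and are not the interfaces introduced 
by this pull request; a plain `Map` stands in for the TaskManager configuration.

```java
import java.util.HashMap;
import java.util.Map;

public class BackendSketch {

    /** Hypothetical key/value state, scoped to whichever key is current. */
    interface KvState<K, V> {
        void setCurrentKey(K key);
        V value();
        void update(V value);
    }

    /** Hypothetical backend: defines how key/value state is represented. */
    interface StateBackend {
        <K, V> KvState<K, V> createKvState(Class<V> valueType, V defaultValue);
    }

    /** Hypothetical factory: creates a backend from configuration entries. */
    interface StateBackendFactory {
        StateBackend createFromConfig(Map<String, String> config);
    }

    /** Keeps all values in a heap map; an external store could be plugged in instead. */
    static final class HeapKvState<K, V> implements KvState<K, V> {
        private final Map<K, V> values = new HashMap<>();
        private final V defaultValue;
        private K currentKey;

        HeapKvState(V defaultValue) {
            this.defaultValue = defaultValue;
        }

        public void setCurrentKey(K key) {
            this.currentKey = key;
        }

        public V value() {
            return values.getOrDefault(currentKey, defaultValue);
        }

        public void update(V value) {
            values.put(currentKey, value);
        }
    }

    static final class HeapBackend implements StateBackend {
        public <K, V> KvState<K, V> createKvState(Class<V> valueType, V defaultValue) {
            // valueType is unused here; a backend that serializes values
            // would use it to choose a serializer.
            return new HeapKvState<>(defaultValue);
        }
    }

    /** Picks a backend by a made-up configuration key, defaulting to the heap backend. */
    static final StateBackendFactory FACTORY = config -> {
        String kind = config.getOrDefault("example.backend", "heap");
        if (!kind.equals("heap")) {
            throw new IllegalArgumentException("Unknown backend: " + kind);
        }
        return new HeapBackend();
    };

    public static void main(String[] args) {
        StateBackend backend = FACTORY.createFromConfig(new HashMap<>());
        KvState<String, Long> count = backend.createKvState(Long.class, 0L);

        count.setCurrentKey("a");
        count.update(count.value() + 1);

        count.setCurrentKey("b");
        System.out.println(count.value()); // key "b" was never updated
    }
}
```

    Because the operator only sees the `KvState` interface in this sketch, 
swapping the heap map for an external store would not change the calling code.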
      
    
    ### Internal cleanups
    
      - Checkpoint barriers are forwarded earlier, to reduce latency introduced 
by checkpoints.
    
      - Fewer in-memory copies when checkpointing to the file system state 
backend
    
      - The StreamingRuntimeContext is used purely for UDF interaction, not to 
hand over components to the operators. 
    
      - The infinite reduce and aggregations work properly on key/value state, 
rather than maintaining their own maps
      
      - made the OutputHandler (now OperatorChain) type safe and simpler
      
      - made a clear distinction between the responsibilities of StreamTasks 
(input/output streams, setup of operator chain, checkpoint coordination) and 
operators (scope of one function and runtime context)
      
      - cleaned up checkpointing logic between operators (checkpoint generic 
key/value state) and UDF operators (checkpoint UDFs)
      
      - removed Configuration from the operator open() method (it was easily 
confused with the UDF open(Configuration))

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/StephanEwen/incubator-flink statebackend

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/1239.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #1239
    
----
commit 3d633f0d608d91cfa69455fa9a47c53bf753a677
Author: Stephan Ewen <se...@apache.org>
Date:   2015-10-05T13:57:49Z

    [hotfix] Correct name of HDFS tests from 'org.apache.flink.tachyon' to 
'org.apache.flink.hdfstests'

commit 441c089552b3045062e8620ad9d2c8411fb387a8
Author: Stephan Ewen <se...@apache.org>
Date:   2015-10-05T13:57:04Z

    [FLINK-2808] [streaming] Refactor and extend state backend abstraction

commit 73b65e2196576b0e36730bd0c8d8d3ced56f9f4f
Author: Stephan Ewen <se...@apache.org>
Date:   2015-10-07T11:54:05Z

    [FLINK-2808] [streaming] Integrate extended state backend abstraction with 
streaming state handling

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---
