Github user gyfora commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1305#discussion_r45377370
  
    --- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackend.java ---
    @@ -64,33 +64,35 @@
        /**
         * Closes the state backend, releasing all internal resources, but does 
not delete any persistent
         * checkpoint data.
    -    * 
    +    *
         * @throws Exception Exceptions can be forwarded and will be logged by 
the system
         */
        public abstract void close() throws Exception;
    -   
    +
        // 
------------------------------------------------------------------------
        //  key/value state
        // 
------------------------------------------------------------------------
     
        /**
         * Creates a key/value state backed by this state backend.
    -    * 
    +    *
    +    * @param operatorId Unique id for the operator creating the state
    +    * @param stateName Name of the created state
         * @param keySerializer The serializer for the key.
         * @param valueSerializer The serializer for the value.
         * @param defaultValue The value that is returned when no other value 
has been associated with a key, yet.
         * @param <K> The type of the key.
         * @param <V> The type of the value.
    -    * 
    +    *
         * @return A new key/value state backed by this backend.
    -    * 
    +    *
         * @throws Exception Exceptions may occur during initialization of the 
state and should be forwarded.
         */
    -   public abstract <K, V> KvState<K, V, Backend> createKvState(
    +   public abstract <K, V> KvState<K, V, Backend> createKvState(int 
operatorId, String stateName,
    --- End diff --
    
    Let me first describe how sharding works, then I will give a concrete example.
    Key/value pairs are sharded by key, not by subtask. This means that each parallel subtask maintains a connection to all the shards and partitions its states before writing them to the appropriate shard, according to the user-defined partitioner (in the backend config). This is much better than sharding by subtask, because we can later change the parallelism of the job without affecting the state, and it also lets us define a more elaborate sharding strategy through the partitioner.
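
    To illustrate (just a sketch; the `ShardPartitioner` interface and names below are hypothetical, not the API touched by this PR): the shard is chosen from the key alone, so the mapping does not depend on which subtask writes the pair.

    ```java
    // Hypothetical sketch: the target shard depends only on the key,
    // never on the subtask index.
    interface ShardPartitioner<K> extends java.io.Serializable {
        int selectShard(K key, int numShards);
    }

    // A simple default: hash-partition the keys across the shards.
    class HashShardPartitioner<K> implements ShardPartitioner<K> {
        @Override
        public int selectShard(K key, int numShards) {
            return Math.abs(key.hashCode() % numShards);
        }
    }
    ```

    Since every subtask uses the same partitioner, changing the parallelism never moves a key to a different shard.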
    
    This means that when a kv state is created, we create a table for that kv state in each shard. If we did it according to your suggestion, we would need to create numShards tables for each parallel instance (a total of p*ns) for each kv state. Furthermore, that would make the fancy sharding useless because we could no longer change the job parallelism. So we need to make sure that the parallel subtasks of a given operator write to the same state tables (so we only have ns tables regardless of the parallelism).
    
    In order to do this we need something that uniquely identifies a given state in the streaming program (and all parallel instances of an operator should produce the same id).
    
    The information required to create such a unique state id is an identifier for the operator that owns the state plus the name of the state. (The information obtained from the environment is not enough, because chained operators share the same environment, so if they have conflicting state names the id is not unique.) The only thing that identifies an operator in the logical streaming program is the operator id assigned by the jobgraphbuilder (that's the whole point of having it).
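
    As a minimal sketch of what I mean (the method name and exact naming scheme are only for illustration, following the example below):

    ```java
    // Sketch: build a state/table identifier that is identical for all parallel
    // subtasks of the same operator, so they all write to the same numShards tables.
    static String kvStateTableName(String stateName, int operatorId, String jobId) {
        // The operator id disambiguates states with the same name in chained
        // operators, which share a single environment.
        return "kvstate_" + stateName + "_" + operatorId + "_" + jobId;
    }
    ```

    Every parallel subtask of the same operator derives the same name, e.g. kvStateTableName("count", 1, jobId), so only numShards tables exist for that state.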
    
    An example job with p = 2 and numShards = 3:

    a chained map -> filter, where both the mapper and the filter have a state named "count"; let's assume that the mapper has operator id 1 and the filter operator id 2.

    In this case the mapper would create 3 db tables (one on each shard), all with the same name, kvstate_count_1_jobId. The filter would also create 3 tables, named kvstate_count_2_jobId.

    All mapper instances would write to all three database shards, and the same goes for all the filter instances.
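
    Just to spell out the counts (names taken from the example above, sketch only):

    ```java
    public class ShardingExample {
        public static void main(String[] args) {
            // p = 2, numShards = 3; a "count" state on operator 1 (map) and on operator 2 (filter).
            String[] stateTables = { "kvstate_count_1_jobId", "kvstate_count_2_jobId" };
            int numShards = 3;

            // Each logical state table exists once per shard: 2 * 3 = 6 physical tables,
            // independent of the parallelism. Both mapper subtasks write to the same
            // kvstate_count_1_jobId table on every shard (and likewise for the filters).
            for (String table : stateTables) {
                for (int shard = 0; shard < numShards; shard++) {
                    System.out.println("shard " + shard + ": " + table);
                }
            }
        }
    }
    ```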
    
    I hope you get what I am trying to say. 

