[ 
https://issues.apache.org/jira/browse/FLINK-4821?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15943180#comment-15943180
 ] 

ASF GitHub Bot commented on FLINK-4821:
---------------------------------------

Github user tzulitai commented on the issue:

    https://github.com/apache/flink/pull/3001
  
    Hi @tony810430!
    
    Sorry for the long pause on this PR. After some back and forth offline 
discussions with others on how exactly we want to proceed with this, we decided 
to stick with using union state to cope with the shard discovery on restore 
problem (at least for 1.3.0). Therefore, we can finally continue work here :-D
    
    First of all, to use union state, instead of `ListCheckpointed`, we should 
use `CheckpointedFunction` instead. There is a PR for exposing union state to 
the public API (#3508), but in case that isn't merged yet within the next few 
days, I suggest that you don't need to be blocked when you continue your work 
on this PR. For now, you can cast the operator state store instance retrieved 
through the `FunctionInitializationContext` to `DefaultOperatorStateStore` to 
use broadcast state.
    
    One thing to also note, which is missing in you previous work on this, is 
that we need a migration path from the old state access (i.e., via 
`CheckpointedAsynchronously`) to the new state (i.e. `CheckpointedFunction`).
    
    The `FlinkKafkaConsumerBase` class in the Kafka connector provides a very 
good example of how to do this. Simply put, in the end, the 
`FlinkKinesisConsumer` should implement both `CheckpointedRestoring` and 
`CheckpointedFunction`, and bridge the old state read from the legacy 
`restoreState(...)` method to the new `initializeState(...)` method. The bridge 
would simply be a field variable in the consumer class.
    
    The `FlinkKafkaConsumerBase` also serves as a good example of how to use 
the `CheckpointedFunction` if you have questions there.
    
    Let me know if you have any questions with this, and feel free to ping me 
any time!


> Implement rescalable non-partitioned state for Kinesis Connector
> ----------------------------------------------------------------
>
>                 Key: FLINK-4821
>                 URL: https://issues.apache.org/jira/browse/FLINK-4821
>             Project: Flink
>          Issue Type: New Feature
>          Components: Kinesis Connector
>            Reporter: Tzu-Li (Gordon) Tai
>            Assignee: Wei-Che Wei
>
> FLINK-4379 added the rescalable non-partitioned state feature, along with the 
> implementation for the Kafka connector.
> The AWS Kinesis connector will benefit from the feature and should implement 
> it too. This ticket tracks progress for this.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

Reply via email to