[ https://issues.apache.org/jira/browse/FLINK-6653?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16022800#comment-16022800 ]
Wei-Che Wei edited comment on FLINK-6653 at 5/24/17 12:33 PM:
--------------------------------------------------------------

Hi [~tzulitai],

I would like to take over this issue. Here is my proposal.

Description:
# The consumer reads two possible states, {{KinesisStreamShardV2}} or {{KinesisStreamShard}}, and merges them into {{KinesisStreamShardV2}}.
# Convert {{KinesisStreamShardV2}} to {{KinesisStreamShardHandle}}, which the internal classes use to interact with the AWS library.
# Convert {{KinesisStreamShardHandle}} back to {{KinesisStreamShardV2}} and write the new state.

Proposed Changes:
# Introduce two models:
## {{KinesisStreamShardV2}}: stores the stream name and all the information in {{Shard}}, decoupling the state from the AWS library. This will be the new state.
## {{KinesisStreamShardHandle}}: the same as {{KinesisStreamShard}}, but not serializable. It is used in {{KinesisFetcher}} and {{ShardConsumer}} to distinguish it from the legacy {{KinesisStreamShard}} state, so that it can change along with any update to the AWS library.
# Add {{KinesisStreamShardHandle}} to {{KinesisStreamShardState}}.
# Make {{KinesisStreamShardV2}} and {{SequenceNumber}} POJO types.
# Add two utility functions to convert between {{KinesisStreamShardV2}} and {{KinesisStreamShardHandle}}.
# Add a utility function to convert {{KinesisStreamShard}} to {{KinesisStreamShardV2}}.

Test Plan:
# Update all tests to use the new and modified models.
# Add a migration test that makes sure state is restored when only the legacy state exists in the state backend.
# Add unit tests for the utility functions.
# Test that the new state is serialized by the POJO serializer.

> Avoid directly serializing AWS's Shard class in Kinesis consumer's checkpoints
> ------------------------------------------------------------------------------
>
>                 Key: FLINK-6653
>                 URL: https://issues.apache.org/jira/browse/FLINK-6653
>             Project: Flink
>          Issue Type: Improvement
>          Components: Kinesis Connector
>            Reporter: Tzu-Li (Gordon) Tai
>
> Currently, the Kinesis consumer directly serializes AWS's {{Shard}}
> instances in its checkpoints. This makes bumping AWS library versions
> hard, since any change to the {{Shard}} class by AWS will break checkpoint
> compatibility.
> We should either have custom serialization for {{KinesisStreamShard}}, or
> disintegrate the information in {{Shard}}. Ideally, it would be best to make
> {{KinesisStreamShard}} and {{SequenceNumber}} non-serializable, hence
> avoiding Java serialization in the checkpoints.

--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
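
For illustration only, the conversion between {{KinesisStreamShardV2}} and {{KinesisStreamShardHandle}} described in the proposal might look roughly like the Java sketch below. It is not the connector's actual code: the {{Shard}} class here is a simplified stand-in for the AWS SDK model, and the wrapper class, the field names, and the {{toV2}} / {{toHandle}} helpers are all assumptions made for the example.

```java
import java.util.Objects;

// Sketch only. Shard below is a simplified stand-in for the AWS SDK model class;
// all field names and the toV2/toHandle helpers are assumptions for illustration.
final class ShardStateSketch {

    /** Stand-in for the AWS Shard (the real class nests its hash-key and sequence-number ranges). */
    static final class Shard {
        String shardId;
        String parentShardId;
        String startingHashKey;
        String endingHashKey;
        String startingSequenceNumber;
        String endingSequenceNumber; // null while the shard is still open
    }

    /** Runtime-only wrapper for talking to the AWS library; deliberately not Serializable. */
    static final class KinesisStreamShardHandle {
        final String streamName;
        final Shard shard;

        KinesisStreamShardHandle(String streamName, Shard shard) {
            this.streamName = Objects.requireNonNull(streamName);
            this.shard = Objects.requireNonNull(shard);
        }
    }

    /** Checkpointed state: public fields and a public no-arg constructor, no AWS types. */
    public static class KinesisStreamShardV2 {
        public String streamName;
        public String shardId;
        public String parentShardId;
        public String startingHashKey;
        public String endingHashKey;
        public String startingSequenceNumber;
        public String endingSequenceNumber;

        public KinesisStreamShardV2() {}

        @Override
        public boolean equals(Object o) {
            if (this == o) return true;
            if (!(o instanceof KinesisStreamShardV2)) return false;
            KinesisStreamShardV2 that = (KinesisStreamShardV2) o;
            return Objects.equals(streamName, that.streamName)
                && Objects.equals(shardId, that.shardId)
                && Objects.equals(parentShardId, that.parentShardId)
                && Objects.equals(startingHashKey, that.startingHashKey)
                && Objects.equals(endingHashKey, that.endingHashKey)
                && Objects.equals(startingSequenceNumber, that.startingSequenceNumber)
                && Objects.equals(endingSequenceNumber, that.endingSequenceNumber);
        }

        @Override
        public int hashCode() {
            return Objects.hash(streamName, shardId, parentShardId, startingHashKey,
                endingHashKey, startingSequenceNumber, endingSequenceNumber);
        }
    }

    /** Copies every field out of the handle so the state no longer references AWS classes. */
    static KinesisStreamShardV2 toV2(KinesisStreamShardHandle handle) {
        KinesisStreamShardV2 state = new KinesisStreamShardV2();
        state.streamName = handle.streamName;
        state.shardId = handle.shard.shardId;
        state.parentShardId = handle.shard.parentShardId;
        state.startingHashKey = handle.shard.startingHashKey;
        state.endingHashKey = handle.shard.endingHashKey;
        state.startingSequenceNumber = handle.shard.startingSequenceNumber;
        state.endingSequenceNumber = handle.shard.endingSequenceNumber;
        return state;
    }

    /** Rebuilds the AWS-facing handle from restored state. */
    static KinesisStreamShardHandle toHandle(KinesisStreamShardV2 state) {
        Shard shard = new Shard();
        shard.shardId = state.shardId;
        shard.parentShardId = state.parentShardId;
        shard.startingHashKey = state.startingHashKey;
        shard.endingHashKey = state.endingHashKey;
        shard.startingSequenceNumber = state.startingSequenceNumber;
        shard.endingSequenceNumber = state.endingSequenceNumber;
        return new KinesisStreamShardHandle(state.streamName, shard);
    }

    public static void main(String[] args) {
        Shard shard = new Shard();
        shard.shardId = "shardId-000000000000";
        shard.startingHashKey = "0";
        shard.endingHashKey = "100";
        shard.startingSequenceNumber = "1";
        KinesisStreamShardV2 state = toV2(new KinesisStreamShardHandle("example-stream", shard));
        // Round trip through the handle and compare with the original state.
        System.out.println(state.equals(toV2(toHandle(state))));
    }
}
```

In a real connector the state class would more likely be a public top-level class. The point of the sketch is only that the checkpointed type carries plain strings, so a change to the AWS {{Shard}} class would affect the two conversion helpers rather than the serialized state.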