
Wei-Che Wei edited comment on FLINK-6653 at 5/24/17 12:32 PM:

Hi [~tzulitai],

I would like to take over this issue and here is my proposal.

# The consumer reads either of two possible states, {{KinesisStreamShardV2}} or the legacy {{KinesisStreamShard}}, and merges them into {{KinesisStreamShardV2}}.
# It converts {{KinesisStreamShardV2}} to {{KinesisStreamShardHandle}} for the internal classes that interact with the AWS library.
# It converts {{KinesisStreamShardHandle}} back to {{KinesisStreamShardV2}} and writes the new state.
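The three steps above can be sketched as a restore, convert, snapshot cycle. This is a minimal sketch, not connector code: {{LegacyShard}}, {{ShardV2}} and {{ShardHandle}} are hypothetical stand-ins that carry only a stream name and shard id, and letting V2 entries win when both states contain the same shard is an assumption.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;

// Hypothetical, simplified stand-ins: real classes would carry all Shard fields.
final class ShardStateCycle {

    /** Stand-in for the legacy state type (which wrapped AWS's Shard). */
    static final class LegacyShard {
        final String streamName;
        final String shardId;
        LegacyShard(String streamName, String shardId) {
            this.streamName = streamName;
            this.shardId = shardId;
        }
    }

    /** Stand-in for the new, AWS-independent checkpointed state. */
    static final class ShardV2 {
        final String streamName;
        final String shardId;
        ShardV2(String streamName, String shardId) {
            this.streamName = streamName;
            this.shardId = shardId;
        }
        @Override public boolean equals(Object o) {
            if (!(o instanceof ShardV2)) return false;
            ShardV2 other = (ShardV2) o;
            return streamName.equals(other.streamName) && shardId.equals(other.shardId);
        }
        @Override public int hashCode() { return Objects.hash(streamName, shardId); }
    }

    /** Stand-in for the runtime-only handle; deliberately not Serializable. */
    static final class ShardHandle {
        final String streamName;
        final String shardId;
        ShardHandle(String streamName, String shardId) {
            this.streamName = streamName;
            this.shardId = shardId;
        }
    }

    /** Step 1: merge restored legacy and V2 entries, keeping one entry per shard. */
    static List<ShardV2> restore(List<LegacyShard> legacy, List<ShardV2> v2) {
        Map<String, ShardV2> merged = new LinkedHashMap<>();
        for (LegacyShard old : legacy) {
            merged.put(old.streamName + "/" + old.shardId, new ShardV2(old.streamName, old.shardId));
        }
        for (ShardV2 s : v2) {
            // Assumption: a V2 entry replaces a legacy entry for the same shard.
            merged.put(s.streamName + "/" + s.shardId, s);
        }
        return new ArrayList<>(merged.values());
    }

    /** Step 2: convert checkpointed state into the handle used at runtime. */
    static ShardHandle toHandle(ShardV2 state) {
        return new ShardHandle(state.streamName, state.shardId);
    }

    /** Step 3: convert a handle back into state before writing a checkpoint. */
    static ShardV2 toState(ShardHandle handle) {
        return new ShardV2(handle.streamName, handle.shardId);
    }
}
```

Keeping the handle out of the checkpoint means the AWS-facing class could change with each SDK upgrade while only the V2 type's layout has to stay stable.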

Proposed Changes:
# Introduce two models:
## {{KinesisStreamShardV2}}: stores the stream name and all the information in {{Shard}}, decoupling the state from the AWS library. It will be the new state.
## {{KinesisStreamShardHandle}}: the same as {{KinesisStreamShard}}, but not serializable. It is used in {{KinesisFetcher}} and {{ShardConsumer}} to distinguish it from the legacy {{KinesisStreamShard}} state, so that it can change along with any update to the AWS library.
# Add {{KinesisStreamShardHandle}} to {{KinesisStreamShardState}}.
# Make {{KinesisStreamShardV2}} and {{SequenceNumber}} POJO types.
# Two util functions to convert between {{KinesisStreamShardV2}} and 
# A util function to convert {{KinesisStreamShard}} to {{KinesisStreamShardV2}}.
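A possible shape for {{KinesisStreamShardV2}} as a Flink POJO is sketched below. Flink's POJO rules require a public class (standalone or static nested), a public no-argument constructor, and, for each field, either public visibility or a getter/setter pair. The field names are assumptions that mirror what AWS's {{Shard}} exposes (shard id, parent shard ids, hash key range, sequence number range); the holder class {{ShardV2Sketch}} is hypothetical and exists only so the sketch compiles as one file.

```java
import java.util.Objects;

// Hypothetical holder so the sketch compiles in a single file; in the connector
// the POJO would be a top-level public class in its own file.
final class ShardV2Sketch {

    /** Sketch of KinesisStreamShardV2; field names are assumed, not confirmed. */
    public static class KinesisStreamShardV2 {
        private String streamName;
        private String shardId;
        private String parentShardId;
        private String adjacentParentShardId;
        private String startingHashKey;
        private String endingHashKey;
        private String startingSequenceNumber;
        private String endingSequenceNumber;

        /** Flink's POJO rules require a public no-argument constructor. */
        public KinesisStreamShardV2() {}

        // One public getter/setter pair per private field, as the POJO rules require.
        public String getStreamName() { return streamName; }
        public void setStreamName(String streamName) { this.streamName = streamName; }
        public String getShardId() { return shardId; }
        public void setShardId(String shardId) { this.shardId = shardId; }
        public String getParentShardId() { return parentShardId; }
        public void setParentShardId(String parentShardId) { this.parentShardId = parentShardId; }
        public String getAdjacentParentShardId() { return adjacentParentShardId; }
        public void setAdjacentParentShardId(String id) { this.adjacentParentShardId = id; }
        public String getStartingHashKey() { return startingHashKey; }
        public void setStartingHashKey(String key) { this.startingHashKey = key; }
        public String getEndingHashKey() { return endingHashKey; }
        public void setEndingHashKey(String key) { this.endingHashKey = key; }
        public String getStartingSequenceNumber() { return startingSequenceNumber; }
        public void setStartingSequenceNumber(String n) { this.startingSequenceNumber = n; }
        public String getEndingSequenceNumber() { return endingSequenceNumber; }
        public void setEndingSequenceNumber(String n) { this.endingSequenceNumber = n; }

        @Override
        public boolean equals(Object o) {
            if (this == o) return true;
            if (!(o instanceof KinesisStreamShardV2)) return false;
            KinesisStreamShardV2 that = (KinesisStreamShardV2) o;
            return Objects.equals(streamName, that.streamName)
                && Objects.equals(shardId, that.shardId)
                && Objects.equals(parentShardId, that.parentShardId)
                && Objects.equals(adjacentParentShardId, that.adjacentParentShardId)
                && Objects.equals(startingHashKey, that.startingHashKey)
                && Objects.equals(endingHashKey, that.endingHashKey)
                && Objects.equals(startingSequenceNumber, that.startingSequenceNumber)
                && Objects.equals(endingSequenceNumber, that.endingSequenceNumber);
        }

        @Override
        public int hashCode() {
            return Objects.hash(streamName, shardId, parentShardId, adjacentParentShardId,
                startingHashKey, endingHashKey, startingSequenceNumber, endingSequenceNumber);
        }
    }
}
```

Because every field is a plain {{String}}, the checkpointed layout is owned by the connector rather than by the AWS SDK.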

Test Plan:
# Update all tests to use the new and modified models.
# Add a migration test that makes sure state is restored when only the legacy state exists in the state backend.
# Unit tests for the util functions.
# Test that the new state is serialized by the POJO serializer.
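For the last test-plan item, a Flink-based test could check that the type information extracted for {{KinesisStreamShardV2}} (for example via {{TypeInformation.of(...)}}) is a {{PojoTypeInfo}}. Without a Flink dependency, the same rules can be approximated with plain reflection. The hypothetical {{PojoRulesCheck}} below is a simplified approximation, not Flink's own type extraction: it checks only declared fields and ignores superclasses, generics and {{is}}-style boolean getters.

```java
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;

// Simplified approximation of Flink's POJO rules, not Flink's TypeExtractor logic.
final class PojoRulesCheck {

    static boolean followsPojoRules(Class<?> clazz) {
        // Rule: the class must be public and, if nested, static.
        int mods = clazz.getModifiers();
        if (!Modifier.isPublic(mods)) return false;
        if (clazz.isMemberClass() && !Modifier.isStatic(mods)) return false;
        // Rule: a public no-argument constructor must exist.
        try {
            clazz.getConstructor();
        } catch (NoSuchMethodException e) {
            return false;
        }
        // Rule: each non-static, non-transient field is public or has a getter and setter.
        for (Field field : clazz.getDeclaredFields()) {
            int fm = field.getModifiers();
            if (field.isSynthetic() || Modifier.isStatic(fm) || Modifier.isTransient(fm)) continue;
            if (Modifier.isPublic(fm)) continue;
            if (!hasGetterAndSetter(clazz, field)) return false;
        }
        return true;
    }

    private static boolean hasGetterAndSetter(Class<?> clazz, Field field) {
        String name = field.getName();
        String suffix = Character.toUpperCase(name.charAt(0)) + name.substring(1);
        try {
            boolean getterOk = clazz.getMethod("get" + suffix).getReturnType() == field.getType();
            clazz.getMethod("set" + suffix, field.getType()); // throws if no setter
            return getterOk;
        } catch (NoSuchMethodException e) {
            return false;
        }
    }

    /** Example that satisfies the rules. */
    public static class GoodShard {
        private String shardId;
        public GoodShard() {}
        public String getShardId() { return shardId; }
        public void setShardId(String shardId) { this.shardId = shardId; }
    }

    /** Example that breaks the rules: no no-argument constructor. */
    public static class NoDefaultConstructor {
        private String shardId;
        public NoDefaultConstructor(String shardId) { this.shardId = shardId; }
        public String getShardId() { return shardId; }
        public void setShardId(String shardId) { this.shardId = shardId; }
    }
}
```

A reflection check like this only catches the shape of the class; the Flink-level assertion remains the authoritative test that the POJO serializer is actually chosen.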


> Avoid directly serializing AWS's Shard class in Kinesis consumer's checkpoints
> ------------------------------------------------------------------------------
>                 Key: FLINK-6653
>                 URL: https://issues.apache.org/jira/browse/FLINK-6653
>             Project: Flink
>          Issue Type: Improvement
>          Components: Kinesis Connector
>            Reporter: Tzu-Li (Gordon) Tai
> Currently, the Kinesis consumer's checkpoints directly serialize AWS's
> {{Shard}} instances. This makes bumping AWS library versions hard, since any
> change to the {{Shard}} class by AWS will break checkpoint compatibility.
> We should either have custom serialization for {{KinesisStreamShard}}, or
> decompose the information in {{Shard}}. Ideally, {{KinesisStreamShard}} and
> {{SequenceNumber}} should be made non-serializable, thereby avoiding Java
> serialization in the checkpoints.
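The first option in the description, custom serialization, could look like the following: the shard information is written field by field behind an explicit format version, so checkpoint bytes no longer depend on the layout of AWS's {{Shard}} class. This is a hypothetical sketch using {{java.io.DataOutputStream}}; {{VersionedShardCodec}} and {{ShardInfo}} are illustrative names, and only a subset of the shard fields is shown.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.Objects;

// Hypothetical sketch: explicit, versioned binary format for shard information.
final class VersionedShardCodec {

    static final int CURRENT_VERSION = 1;

    /** Flattened shard information (subset of fields, for brevity). */
    static final class ShardInfo {
        final String streamName;
        final String shardId;
        final String parentShardId; // may be null for shards without a parent

        ShardInfo(String streamName, String shardId, String parentShardId) {
            this.streamName = streamName;
            this.shardId = shardId;
            this.parentShardId = parentShardId;
        }

        @Override public boolean equals(Object o) {
            if (!(o instanceof ShardInfo)) return false;
            ShardInfo other = (ShardInfo) o;
            return streamName.equals(other.streamName) && shardId.equals(other.shardId)
                && Objects.equals(parentShardId, other.parentShardId);
        }

        @Override public int hashCode() { return Objects.hash(streamName, shardId, parentShardId); }
    }

    static byte[] serialize(ShardInfo shard) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeInt(CURRENT_VERSION); // format version is written first
            out.writeUTF(shard.streamName);
            out.writeUTF(shard.shardId);
            writeNullable(out, shard.parentShardId);
        }
        return bytes.toByteArray();
    }

    static ShardInfo deserialize(byte[] data) throws IOException {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            int version = in.readInt();
            if (version != CURRENT_VERSION) {
                // A later format would add a read branch here instead of failing.
                throw new IOException("Unsupported shard state version: " + version);
            }
            String streamName = in.readUTF();
            String shardId = in.readUTF();
            String parentShardId = readNullable(in);
            return new ShardInfo(streamName, shardId, parentShardId);
        }
    }

    private static void writeNullable(DataOutputStream out, String value) throws IOException {
        out.writeBoolean(value != null); // presence flag
        if (value != null) out.writeUTF(value);
    }

    private static String readNullable(DataInputStream in) throws IOException {
        return in.readBoolean() ? in.readUTF() : null;
    }
}
```

With the version written up front, a future AWS-driven change could be absorbed by adding a new format version and a matching read branch, instead of silently breaking old checkpoints.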

This message was sent by Atlassian JIRA
