[ https://issues.apache.org/jira/browse/FLINK-4821?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15974158#comment-15974158 ]
ASF GitHub Bot commented on FLINK-4821:
---------------------------------------

Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3001#discussion_r112122730

    --- Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java ---
    @@ -66,31 +80,48 @@
         // Consumer properties
         // ------------------------------------------------------------------------

    -    /** The names of the Kinesis streams that we will be consuming from */
    +    /**
    +     * The names of the Kinesis streams that we will be consuming from
    +     */
         private final List<String> streams;

    -    /** Properties to parametrize settings such as AWS service region, initial position in stream,
    -     * shard list retrieval behaviours, etc */
    +    /**
    +     * Properties to parametrize settings such as AWS service region, initial position in stream,
    +     * shard list retrieval behaviours, etc
    +     */
         private final Properties configProps;

    -    /** User supplied deseriliazation schema to convert Kinesis byte messages to Flink objects */
    +    /**
    +     * User supplied deseriliazation schema to convert Kinesis byte messages to Flink objects
    +     */
         private final KinesisDeserializationSchema<T> deserializer;

         // ------------------------------------------------------------------------
         // Runtime state
         // ------------------------------------------------------------------------

    -    /** Per-task fetcher for Kinesis data records, where each fetcher pulls data from one or more Kinesis shards */
    +    /**
    +     * Per-task fetcher for Kinesis data records, where each fetcher pulls data from one or more Kinesis shards
    +     */
         private transient KinesisDataFetcher<T> fetcher;

    -    /** The sequence numbers in the last state snapshot of this subtask */
    +    /**
    +     * The sequence numbers in the last state snapshot of this subtask
    +     */
         private transient HashMap<KinesisStreamShard, SequenceNumber> lastStateSnapshot;

    -    /** The sequence numbers to restore to upon restore from failure */
    +    /**
    +     * The sequence numbers to restore to upon restore from failure
    +     */
         private transient HashMap<KinesisStreamShard, SequenceNumber> sequenceNumsToRestore;

         private volatile boolean running = true;

    +    // ------------------------------------------------------------------------
    +    // State for Checkpoint
    +    // ------------------------------------------------------------------------
    +
    +    private transient ListState<Tuple2<KinesisStreamShard, SequenceNumber>> offsetsStateForCheckpoint;
    --- End diff --

    "offset" is the Kafka term. I would try to rename this to use "sequence number" instead (or a likewise abbreviation).

> Implement rescalable non-partitioned state for Kinesis Connector
> ----------------------------------------------------------------
>
>                 Key: FLINK-4821
>                 URL: https://issues.apache.org/jira/browse/FLINK-4821
>             Project: Flink
>          Issue Type: New Feature
>          Components: Kinesis Connector
>            Reporter: Tzu-Li (Gordon) Tai
>            Assignee: Wei-Che Wei
>
> FLINK-4379 added the rescalable non-partitioned state feature, along with the
> implementation for the Kafka connector.
> The AWS Kinesis connector will benefit from the feature and should implement
> it too. This ticket tracks progress for this.

--
This message was sent by Atlassian JIRA
(v6.3.15#6346)