[ https://issues.apache.org/jira/browse/FLINK-4821?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15750648#comment-15750648 ]
ASF GitHub Bot commented on FLINK-4821: --------------------------------------- Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/3001#discussion_r92555340 --- Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java --- @@ -294,11 +296,18 @@ public void close() throws Exception { lastStateSnapshot.toString(), checkpointId, checkpointTimestamp); } - return lastStateSnapshot; + List<Tuple2<KinesisStreamShard, SequenceNumber>> listState = new ArrayList<>(lastStateSnapshot.size()); + for (Map.Entry<KinesisStreamShard, SequenceNumber> entry: lastStateSnapshot.entrySet()) { + listState.add(Tuple2.of(entry.getKey(), entry.getValue())); + } + return listState; } @Override - public void restoreState(HashMap<KinesisStreamShard, SequenceNumber> restoredState) throws Exception { - sequenceNumsToRestore = restoredState; + public void restoreState(List<Tuple2<KinesisStreamShard, SequenceNumber>> state) throws Exception { + sequenceNumsToRestore = new HashMap<>(); + for (Tuple2<KinesisStreamShard, SequenceNumber> subState: state) { --- End diff -- We should probably do a null check here for `state`. From the looks of #3005, I don't think restored state will ever be null (will be empty list), but it'd be good to make the code here independent of that. > Implement rescalable non-partitioned state for Kinesis Connector > ---------------------------------------------------------------- > > Key: FLINK-4821 > URL: https://issues.apache.org/jira/browse/FLINK-4821 > Project: Flink > Issue Type: New Feature > Components: Kinesis Connector > Reporter: Tzu-Li (Gordon) Tai > Assignee: Wei-Che Wei > Fix For: 1.2.0 > > > FLINK-4379 added the rescalable non-partitioned state feature, along with the > implementation for the Kafka connector. > The AWS Kinesis connector will benefit from the feature and should implement > it too. This ticket tracks progress for this. -- This message was sent by Atlassian JIRA (v6.3.4#6332)