[ https://issues.apache.org/jira/browse/FLINK-4821?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15750647#comment-15750647 ]
ASF GitHub Bot commented on FLINK-4821: --------------------------------------- Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/3001#discussion_r92554905 --- Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java --- @@ -294,11 +296,18 @@ public void close() throws Exception { lastStateSnapshot.toString(), checkpointId, checkpointTimestamp); } - return lastStateSnapshot; + List<Tuple2<KinesisStreamShard, SequenceNumber>> listState = new ArrayList<>(lastStateSnapshot.size()); + for (Map.Entry<KinesisStreamShard, SequenceNumber> entry: lastStateSnapshot.entrySet()) { + listState.add(Tuple2.of(entry.getKey(), entry.getValue())); + } + return listState; } @Override - public void restoreState(HashMap<KinesisStreamShard, SequenceNumber> restoredState) throws Exception { - sequenceNumsToRestore = restoredState; + public void restoreState(List<Tuple2<KinesisStreamShard, SequenceNumber>> state) throws Exception { + sequenceNumsToRestore = new HashMap<>(); + for (Tuple2<KinesisStreamShard, SequenceNumber> subState: state) { --- End diff -- formatting nit: need empty space before colon ":" > Implement rescalable non-partitioned state for Kinesis Connector > ---------------------------------------------------------------- > > Key: FLINK-4821 > URL: https://issues.apache.org/jira/browse/FLINK-4821 > Project: Flink > Issue Type: New Feature > Components: Kinesis Connector > Reporter: Tzu-Li (Gordon) Tai > Assignee: Wei-Che Wei > Fix For: 1.2.0 > > > FLINK-4379 added the rescalable non-partitioned state feature, along with the > implementation for the Kafka connector. > The AWS Kinesis connector will benefit from the feature and should implement > it too. This ticket tracks progress for this. -- This message was sent by Atlassian JIRA (v6.3.4#6332)