[ https://issues.apache.org/jira/browse/FLINK-4821?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15979751#comment-15979751 ]
ASF GitHub Bot commented on FLINK-4821: --------------------------------------- Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/3001#discussion_r112802220 --- Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java --- @@ -267,38 +289,77 @@ public void close() throws Exception { // ------------------------------------------------------------------------ @Override - public HashMap<KinesisStreamShard, SequenceNumber> snapshotState(long checkpointId, long checkpointTimestamp) throws Exception { + public void snapshotState(FunctionSnapshotContext context) throws Exception { if (lastStateSnapshot == null) { LOG.debug("snapshotState() requested on not yet opened source; returning null."); - return null; - } - - if (fetcher == null) { + } else if (fetcher == null) { LOG.debug("snapshotState() requested on not yet running source; returning null."); - return null; - } - - if (!running) { + } else if (!running) { LOG.debug("snapshotState() called on closed source; returning null."); - return null; - } + } else { + if (LOG.isDebugEnabled()) { + LOG.debug("Snapshotting state ..."); + } - if (LOG.isDebugEnabled()) { - LOG.debug("Snapshotting state ..."); - } + sequenceNumsStateForCheckpoint.clear(); + lastStateSnapshot = fetcher.snapshotState(); - lastStateSnapshot = fetcher.snapshotState(); + if (LOG.isDebugEnabled()) { + LOG.debug("Snapshotted state, last processed sequence numbers: {}, checkpoint id: {}, timestamp: {}", + lastStateSnapshot.toString(), context.getCheckpointId(), context.getCheckpointTimestamp()); + } - if (LOG.isDebugEnabled()) { - LOG.debug("Snapshotted state, last processed sequence numbers: {}, checkpoint id: {}, timestamp: {}", - lastStateSnapshot.toString(), checkpointId, checkpointTimestamp); + for (Map.Entry<KinesisStreamShard, SequenceNumber> entry : lastStateSnapshot.entrySet()) { + 
sequenceNumsStateForCheckpoint.add(Tuple2.of(entry.getKey(), entry.getValue())); + } } + } - return lastStateSnapshot; + @Override + public void initializeState(FunctionInitializationContext context) throws Exception { + TypeInformation<Tuple2<KinesisStreamShard, SequenceNumber>> tuple = new TupleTypeInfo<>( + TypeInformation.of(KinesisStreamShard.class), + TypeInformation.of(SequenceNumber.class) + ); + + sequenceNumsStateForCheckpoint = context.getOperatorStateStore().getUnionListState( + new ListStateDescriptor<>("Kinesis-Stream-Shard-State", tuple)); + + if (context.isRestored()) { + if (sequenceNumsToRestore == null) { + sequenceNumsToRestore = new HashMap<>(); + for (Tuple2<KinesisStreamShard, SequenceNumber> kinesisOffset : sequenceNumsStateForCheckpoint.get()) {
--- End diff --

"kinesisOffset" --> sequence number (i.e. rename the loop variable: each restored entry carries a SequenceNumber, not an offset)

> Implement rescalable non-partitioned state for Kinesis Connector
> ----------------------------------------------------------------
>
> Key: FLINK-4821
> URL: https://issues.apache.org/jira/browse/FLINK-4821
> Project: Flink
> Issue Type: New Feature
> Components: Kinesis Connector
> Reporter: Tzu-Li (Gordon) Tai
> Assignee: Wei-Che Wei
>
> FLINK-4379 added the rescalable non-partitioned state feature, along with the
> implementation for the Kafka connector.
> The AWS Kinesis connector will benefit from the feature and should implement
> it too. This ticket tracks progress for this.

--
This message was sent by Atlassian JIRA
(v6.3.15#6346)