[ https://issues.apache.org/jira/browse/FLINK-4821?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15974152#comment-15974152 ]
ASF GitHub Bot commented on FLINK-4821: --------------------------------------- Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/3001#discussion_r112122128 --- Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java --- @@ -267,38 +293,84 @@ public void close() throws Exception { // ------------------------------------------------------------------------ @Override - public HashMap<KinesisStreamShard, SequenceNumber> snapshotState(long checkpointId, long checkpointTimestamp) throws Exception { + public void snapshotState(FunctionSnapshotContext context) throws Exception { if (lastStateSnapshot == null) { LOG.debug("snapshotState() requested on not yet opened source; returning null."); - return null; - } - - if (fetcher == null) { + } else if (fetcher == null) { LOG.debug("snapshotState() requested on not yet running source; returning null."); - return null; - } - - if (!running) { + } else if (!running) { LOG.debug("snapshotState() called on closed source; returning null."); - return null; + } else { + if (LOG.isDebugEnabled()) { + LOG.debug("Snapshotting state ..."); + } + + offsetsStateForCheckpoint.clear(); + lastStateSnapshot = fetcher.snapshotState(); + + if (LOG.isDebugEnabled()) { + LOG.debug("Snapshotted state, last processed sequence numbers: {}, checkpoint id: {}, timestamp: {}", + lastStateSnapshot.toString(), context.getCheckpointId(), context.getCheckpointTimestamp()); + } + + for (Map.Entry<KinesisStreamShard, SequenceNumber> entry : lastStateSnapshot.entrySet()) { + offsetsStateForCheckpoint.add(Tuple2.of(entry.getKey(), entry.getValue())); + } } + } - if (LOG.isDebugEnabled()) { - LOG.debug("Snapshotting state ..."); + @Override + public void initializeState(FunctionInitializationContext context) throws Exception { + TypeInformation<Tuple2<KinesisStreamShard, SequenceNumber>> tuple = new TupleTypeInfo<>( + 
TypeInformation.of(KinesisStreamShard.class), + TypeInformation.of(SequenceNumber.class) + ); + + offsetsStateForCheckpoint = context.getOperatorStateStore().getUnionListState( + new ListStateDescriptor<>(DefaultOperatorStateBackend.DEFAULT_OPERATOR_STATE_NAME, tuple)); + + if (context.isRestored()) { + if (sequenceNumsToRestore == null) { + sequenceNumsToRestore = new HashMap<>(); + for (Tuple2<KinesisStreamShard, SequenceNumber> kinesisOffset : offsetsStateForCheckpoint.get()) { + sequenceNumsToRestore.put(kinesisOffset.f0, kinesisOffset.f1); + } + + LOG.info("Setting restore state in the FlinkKinesisConsumer."); + if (LOG.isDebugEnabled()) { + LOG.debug("Using the following offsets: {}", sequenceNumsToRestore); + } + } else if (sequenceNumsToRestore.isEmpty()) { + sequenceNumsToRestore = null; + } + } else { + LOG.info("No restore state for FlinkKinesisConsumer."); } + } - lastStateSnapshot = fetcher.snapshotState(); + @Override + public void restoreState(HashMap<KinesisStreamShard, SequenceNumber> restoredState) throws Exception { + LOG.info("{} (taskIdx={}) restoring offsets from an older version.", --- End diff -- I think the other log messages follow a different format than this. The others do something like `Subtask {} is restoring offset from an older version`? > Implement rescalable non-partitioned state for Kinesis Connector > ---------------------------------------------------------------- > > Key: FLINK-4821 > URL: https://issues.apache.org/jira/browse/FLINK-4821 > Project: Flink > Issue Type: New Feature > Components: Kinesis Connector > Reporter: Tzu-Li (Gordon) Tai > Assignee: Wei-Che Wei > > FLINK-4379 added the rescalable non-partitioned state feature, along with the > implementation for the Kafka connector. > The AWS Kinesis connector will benefit from the feature and should implement > it too. This ticket tracks progress for this. -- This message was sent by Atlassian JIRA (v6.3.15#6346)