[ https://issues.apache.org/jira/browse/FLINK-4821?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15979756#comment-15979756 ]
ASF GitHub Bot commented on FLINK-4821:
---------------------------------------

Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3001#discussion_r112802121

    --- Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java ---
    @@ -194,26 +212,30 @@ public void run(SourceContext<T> sourceContext) throws Exception {
             // all subtasks will run a fetcher, regardless of whether or not the subtask will initially have
             // shards to subscribe to; fetchers will continuously poll for changes in the shard list, so all subtasks
             // can potentially have new shards to subscribe to later on
    -        fetcher = new KinesisDataFetcher<>(
    -            streams, sourceContext, getRuntimeContext(), configProps, deserializer);
    +        fetcher = createFetcher(streams, sourceContext, getRuntimeContext(), configProps, deserializer);
     
             boolean isRestoringFromFailure = (sequenceNumsToRestore != null);
             fetcher.setIsRestoringFromFailure(isRestoringFromFailure);
     
             // if we are restoring from a checkpoint, we iterate over the restored
             // state and accordingly seed the fetcher with subscribed shards states
             if (isRestoringFromFailure) {
    -            for (Map.Entry<KinesisStreamShard, SequenceNumber> restored : lastStateSnapshot.entrySet()) {
    +            List<KinesisStreamShard> newShardsCreatedWhileNotRunning = fetcher.discoverNewShardsToSubscribe();
    +            for (KinesisStreamShard shard : newShardsCreatedWhileNotRunning) {
                     fetcher.advanceLastDiscoveredShardOfStream(
    -                    restored.getKey().getStreamName(), restored.getKey().getShard().getShardId());
    +                    shard.getStreamName(), shard.getShard().getShardId());
    +
    +                SequenceNumber startingStateForNewShard = lastStateSnapshot.containsKey(shard)
    +                    ? lastStateSnapshot.get(shard)
    +                    : SentinelSequenceNumber.SENTINEL_EARLIEST_SEQUENCE_NUM.get();
     
    -                if (LOG.isInfoEnabled()) {
    +                if (LOG.isInfoEnabled() && lastStateSnapshot.containsKey(shard)) {
    --- End diff --

    I think we can integrate the `lastStateSnapshot.containsKey(shard)` check with
    ```
    SequenceNumber startingStateForNewShard = lastStateSnapshot.containsKey(shard)
        ? lastStateSnapshot.get(shard)
        : SentinelSequenceNumber.SENTINEL_EARLIEST_SEQUENCE_NUM.get();
    ```
    and also add a log for the latter case, when the seq num is `SentinelSequenceNumber.SENTINEL_EARLIEST_SEQUENCE_NUM.get()`.


> Implement rescalable non-partitioned state for Kinesis Connector
> ----------------------------------------------------------------
>
>                 Key: FLINK-4821
>                 URL: https://issues.apache.org/jira/browse/FLINK-4821
>             Project: Flink
>          Issue Type: New Feature
>          Components: Kinesis Connector
>            Reporter: Tzu-Li (Gordon) Tai
>            Assignee: Wei-Che Wei
>
> FLINK-4379 added the rescalable non-partitioned state feature, along with the
> implementation for the Kafka connector.
> The AWS Kinesis connector will benefit from the feature and should implement
> it too. This ticket tracks progress for this.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)