[ https://issues.apache.org/jira/browse/FLINK-3231?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15357072#comment-15357072 ]
ASF GitHub Bot commented on FLINK-3231:
---------------------------------------

Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2131#discussion_r69130874

    --- Diff: flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/KinesisConfigConstants.java ---
    @@ -68,10 +97,30 @@
         // Default configuration values
         // ------------------------------------------------------------------------

    -    public static final int DEFAULT_STREAM_DESCRIBE_RETRY_TIMES = 3;
    +    public static final long DEFAULT_STREAM_DESCRIBE_BACKOFF_BASE = 1000L;
    +
    +    public static final long DEFAULT_STREAM_DESCRIBE_BACKOFF_MAX = 5000L;
    +
    +    public static final double DEFAULT_STREAM_DESCRIBE_BACKOFF_EXPONENTIAL_CONSTANT = 1.5;
    +
    +    public static final int DEFAULT_SHARD_GETRECORDS_MAX = 100;
    +
    +    public static final int DEFAULT_SHARD_GETRECORDS_RETRIES = 3;
    +
    +    public static final long DEFAULT_SHARD_GETRECORDS_BACKOFF_BASE = 300L;
    +
    +    public static final long DEFAULT_SHARD_GETRECORDS_BACKOFF_MAX = 1000L;
    +
    +    public static final double DEFAULT_SHARD_GETRECORDS_BACKOFF_EXPONENTIAL_CONSTANT = 1.5;
    +
    +    public static final int DEFAULT_SHARD_GETITERATOR_RETRIES = 3;
    +
    +    public static final long DEFAULT_SHARD_GETITERATOR_BACKOFF_BASE = 300L;
    +
    +    public static final long DEFAULT_SHARD_GETITERATOR_BACKOFF_MAX = 1000L;

    -    public static final long DEFAULT_STREAM_DESCRIBE_BACKOFF = 1000L;
    +    public static final double DEFAULT_SHARD_GETITERATOR_BACKOFF_EXPONENTIAL_CONSTANT = 1.5;

    -    public static final int DEFAULT_SHARD_RECORDS_PER_GET = 100;
    +    public static final long DEFAULT_SHARD_DISCOVERY_INTERVAL_MILLIS = 10000L;
    --- End diff --

    I'm using 10s for default discovery interval here. I tested it and the originally suggested 30s seemed a bit too long as a default, IMHO. Can change it back to 30s if you think it's more appropriate.

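For readers following the diff: the BACKOFF_BASE, BACKOFF_MAX and BACKOFF_EXPONENTIAL_CONSTANT defaults suggest a capped exponential backoff between retries. The sketch below shows one way such values could combine. The class name `BackoffSketch` and the helper `backoffMillis` are hypothetical and are not taken from the pull request; the connector may compute the delay differently (for example, by adding random jitter).

```java
// Hypothetical illustration only; not the connector's actual code.
final class BackoffSketch {

    // Default values copied from the diff above (GetRecords / GetShardIterator).
    static final long BACKOFF_BASE_MILLIS = 300L;
    static final long BACKOFF_MAX_MILLIS = 1000L;
    static final double BACKOFF_EXPONENTIAL_CONSTANT = 1.5;

    /**
     * Capped exponential backoff: min(max, base * power^attempt).
     * "attempt" is zero-based, so the first retry waits exactly the base delay.
     */
    static long backoffMillis(long baseMillis, long maxMillis, double power, int attempt) {
        return (long) Math.min(maxMillis, baseMillis * Math.pow(power, attempt));
    }

    public static void main(String[] args) {
        // Print the delay that each retry attempt would wait under these defaults.
        for (int attempt = 0; attempt < 4; attempt++) {
            long delay = backoffMillis(
                    BACKOFF_BASE_MILLIS, BACKOFF_MAX_MILLIS, BACKOFF_EXPONENTIAL_CONSTANT, attempt);
            System.out.println("attempt " + attempt + " -> " + delay + " ms");
        }
    }
}
```

Under this assumed formula, the 300 ms base and 1.5 constant reach the 1000 ms cap on the fourth attempt, so the default of 3 retries would never wait longer than the configured maximum.
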
> Handle Kinesis-side resharding in Kinesis streaming consumer
> ------------------------------------------------------------
>
>                 Key: FLINK-3231
>                 URL: https://issues.apache.org/jira/browse/FLINK-3231
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Kinesis Connector, Streaming Connectors
>    Affects Versions: 1.1.0
>            Reporter: Tzu-Li (Gordon) Tai
>            Assignee: Tzu-Li (Gordon) Tai
>             Fix For: 1.1.0
>
>
> A big difference between Kinesis shards and Kafka partitions is that Kinesis
> users can choose to "merge" and "split" shards at any time to adjust the
> stream's throughput capacity. This article explains this quite clearly:
> https://brandur.org/kinesis-by-example.
> This will break the static shard-to-task mapping implemented in the basic
> version of the Kinesis consumer
> (https://issues.apache.org/jira/browse/FLINK-3229). The static shard-to-task
> mapping is a simple round-robin-like distribution that can be determined
> locally at each Flink consumer task (the Flink Kafka consumer does this too).
> To handle Kinesis resharding, we will need some way to let the Flink consumer
> tasks coordinate which shards they are currently handling, and to allow a
> task to ask the coordinator for a shard reassignment when it discovers a
> closed shard at runtime (Kinesis closes a shard when it is merged or split).
> We need a centralized coordinator state store that is visible to all Flink
> consumer tasks. Tasks can use this state store to locally determine which
> shards they can be reassigned. Amazon KCL uses a DynamoDB table for the
> coordination, but as described in
> https://issues.apache.org/jira/browse/FLINK-3211, we unfortunately can't use
> KCL for the implementation of the consumer if we want to leverage Flink's
> checkpointing mechanics. For our own implementation, Zookeeper can be used
> for this state store, but that means the user would have to set up ZK for the
> consumer to work.
> Since this feature introduces extensive work, it is opened as a separate
> sub-task from the basic implementation
> https://issues.apache.org/jira/browse/FLINK-3229.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)