[ https://issues.apache.org/jira/browse/FLINK-4574?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Wei-Che Wei reassigned FLINK-4574:
----------------------------------

    Assignee:     (was: Wei-Che Wei)

> Strengthen fetch interval implementation in Kinesis consumer
> ------------------------------------------------------------
>
>                 Key: FLINK-4574
>                 URL: https://issues.apache.org/jira/browse/FLINK-4574
>             Project: Flink
>          Issue Type: Improvement
>          Components: Kinesis Connector
>    Affects Versions: 1.1.0
>            Reporter: Tzu-Li (Gordon) Tai
>            Priority: Major
>              Labels: pull-request-available
>
> As pointed out by [~rmetzger], the current fetch interval implementation in
> the {{ShardConsumer}} class of the Kinesis consumer can lead to much longer
> intervals than the user specified. For example, if the specified fetch
> interval is {{f}}, a {{getRecords()}} call takes {{x}} to complete, and
> processing the fetched records for emitting takes {{y}}, then the actual
> interval between fetches is {{f+x+y}}.
> The main problem with this is that we can never guarantee how much time has
> passed since the last {{getRecords()}} call, and thus cannot guarantee that
> returned shard iterators will not have expired the next time we use them,
> even if we limit the user-given value for {{f}} to be no longer than the
> iterator expiry time.
> I propose to improve this by using, per {{ShardConsumer}}, a
> {{ScheduledExecutorService}} / {{Timer}} to do the fixed-interval fetching,
> and a separate blocking queue that collects the fetched records for emitting.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
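
The fixed-interval fetching proposed in the issue could look roughly like the sketch below. It is only an illustration of the idea, not the actual ShardConsumer code: the class name FixedIntervalFetcher, the Supplier standing in for the getRecords() call, and the choice of queue are all assumptions made for this example.

```java
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

/** Hypothetical sketch of the proposal; not part of the Flink Kinesis connector. */
final class FixedIntervalFetcher<T> {
    // Stand-in for the getRecords() call (assumption for this example).
    private final Supplier<List<T>> fetchCall;
    // Fetched batches wait here until the emitting thread takes them.
    private final BlockingQueue<List<T>> fetched;
    private final long intervalMillis;
    private final ScheduledExecutorService scheduler =
            Executors.newSingleThreadScheduledExecutor();

    FixedIntervalFetcher(Supplier<List<T>> fetchCall,
                         BlockingQueue<List<T>> fetched,
                         long intervalMillis) {
        this.fetchCall = fetchCall;
        this.fetched = fetched;
        this.intervalMillis = intervalMillis;
    }

    /** Schedules one fetch every intervalMillis, measured start to start. */
    void start() {
        scheduler.scheduleAtFixedRate(() -> {
            try {
                fetched.put(fetchCall.get());
            } catch (InterruptedException e) {
                // Restore the flag so the executor thread can shut down.
                Thread.currentThread().interrupt();
            } catch (RuntimeException e) {
                // An uncaught exception would cancel all later runs;
                // real code would log or retry here.
            }
        }, 0, intervalMillis, TimeUnit.MILLISECONDS);
    }

    /** Stops scheduling and interrupts a fetch that is in progress. */
    void stop() {
        scheduler.shutdownNow();
    }
}
```

With this structure the emitting thread would call take() on the same queue, so the processing time y no longer delays the next fetch, and the start-to-start interval stays at f as long as a single fetch (x) completes within f. Note that a bounded queue makes put() block when the emitter falls behind, which would stretch the interval again; whether to bound the queue is a design choice the issue leaves open.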