[ https://issues.apache.org/jira/browse/FLINK-4574?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16651050#comment-16651050 ]
ASF GitHub Bot commented on FLINK-4574: --------------------------------------- tony810430 closed pull request #2925: [FLINK-4574] [kinesis] Strengthen fetch interval implementation in Kinesis consumer URL: https://github.com/apache/flink/pull/2925 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java index 612a4a7b273..2da0c912771 100644 --- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java +++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java @@ -38,6 +38,10 @@ import java.nio.ByteBuffer; import java.util.List; import java.util.Properties; +import java.util.Timer; +import java.util.TimerTask; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.atomic.AtomicReference; import static org.apache.flink.util.Preconditions.checkArgument; import static org.apache.flink.util.Preconditions.checkNotNull; @@ -64,6 +68,9 @@ private SequenceNumber lastSequenceNum; + /** Reference to the first error thrown by the {@link ShardConsumerFetcher} threads */ + private final AtomicReference<Throwable> error; + /** * Creates a shard consumer. 
* @@ -81,7 +88,7 @@ public ShardConsumer(KinesisDataFetcher<T> fetcherRef, subscribedShard, lastSequenceNum, KinesisProxy.create(fetcherRef.getConsumerConfiguration())); - } + } /** This constructor is exposed for testing purposes */ protected ShardConsumer(KinesisDataFetcher<T> fetcherRef, @@ -107,27 +114,30 @@ protected ShardConsumer(KinesisDataFetcher<T> fetcherRef, this.fetchIntervalMillis = Long.valueOf(consumerConfig.getProperty( ConsumerConfigConstants.SHARD_GETRECORDS_INTERVAL_MILLIS, Long.toString(ConsumerConfigConstants.DEFAULT_SHARD_GETRECORDS_INTERVAL_MILLIS))); + + this.error = new AtomicReference<Throwable>(); } @SuppressWarnings("unchecked") @Override public void run() { - String nextShardItr; + String startShardItr; + Timer shardConsumerFetcherScheduler = new Timer(); try { - // before infinitely looping, we set the initial nextShardItr appropriately + // before infinitely looping, we set the initial startShardItr appropriately if (lastSequenceNum.equals(SentinelSequenceNumber.SENTINEL_LATEST_SEQUENCE_NUM.get())) { // if the shard is already closed, there will be no latest next record to get for this shard if (subscribedShard.isClosed()) { - nextShardItr = null; + startShardItr = null; } else { - nextShardItr = kinesis.getShardIterator(subscribedShard, ShardIteratorType.LATEST.toString(), null); + startShardItr = kinesis.getShardIterator(subscribedShard, ShardIteratorType.LATEST.toString(), null); } } else if (lastSequenceNum.equals(SentinelSequenceNumber.SENTINEL_EARLIEST_SEQUENCE_NUM.get())) { - nextShardItr = kinesis.getShardIterator(subscribedShard, ShardIteratorType.TRIM_HORIZON.toString(), null); + startShardItr = kinesis.getShardIterator(subscribedShard, ShardIteratorType.TRIM_HORIZON.toString(), null); } else if (lastSequenceNum.equals(SentinelSequenceNumber.SENTINEL_SHARD_ENDING_SEQUENCE_NUM.get())) { - nextShardItr = null; + startShardItr = null; } else { // we will be starting from an actual sequence number (due to restore from failure). 
// if the last sequence number refers to an aggregated record, we need to clean up any dangling sub-records @@ -154,42 +164,115 @@ public void run() { } } - // set the nextShardItr so we can continue iterating in the next while loop - nextShardItr = getRecordsResult.getNextShardIterator(); + // set the startShardItr so we can continue iterating in the next while loop + startShardItr = getRecordsResult.getNextShardIterator(); } else { // the last record was non-aggregated, so we can simply start from the next record - nextShardItr = kinesis.getShardIterator(subscribedShard, ShardIteratorType.AFTER_SEQUENCE_NUMBER.toString(), lastSequenceNum.getSequenceNumber()); + startShardItr = kinesis.getShardIterator(subscribedShard, ShardIteratorType.AFTER_SEQUENCE_NUMBER.toString(), lastSequenceNum.getSequenceNumber()); } } - while(isRunning()) { - if (nextShardItr == null) { - fetcherRef.updateState(subscribedShardStateIndex, SentinelSequenceNumber.SENTINEL_SHARD_ENDING_SEQUENCE_NUM.get()); - - // we can close this consumer thread once we've reached the end of the subscribed shard - break; - } else { - if (fetchIntervalMillis != 0) { - Thread.sleep(fetchIntervalMillis); - } + ArrayBlockingQueue<UserRecord> queue = new ArrayBlockingQueue<>(maxNumberOfRecordsPerFetch); + ShardConsumerFetcher shardConsumerFetcher; - GetRecordsResult getRecordsResult = getRecords(nextShardItr, maxNumberOfRecordsPerFetch); + if (fetchIntervalMillis > 0L) { + shardConsumerFetcher = new ShardConsumerFetcher(this, startShardItr, queue, false); + shardConsumerFetcherScheduler.scheduleAtFixedRate(shardConsumerFetcher, 0L, fetchIntervalMillis); + } else { + // if fetchIntervalMillis is 0, make the task run forever and schedule it once only. 
+ shardConsumerFetcher = new ShardConsumerFetcher(this, startShardItr, queue, true); + shardConsumerFetcherScheduler.schedule(shardConsumerFetcher, 0L); + } - // each of the Kinesis records may be aggregated, so we must deaggregate them before proceeding - List<UserRecord> fetchedRecords = deaggregateRecords( - getRecordsResult.getRecords(), - subscribedShard.getShard().getHashKeyRange().getStartingHashKey(), - subscribedShard.getShard().getHashKeyRange().getEndingHashKey()); + while(isRunning()) { + UserRecord record = queue.poll(); + if (record != null) { + deserializeRecordForCollectionAndUpdateState(record); + } else { + if (shardConsumerFetcher.nextShardItr == null) { + fetcherRef.updateState(subscribedShardStateIndex, SentinelSequenceNumber.SENTINEL_SHARD_ENDING_SEQUENCE_NUM.get()); - for (UserRecord record : fetchedRecords) { - deserializeRecordForCollectionAndUpdateState(record); + // we can close this consumer thread once we've reached the end of the subscribed shard + break; } + } - nextShardItr = getRecordsResult.getNextShardIterator(); + Throwable throwable = this.error.get(); + if (throwable != null) { + throw throwable; } } } catch (Throwable t) { fetcherRef.stopWithError(t); + } finally { + shardConsumerFetcherScheduler.cancel(); + } + } + + private class ShardConsumerFetcher extends TimerTask { + private String nextShardItr; + + private final ShardConsumer<T> shardConsumerRef; + + private final ArrayBlockingQueue<UserRecord> userRecordQueue; + + /** The latest finish time for fetching data from Kinesis used to recognize if the following task has been delayed. 
*/ + private Long lastFinishTime = -1L; + + private boolean runForever; + + ShardConsumerFetcher(ShardConsumer<T> shardConsumerRef, + String nextShardItr, + ArrayBlockingQueue<UserRecord> userRecordQueue, + boolean runForever) { + this.shardConsumerRef = shardConsumerRef; + this.nextShardItr = nextShardItr; + this.userRecordQueue = userRecordQueue; + this.runForever = runForever; + } + + @Override + public void run() { + GetRecordsResult getRecordsResult; + List<UserRecord> fetchedRecords; + + try { + do { + if (nextShardItr != null) { + // ignore to log this warning if runForever is true, since fetchIntervalMillis is 0 + if (!runForever && this.scheduledExecutionTime() < lastFinishTime) { + // If expected scheduled execution time is earlier than lastFinishTime, + // it seems that the fetchIntervalMillis might be short to finish the previous task. + LOG.warn("The value given for ShardConsumer is too short to finish getRecords on time."); + } else { + getRecordsResult = shardConsumerRef.getRecords(nextShardItr, maxNumberOfRecordsPerFetch); + + if (getRecordsResult != null) { + // each of the Kinesis records may be aggregated, so we must deaggregate them before proceeding + fetchedRecords = deaggregateRecords( + getRecordsResult.getRecords(), + subscribedShard.getShard().getHashKeyRange().getStartingHashKey(), + subscribedShard.getShard().getHashKeyRange().getEndingHashKey()); + + for (UserRecord record : fetchedRecords) { + userRecordQueue.put(record); + } + + nextShardItr = getRecordsResult.getNextShardIterator(); + } else { + // getRecordsResult got null due to iterator expired. + // Give up this task and get a new shard iterator for the next task. 
+ nextShardItr = kinesis.getShardIterator(subscribedShard, ShardIteratorType.AFTER_SEQUENCE_NUMBER.toString(), lastSequenceNum.getSequenceNumber()); + } + lastFinishTime = System.currentTimeMillis(); + } + } else { + break; + } + } while (runForever); + } catch (Throwable t) { + shardConsumerRef.stopWithError(t); + } } } @@ -203,6 +286,12 @@ private boolean isRunning() { return !Thread.interrupted(); } + /** Called by created TimerTask: {@link ShardConsumerFetcher} to pass on errors. Only the first thrown error is set. + * Once set, It will cause run() to throw the error and call stopWithError() in {@link KinesisDataFetcher}*/ + private void stopWithError(Throwable throwable) { + this.error.compareAndSet(null, throwable); + } + /** * Deserializes a record for collection, and accordingly updates the shard state in the fetcher. The last * successfully collected sequence number in this shard consumer is also updated so that @@ -263,20 +352,13 @@ private void deserializeRecordForCollectionAndUpdateState(UserRecord record) */ private GetRecordsResult getRecords(String shardItr, int maxNumberOfRecords) throws InterruptedException { GetRecordsResult getRecordsResult = null; - while (getRecordsResult == null) { - try { - getRecordsResult = kinesis.getRecords(shardItr, maxNumberOfRecords); - } catch (ExpiredIteratorException eiEx) { - LOG.warn("Encountered an unexpected expired iterator {} for shard {};" + - " refreshing the iterator ...", shardItr, subscribedShard); - shardItr = kinesis.getShardIterator(subscribedShard, ShardIteratorType.AFTER_SEQUENCE_NUMBER.toString(), lastSequenceNum.getSequenceNumber()); - - // sleep for the fetch interval before the next getRecords attempt with the refreshed iterator - if (fetchIntervalMillis != 0) { - Thread.sleep(fetchIntervalMillis); - } - } + try { + getRecordsResult = kinesis.getRecords(shardItr, maxNumberOfRecords); + } catch (ExpiredIteratorException eiEx) { + LOG.warn("Encountered an unexpected expired iterator {} for shard 
{};" + + " refreshing the iterator ...", shardItr, subscribedShard); } + return getRecordsResult; } ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Strengthen fetch interval implementation in Kinesis consumer > ------------------------------------------------------------ > > Key: FLINK-4574 > URL: https://issues.apache.org/jira/browse/FLINK-4574 > Project: Flink > Issue Type: Improvement > Components: Kinesis Connector > Affects Versions: 1.1.0 > Reporter: Tzu-Li (Gordon) Tai > Assignee: Wei-Che Wei > Priority: Major > Labels: pull-request-available > > As pointed out by [~rmetzger], right now the fetch interval implementation in > the {{ShardConsumer}} class of the Kinesis consumer can lead to much longer > interval times than specified by the user, e.g., say the specified fetch > interval is {{f}}, it takes {{x}} to complete a {{getRecords()}} call, and > {{y}} to complete processing the fetched records for emitting, then the > actual interval between each fetch is {{f+x+y}}. > The main problem with this is that we can never guarantee how much time has > passed since the last {{getRecords}} call, thus cannot guarantee that returned > shard iterators will not have expired the next time we use them, even if we > limit the user-given value for {{f}} to not be longer than the iterator > expiration time. > I propose to improve this by, per {{ShardConsumer}}, using a > {{ScheduledExecutorService}} / {{Timer}} to do the fixed-interval fetching, > and a separate blocking queue that collects the fetched records for emitting. -- This message was sent by Atlassian JIRA (v7.6.3#76005)