Hi Josh,

That warning message was added as part of FLINK-4514. It appears whenever a shard iterator is used more than 5 minutes after it was returned by Kinesis. The only time spent between when a shard iterator is returned and when it is used to fetch the next batch of records is the deserialization and emission of the records in the last fetched batch. So unless processing of the last fetched batch took over 5 minutes, this normally shouldn't happen.
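To make the timing concrete, here is a minimal sketch (not the actual connector code) of the 5-minute validity window. The 300000 ms tolerated delay comes from the ExpiredIteratorException message quoted later in this thread; the class and method names here are just illustrative:

```java
import java.time.Duration;
import java.time.Instant;

/**
 * Minimal sketch (not the actual Flink/Kinesis connector code) of the timing
 * constraint described above: a Kinesis shard iterator expires 5 minutes
 * (300000 ms) after it is returned, so it is only safe to use if processing
 * of the previous batch stayed under that window.
 */
public class IteratorExpiryCheck {

    // Kinesis tolerates at most 5 minutes between obtaining and using an iterator.
    static final Duration TOLERATED_DELAY = Duration.ofMillis(300_000);

    /** Returns true if an iterator obtained at {@code obtainedAt} is still usable at {@code now}. */
    static boolean isIteratorStillValid(Instant obtainedAt, Instant now) {
        return Duration.between(obtainedAt, now).compareTo(TOLERATED_DELAY) <= 0;
    }

    public static void main(String[] args) {
        // Timestamps taken from the exception message quoted later in the thread.
        Instant obtained = Instant.parse("2016-10-03T18:40:30Z");
        Instant usedOk   = Instant.parse("2016-10-03T18:44:00Z"); // under 5 minutes later
        Instant usedLate = Instant.parse("2016-10-03T18:45:33Z"); // 303000 ms later: expired

        System.out.println(isIteratorStillValid(obtained, usedOk));   // true
        System.out.println(isIteratorStillValid(obtained, usedLate)); // false
    }
}
```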
Have you noticed any signs of long, constant full GCs in your Flink task managers? From your description and a check of the code, the only guess I can come up with right now is that the source tasks completely ceased running for a period of time, and when they came back, the shard iterator was unexpectedly found to be expired. Judging from the graph you attached, once the iterator was refreshed and the tasks successfully fetched a few more batches, the source tasks halted again, and so on. If that is the case, you should see the same warning message right before every small peak in the graph.

Best Regards,
Gordon

On November 3, 2016 at 7:46:42 PM, Josh (jof...@gmail.com) wrote:

Hey Gordon,

I've been using Flink 1.2-SNAPSHOT for the past week (with FLINK-4514) with no problems, but yesterday the Kinesis consumer started behaving strangely... My Kinesis data stream is fairly constant at around 1.5 MB/sec, but the Flink Kinesis consumer started to stop consuming for periods of time (see the spikes in the attached graph, which shows the data consumed by the Flink Kinesis consumer).

Looking in the task manager logs, there are no exceptions, but there is this log message which I believe is related to the problem:

2016-11-03 09:27:53,782 WARN org.apache.flink.streaming.connectors.kinesis.internals.ShardConsumer - Encountered an unexpected expired iterator AAAAAAAAAAF8OJyh+X3yBnbtzUgIfXv+phS7PKppd7q09/tduXG3lOhCmBGPUOlZul24tzSSM6KjHsQ+AbZY8MThKcSvGax/EoOIYoTELYbZmuwY4hgeqUsndxLIM0HL55iejroBV8YFmUmGwHsW8qkHsz//Ci4cxcLrGArHex3n+4E+aoZ9AtgTPEZOBjXY49g+VGsDb0bQN5FJUoUVEfnbupk96ore for shard KinesisStreamShard{streamName='stream001', shard='{ShardId: shardId-000000000000,HashKeyRange: {StartingHashKey: 0,EndingHashKey: 85070511730234615865841151857942042863},SequenceNumberRange: {StartingSequenceNumber: 49566542916923648892164247926679091159472198219567464450,}}'}; refreshing the iterator ...

Having restarted the job from my last savepoint, it's consuming the stream fine again with no problems.
Do you have any idea what might be causing this, or anything I should do to investigate further?

Cheers,
Josh

On Wed, Oct 5, 2016 at 4:55 AM, Tzu-Li (Gordon) Tai <tzuli...@apache.org> wrote:

Hi Steffen,

It turns out that FLINK-4514 just missed Flink 1.1.2 and wasn't included in that release (I'll update the fix version in JIRA to 1.1.3, thanks for noticing this!). The Flink community is going to release 1.1.3 as soon as possible, and it will include the fix. If you don't want to wait for the release and would like to try the fix now, you can also build the current "release-1.1" branch, which already has FLINK-4514 merged.

Sorry for the inconvenience. Let me know if you bump into any other problems afterwards.

Best Regards,
Gordon

On October 5, 2016 at 2:56:21 AM, Steffen Hausmann (stef...@hausmann-family.de) wrote:

Hi there,

I'm running a Flink 1.1.2 job on EMR and YARN that reads events from a Kinesis stream. However, after a while (the exact duration varies and is on the order of minutes), the Kinesis source stops emitting events and hence Flink stops producing output. Eventually, an ExpiredIteratorException occurs in one of the tasks, causing the entire job to fail:

> com.amazonaws.services.kinesis.model.ExpiredIteratorException: Iterator
> expired. The iterator was created at time Mon Oct 03 18:40:30 UTC 2016 while
> right now it is Mon Oct 03 18:45:33 UTC 2016 which is further in the future
> than the tolerated delay of 300000 milliseconds. (Service: AmazonKinesis;
> Status Code: 400; Error Code: ExpiredIteratorException; Request ID:
> dace9532-9031-54bc-8aa2-3cbfb136d590)

This seems to be related to FLINK-4514, which is marked as resolved for Flink 1.1.2. In contrast to what is described in the ticket, the job I'm running isn't suspended but hangs just a few minutes after it has been started. I've attached a log file showing the described behavior.

Any idea what may be wrong?

Thanks,
Steffen
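The "refreshing the iterator" behavior in the WARN message above can be sketched roughly as follows. This is a hedged illustration only: the KinesisClient interface and its method names are hypothetical stand-ins for the AWS SDK, and the real Flink ShardConsumer differs in detail. The idea is that on an expired-iterator failure, a fresh iterator is requested from the last processed sequence number and consumption resumes, which is why the job recovers after each warning:

```java
import java.util.List;

/**
 * Hedged sketch of refresh-on-expiry handling, NOT the actual Flink
 * ShardConsumer. KinesisClient is a hypothetical stand-in for the AWS SDK.
 */
public class RefreshingShardReader {

    /** Hypothetical minimal client abstraction (stand-in for the AWS Kinesis SDK). */
    interface KinesisClient {
        String getIteratorAfterSequence(String shardId, String sequenceNumber);
        List<String> getRecords(String iterator) throws ExpiredIteratorException;
    }

    static class ExpiredIteratorException extends Exception {}

    private final KinesisClient client;
    private final String shardId;
    private String lastSequenceNumber;
    private String iterator;

    RefreshingShardReader(KinesisClient client, String shardId, String startSequence) {
        this.client = client;
        this.shardId = shardId;
        this.lastSequenceNumber = startSequence;
        this.iterator = client.getIteratorAfterSequence(shardId, startSequence);
    }

    /** Fetch the next batch, transparently refreshing the iterator if it has expired. */
    List<String> nextBatch() {
        List<String> batch;
        try {
            batch = client.getRecords(iterator);
        } catch (ExpiredIteratorException e) {
            // Mirrors the WARN log: "Encountered an unexpected expired iterator ...;
            // refreshing the iterator ..." — get a fresh one from the last sequence number.
            iterator = client.getIteratorAfterSequence(shardId, lastSequenceNumber);
            try {
                batch = client.getRecords(iterator);
            } catch (ExpiredIteratorException again) {
                throw new IllegalStateException("Freshly refreshed iterator expired immediately", again);
            }
        }
        // In this sketch each record string doubles as its own sequence number.
        if (!batch.isEmpty()) {
            lastSequenceNumber = batch.get(batch.size() - 1);
        }
        return batch;
    }
}
```

Note that if the source task is stalled (e.g. by a long full GC) for more than the 5-minute validity window, even this refresh path only recovers after the stall ends, which matches the stop-and-go pattern in Josh's graph.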