Hi Scott & Stephan,

The problem has happened a couple more times since yesterday. It's very strange, as my job was running fine for over a week before this started happening. I find that if I restart the job (and restore from the last checkpoint), it runs fine for a while (a couple of hours) before breaking again.

@Scott thanks, I'll try testing with the upgraded versions, though since my job was running fine for over a week it feels like there might be something else going on here.

@Stephan I see, my sink is a Kafka topic. I only have two nodes in my Kafka cluster, and CPU and memory usage seems normal on both nodes. I can't see anything bad in the task manager logs relating to the Kafka producer either. I do have a fairly large state (20GB), but I'm using the latest RocksDB state backend (with asynchronous checkpointing).
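For context, the job is wired up roughly like this (a simplified sketch rather than the exact job code; the class name, checkpoint URI, region, broker and topic names are placeholders):

    import java.util.Properties;

    import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer09;
    import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer;
    import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

    public class KinesisToKafkaJob {

        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            // Checkpoint every 15 minutes; state lives in RocksDB and snapshots are taken asynchronously.
            env.enableCheckpointing(15 * 60 * 1000);
            env.setStateBackend(new RocksDBStateBackend("hdfs:///flink/checkpoints")); // placeholder URI

            // Kinesis source; the region is a placeholder and credentials come from the environment.
            Properties kinesisProps = new Properties();
            kinesisProps.setProperty("aws.region", "us-east-1");
            DataStream<String> events = env.addSource(
                    new FlinkKinesisConsumer<>("stream001", new SimpleStringSchema(), kinesisProps));

            // ... keyed processing that builds up the large (~20GB) state goes here ...

            // Kafka sink; broker list, topic and connector version are placeholders.
            events.addSink(new FlinkKafkaProducer09<>("kafka-broker:9092", "output-topic", new SimpleStringSchema()));

            env.execute("kinesis-to-kafka");
        }
    }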
I'm not sure what else I can do to investigate this, but let me know if you have any more ideas!

Thanks,
Josh

On Thu, Nov 3, 2016 at 6:27 PM, Stephan Ewen <se...@apache.org> wrote:

> Is it possible that you have stalls in your topology?
>
> Reasons could be:
>
> - The data sink blocks or becomes slow for some periods (where are you sending the data to?)
>
> - If you are using large state and a state backend that only supports synchronous checkpointing, there may be a delay introduced by the checkpoint.
>
> On Thu, Nov 3, 2016 at 7:21 PM, Scott Kidder <kidder.sc...@gmail.com> wrote:
>
>> Hi Steffen & Josh,
>>
>> For what it's worth, I've been using the Kinesis connector with very good results on Flink 1.1.2 and 1.1.3. I updated the Flink Kinesis connector KCL and AWS SDK dependencies to the following versions:
>>
>> aws.sdk.version: 1.11.34
>> aws.kinesis-kcl.version: 1.7.0
>>
>> My customizations are visible in this commit on my fork:
>> https://github.com/apache/flink/commit/6d69f99d7cd52b3c2f039cb4d37518859e159b32
>>
>> It might be worth testing with newer AWS SDK & KCL libraries to see if the problem persists.
>>
>> Best,
>>
>> --Scott Kidder
>>
>> On Thu, Nov 3, 2016 at 7:08 AM, Josh <jof...@gmail.com> wrote:
>>
>>> Hi Gordon,
>>>
>>> Thanks for the fast reply!
>>> You're right about the expired iterator exception occurring just before each spike. I can't see any signs of long GC on the task managers... CPU has been <15% the whole time the spikes were taking place, and I can't see anything unusual in the task manager logs.
>>>
>>> But actually I just noticed that the Flink UI showed no successful checkpoints during the time of the problem, even though my checkpoint interval is 15 minutes. So I guess this is probably some kind of Flink problem rather than a problem with the Kinesis consumer. Unfortunately I can't find anything useful in the logs, so I'm not sure what happened!
>>>
>>> Josh
>>>
>>> On Thu, Nov 3, 2016 at 12:44 PM, Tzu-Li (Gordon) Tai <tzuli...@apache.org> wrote:
>>>
>>>> Hi Josh,
>>>>
>>>> That warning message was added as part of FLINK-4514. It pops up whenever a shard iterator is used more than 5 minutes after it was returned from Kinesis.
>>>> The only time spent between a shard iterator being returned and it being used to fetch the next batch of records is on deserializing and emitting the records of the last fetched batch.
>>>> So unless processing of the last fetched batch took over 5 minutes, this normally shouldn't happen.
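>>>> Schematically, the per-shard fetch loop works something like the following (a simplified illustration using the AWS SDK directly, not the actual ShardConsumer code; the class and method names, iterator type and limit are placeholders):
>>>>
>>>> import com.amazonaws.services.kinesis.AmazonKinesis;
>>>> import com.amazonaws.services.kinesis.model.ExpiredIteratorException;
>>>> import com.amazonaws.services.kinesis.model.GetRecordsRequest;
>>>> import com.amazonaws.services.kinesis.model.GetRecordsResult;
>>>> import com.amazonaws.services.kinesis.model.Record;
>>>>
>>>> public class ShardFetchLoopSketch {
>>>>
>>>>     public static void consumeShard(AmazonKinesis kinesis, String stream, String shardId) {
>>>>         // Ask Kinesis for an iterator into the shard; an iterator is only valid for 5 minutes.
>>>>         String iterator = kinesis.getShardIterator(stream, shardId, "TRIM_HORIZON").getShardIterator();
>>>>         String lastSeq = null;
>>>>
>>>>         while (iterator != null) {
>>>>             GetRecordsResult batch;
>>>>             try {
>>>>                 batch = kinesis.getRecords(
>>>>                         new GetRecordsRequest().withShardIterator(iterator).withLimit(10000));
>>>>             } catch (ExpiredIteratorException e) {
>>>>                 // More than 5 minutes passed between obtaining the iterator and using it,
>>>>                 // e.g. because deserializing/emitting the previous batch stalled. This is
>>>>                 // the situation in which the connector logs that WARN message: it refreshes
>>>>                 // the iterator from the last emitted sequence number and carries on.
>>>>                 iterator = (lastSeq == null)
>>>>                         ? kinesis.getShardIterator(stream, shardId, "TRIM_HORIZON").getShardIterator()
>>>>                         : kinesis.getShardIterator(stream, shardId, "AFTER_SEQUENCE_NUMBER", lastSeq).getShardIterator();
>>>>                 continue;
>>>>             }
>>>>
>>>>             for (Record record : batch.getRecords()) {
>>>>                 // Deserializing and emitting records is the only work done between two fetches.
>>>>                 lastSeq = record.getSequenceNumber();
>>>>             }
>>>>
>>>>             iterator = batch.getNextShardIterator();
>>>>         }
>>>>     }
>>>> }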
>>>> Have you noticed any sign of long, constant full GC on your Flink task managers? From your description and a check of the code, the only possible guess I can come up with now is that the source tasks completely ceased running for a period of time, and when they came back, the shard iterator was unexpectedly found to be expired. According to the graph you attached, when the iterator was refreshed and the tasks successfully fetched a few more batches, the source tasks again halted, and so on.
>>>> So you should see that same warning message right before every small peak within the graph.
>>>>
>>>> Best Regards,
>>>> Gordon
>>>>
>>>> On November 3, 2016 at 7:46:42 PM, Josh (jof...@gmail.com) wrote:
>>>>
>>>> Hey Gordon,
>>>>
>>>> I've been using Flink 1.2-SNAPSHOT for the past week (with FLINK-4514) with no problems, but yesterday the Kinesis consumer started behaving strangely... My Kinesis data stream is fairly constant at around 1.5MB/sec; however, the Flink Kinesis consumer started to stop consuming for periods of time (see the spikes in the attached graph, which shows data consumed by the Flink Kinesis consumer).
>>>>
>>>> Looking in the task manager logs, there are no exceptions, but there is this log message which I believe is related to the problem:
>>>>
>>>> 2016-11-03 09:27:53,782 WARN org.apache.flink.streaming.connectors.kinesis.internals.ShardConsumer - Encountered an unexpected expired iterator AAAAAAAAAAF8OJyh+X3yBnbtzUgIfXv+phS7PKppd7q09/tduXG3lOhCmBGPUOlZul24tzSSM6KjHsQ+AbZY8MThKcSvGax/EoOIYoTELYbZmuwY4hgeqUsndxLIM0HL55iejroBV8YFmUmGwHsW8qkHsz//Ci4cxcLrGArHex3n+4E+aoZ9AtgTPEZOBjXY49g+VGsDb0bQN5FJUoUVEfnbupk96ore for shard KinesisStreamShard{streamName='stream001', shard='{ShardId: shardId-000000000000,HashKeyRange: {StartingHashKey: 0,EndingHashKey: 85070511730234615865841151857942042863},SequenceNumberRange: {StartingSequenceNumber: 49566542916923648892164247926679091159472198219567464450,}}'}; refreshing the iterator ...
>>>>
>>>> Having restarted the job from my last savepoint, it's consuming the stream fine again with no problems.
>>>>
>>>> Do you have any idea what might be causing this, or anything I should do to investigate further?
>>>>
>>>> Cheers,
>>>> Josh
>>>>
>>>> On Wed, Oct 5, 2016 at 4:55 AM, Tzu-Li (Gordon) Tai <tzuli...@apache.org> wrote:
>>>>
>>>>> Hi Steffen,
>>>>>
>>>>> It turns out that FLINK-4514 just missed Flink 1.1.2 and wasn't included in the release (I'll update the fix version in JIRA to 1.1.3, thanks for noticing this!).
>>>>> The Flink community is going to release 1.1.3 as soon as possible, which will include the fix.
>>>>> If you don't want to wait for the release and want to try the fix now, you can also build from the current "release-1.1" branch, which already has FLINK-4514 merged.
>>>>> Sorry for the inconvenience. Let me know if you bump into any other problems afterwards.
>>>>>
>>>>> Best Regards,
>>>>> Gordon
>>>>>
>>>>> On October 5, 2016 at 2:56:21 AM, Steffen Hausmann (stef...@hausmann-family.de) wrote:
>>>>>
>>>>> Hi there,
>>>>>
>>>>> I'm running a Flink 1.1.2 job on EMR and YARN that is reading events from a Kinesis stream. However, after a while (the exact duration varies and is on the order of minutes) the Kinesis source doesn't emit any further events and hence Flink doesn't produce any further output. Eventually, an ExpiredIteratorException occurs in one of the tasks, causing the entire job to fail:
>>>>>
>>>>> > com.amazonaws.services.kinesis.model.ExpiredIteratorException: Iterator expired. The iterator was created at time Mon Oct 03 18:40:30 UTC 2016 while right now it is Mon Oct 03 18:45:33 UTC 2016 which is further in the future than the tolerated delay of 300000 milliseconds. (Service: AmazonKinesis; Status Code: 400; Error Code: ExpiredIteratorException; Request ID: dace9532-9031-54bc-8aa2-3cbfb136d590)
>>>>>
>>>>> This seems to be related to FLINK-4514, which is marked as resolved for Flink 1.1.2. In contrast to what is described in the ticket, the job I'm running isn't suspended but hangs just a few minutes after it has been started.
>>>>>
>>>>> I've attached a log file showing the described behavior.
>>>>>
>>>>> Any idea what may be wrong?
>>>>>
>>>>> Thanks,
>>>>> Steffen