I've reset the state and the job appears to be running smoothly again now.
My guess is that this problem was somehow related to my state becoming too
large (it got to around 20GB before the problem began). I would still like
to get to the bottom of what caused this, as resetting the job's state is
not a long-term solution, so if anyone has any ideas I can restore the
large state and investigate further.

Btw sorry for going a bit off topic on this thread!

On Fri, Nov 4, 2016 at 11:19 AM, Josh <jof...@gmail.com> wrote:

> Hi Scott & Stephan,
>
> The problem has happened a couple more times since yesterday. It's very
> strange, as my job was running fine for over a week before this started
> happening. I find that if I restart the job (and restore from the last
> checkpoint) it runs fine for a while (a couple of hours) before breaking
> again.
>
> @Scott thanks, I'll try testing with the upgraded versions, though since
> my job was running fine for over a week it feels like there might be
> something else going on here.
>
> @Stephan I see, my sink is a Kafka topic. I only have two nodes in my
> Kafka cluster and CPU and memory usage seem normal on both nodes. I can't
> see anything bad in the task manager logs relating to the Kafka producer
> either. I do have a fairly large state (20GB) but I'm using the latest
> RocksDB state backend (with asynchronous checkpointing). I'm not sure what
> else I can do to investigate this, but let me know if you have any more
> ideas!
>
> Thanks,
> Josh
>
>
> On Thu, Nov 3, 2016 at 6:27 PM, Stephan Ewen <se...@apache.org> wrote:
>
>> Is it possible that you have stalls in your topology?
>>
>> Reasons could be:
>>
>>   - The data sink blocks or becomes slow for some periods (where are you
>> sending the data to?)
>>
>>   - If you are using large state and a state backend that only supports
>> synchronous checkpointing, there may be a delay introduced by the checkpoint
>>
>>
>> On Thu, Nov 3, 2016 at 7:21 PM, Scott Kidder <kidder.sc...@gmail.com>
>> wrote:
>>
>>> Hi Steffan & Josh,
>>>
>>> For what it's worth, I've been using the Kinesis connector with very
>>> good results on Flink 1.1.2 and 1.1.3. I updated the Flink Kinesis
>>> connector KCL and AWS SDK dependencies to the following versions:
>>>
>>> aws.sdk.version: 1.11.34
>>> aws.kinesis-kcl.version: 1.7.0
>>>
>>> My customizations are visible in this commit on my fork:
>>> https://github.com/apache/flink/commit/6d69f99d7cd52b3c2f039cb4d37518859e159b32
>>>
>>> It might be worth testing with newer AWS SDK & KCL libraries to see if
>>> the problem persists.
>>>
>>> Best,
>>>
>>> --Scott Kidder
>>>
>>>
>>> On Thu, Nov 3, 2016 at 7:08 AM, Josh <jof...@gmail.com> wrote:
>>>
>>>> Hi Gordon,
>>>>
>>>> Thanks for the fast reply!
>>>> You're right about the expired iterator exception occurring just before
>>>> each spike. I can't see any signs of long GC on the task managers... CPU
>>>> has been <15% the whole time when the spikes were taking place and I can't
>>>> see anything unusual in the task manager logs.
>>>>
>>>> But actually I just noticed that the Flink UI showed no successful
>>>> checkpoints during the time of the problem even though my checkpoint
>>>> interval is 15 minutes. So I guess this is probably some kind of Flink
>>>> problem rather than a problem with the Kinesis consumer. Unfortunately I
>>>> can't find anything useful in the logs so not sure what happened!
>>>>
>>>> Josh
>>>>
>>>>
>>>>
>>>> On Thu, Nov 3, 2016 at 12:44 PM, Tzu-Li (Gordon) Tai <
>>>> tzuli...@apache.org> wrote:
>>>>
>>>>> Hi Josh,
>>>>>
>>>>> That warning message was added as part of FLINK-4514. It appears
>>>>> whenever a shard iterator is used more than 5 minutes after it was
>>>>> returned by Kinesis.
>>>>> The only time spent between a shard iterator being returned and it
>>>>> being used to fetch the next batch of records is on deserializing and
>>>>> emitting the records of the last fetched batch.
>>>>> So unless processing of the last fetched batch took over 5 minutes,
>>>>> this normally shouldn’t happen.
>>>>>
>>>>> Have you noticed any sign of long, constant full GC for your Flink
>>>>> task managers? From your description and a check of the code, the only
>>>>> guess I can come up with now is that the source tasks completely
>>>>> ceased running for a period of time, and when they came back, the
>>>>> shard iterator was unexpectedly found to be expired. According to the
>>>>> graph you attached, when the iterator was refreshed and the tasks
>>>>> successfully fetched a few more batches, the source tasks halted
>>>>> again, and so on.
>>>>> So you should see that same warning message right before every small
>>>>> peak within the graph.
>>>>>
>>>>> Best Regards,
>>>>> Gordon
>>>>>
>>>>>
>>>>> On November 3, 2016 at 7:46:42 PM, Josh (jof...@gmail.com) wrote:
>>>>>
>>>>> Hey Gordon,
>>>>>
>>>>> I've been using Flink 1.2-SNAPSHOT for the past week (with FLINK-4514)
>>>>> with no problems, but yesterday the Kinesis consumer started behaving
>>>>> strangely... My Kinesis data stream is fairly constant at around
>>>>> 1.5MB/sec; however, the Flink Kinesis consumer began to stop consuming
>>>>> for periods of time (see the spikes in the attached graph, which shows
>>>>> data consumed by the Flink Kinesis consumer).
>>>>>
>>>>> Looking in the task manager logs, there are no exceptions, but there
>>>>> is this log message, which I believe is related to the problem:
>>>>>
>>>>> 2016-11-03 09:27:53,782 WARN
>>>>> org.apache.flink.streaming.connectors.kinesis.internals.ShardConsumer
>>>>> - Encountered an unexpected expired iterator
>>>>> AAAAAAAAAAF8OJyh+X3yBnbtzUgIfXv+phS7PKppd7q09/tduXG3lOhCmBGPUOlZul24tzSSM6KjHsQ+AbZY8MThKcSvGax/EoOIYoTELYbZmuwY4hgeqUsndxLIM0HL55iejroBV8YFmUmGwHsW8qkHsz//Ci4cxcLrGArHex3n+4E+aoZ9AtgTPEZOBjXY49g+VGsDb0bQN5FJUoUVEfnbupk96ore
>>>>> for shard KinesisStreamShard{streamName='stream001', shard='{ShardId:
>>>>> shardId-000000000000,HashKeyRange: {StartingHashKey: 0,EndingHashKey:
>>>>> 85070511730234615865841151857942042863},SequenceNumberRange:
>>>>> {StartingSequenceNumber:
>>>>> 49566542916923648892164247926679091159472198219567464450,}}'};
>>>>> refreshing the iterator ...
>>>>>
>>>>> Having restarted the job from my last savepoint, it's consuming the
>>>>> stream fine again with no problems.
>>>>>
>>>>> Do you have any idea what might be causing this, or anything I should
>>>>> do to investigate further?
>>>>>
>>>>> Cheers,
>>>>>
>>>>> Josh
>>>>>
>>>>> On Wed, Oct 5, 2016 at 4:55 AM, Tzu-Li (Gordon) Tai <
>>>>> tzuli...@apache.org> wrote:
>>>>>
>>>>>> Hi Steffen,
>>>>>>
>>>>>> Turns out that FLINK-4514 just missed Flink 1.1.2 and wasn’t included
>>>>>> in the release (I’ll update the resolve version in JIRA to 1.1.3, thanks
>>>>>> for noticing this!).
>>>>>> The Flink community is going to release 1.1.3 asap, which will
>>>>>> include the fix.
>>>>>> If you don’t want to wait for the release and want to try the fix
>>>>>> now, you can also build on the current “release-1.1” branch, which
>>>>>> already has FLINK-4514 merged.
>>>>>> Sorry for the inconvenience. Let me know if you bump into any other
>>>>>> problems afterwards.
>>>>>>
>>>>>> Best Regards,
>>>>>> Gordon
>>>>>>
>>>>>>
>>>>>> On October 5, 2016 at 2:56:21 AM, Steffen Hausmann (
>>>>>> stef...@hausmann-family.de) wrote:
>>>>>>
>>>>>> Hi there,
>>>>>>
>>>>>> I'm running a Flink 1.1.2 job on EMR and Yarn that is reading events
>>>>>> from a Kinesis stream. However, after a while (the exact duration
>>>>>> varies and is on the order of minutes) the Kinesis source doesn't emit
>>>>>> any further events and hence Flink doesn't produce any further output.
>>>>>> Eventually, an ExpiredIteratorException occurs in one of the tasks,
>>>>>> causing the entire job to fail:
>>>>>>
>>>>>> > com.amazonaws.services.kinesis.model.ExpiredIteratorException:
>>>>>> Iterator expired. The iterator was created at time Mon Oct 03 18:40:30
>>>>>> UTC 2016 while right now it is Mon Oct 03 18:45:33 UTC 2016 which is
>>>>>> further in the future than the tolerated delay of 300000 milliseconds.
>>>>>> (Service: AmazonKinesis; Status Code: 400; Error Code:
>>>>>> ExpiredIteratorException; Request ID:
>>>>>> dace9532-9031-54bc-8aa2-3cbfb136d590)
>>>>>>
>>>>>> This seems to be related to FLINK-4514, which is marked as resolved
>>>>>> for Flink 1.1.2. In contrast to what is described in the ticket, the
>>>>>> job I'm running isn't suspended but hangs just a few minutes after the
>>>>>> job has been started.
>>>>>>
>>>>>> I've attached a log file showing the described behavior.
>>>>>>
>>>>>> Any idea what may be wrong?
>>>>>>
>>>>>> Thanks,
>>>>>> Steffen
>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>
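
For anyone following the thread, the 5-minute iterator expiry discussed
above can be illustrated with a small sketch. This is only an illustration
of the timing rule stated in the quoted exception (a tolerated delay of
300000 milliseconds), not the connector's actual code: the class
ShardIteratorAge and its methods are hypothetical names and are not part of
Flink or the AWS SDK.

```java
import java.time.Duration;
import java.time.Instant;

/** Hypothetical helper for illustration only; not part of Flink or the AWS SDK. */
final class ShardIteratorAge {

    /** Tolerated delay taken from the quoted exception message: 300000 ms. */
    static final Duration MAX_ITERATOR_AGE = Duration.ofMillis(300_000);

    private ShardIteratorAge() {}

    /** Time elapsed between an iterator being issued and being used. */
    static Duration age(Instant issuedAt, Instant usedAt) {
        return Duration.between(issuedAt, usedAt);
    }

    /** True if an iterator issued at issuedAt is older than the tolerated delay at usedAt. */
    static boolean isExpired(Instant issuedAt, Instant usedAt) {
        return age(issuedAt, usedAt).compareTo(MAX_ITERATOR_AGE) > 0;
    }
}
```

Plugging in the two timestamps from Steffen's exception (18:40:30 and
18:45:33 UTC on Oct 03 2016) gives an age of 303 seconds, which is just
over the limit. That is consistent with Gordon's point that the source task
must have been stalled for about 5 minutes between fetches.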
