Hi Scott & Stephan,

The problem has happened a couple more times since yesterday. It's very
strange, as my job had been running fine for over a week before this started
happening. I find that if I restart the job (and restore from the last
checkpoint), it runs fine for a while (a couple of hours) before breaking
again.

@Scott Thanks, I'll try testing with the upgraded versions, though since my
job was running fine for over a week, it feels like there might be something
else going on here.

@Stephan I see, my sink is a Kafka topic. I only have two nodes in my Kafka
cluster, and CPU and memory usage seem normal on both nodes. I can't see
anything bad in the task manager logs relating to the Kafka producer
either. I do have a fairly large state (~20 GB), but I'm using the latest
RocksDB state backend (with asynchronous checkpointing). I'm not sure what
else I can do to investigate this, but let me know if you have any more
ideas!
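
For reference, the backend is configured roughly like this (a sketch rather
than the exact code; the checkpoint URI is a placeholder):

    import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    // inside the job's main(), which declares throws IOException for the backend constructor
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    RocksDBStateBackend backend = new RocksDBStateBackend("s3://my-bucket/flink/checkpoints");
    backend.enableFullyAsyncSnapshots();   // make RocksDB checkpoints asynchronous (Flink 1.1/1.2 API)
    env.setStateBackend(backend);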

Thanks,
Josh


On Thu, Nov 3, 2016 at 6:27 PM, Stephan Ewen <se...@apache.org> wrote:

> Is it possible that you have stalls in your topology?
>
> Reasons could be:
>
>   - The data sink blocks or becomes slow for some periods (where are you
> sending the data to?). One cheap way to check whether records stop flowing
> into the sink is sketched after this list.
>
>   - If you are using large state and a state backend that only supports
> synchronous checkpointing, there may be a delay introduced by the checkpoint
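>
>   To illustrate the check mentioned above (just a sketch, not from your job:
> the class, the names and the 60-second threshold are all made up): a
> pass-through map placed directly in front of the sink is held back by the
> same backpressure as the sink, so logging the gap between its invocations
> shows whether records stop flowing into the sink:
>
>     import org.apache.flink.api.common.functions.RichMapFunction;
>     import org.slf4j.Logger;
>     import org.slf4j.LoggerFactory;
>
>     // Pass-through operator: if the sink behind it blocks (or the source stalls),
>     // the gap between map() invocations grows and gets logged.
>     public class StallDetector<T> extends RichMapFunction<T, T> {
>         private static final Logger LOG = LoggerFactory.getLogger(StallDetector.class);
>         private transient long lastCall;
>
>         @Override
>         public T map(T value) {
>             long now = System.currentTimeMillis();
>             if (lastCall > 0 && now - lastCall > 60_000) {   // illustrative threshold
>                 LOG.warn("{} ms since the last record passed this operator", now - lastCall);
>             }
>             lastCall = now;
>             return value;
>         }
>     }
>
>     // usage (names hypothetical): stream.map(new StallDetector<MyEvent>()).addSink(yourSink);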
>
>
> On Thu, Nov 3, 2016 at 7:21 PM, Scott Kidder <kidder.sc...@gmail.com>
> wrote:
>
>> Hi Steffen & Josh,
>>
>> For what it's worth, I've been using the Kinesis connector with very good
>> results on Flink 1.1.2 and 1.1.3. I updated the Flink Kinesis connector's KCL
>> and AWS SDK dependencies to the following versions:
>>
>> aws.sdk.version: 1.11.34
>> aws.kinesis-kcl.version: 1.7.0
>>
>> My customizations are visible in this commit on my fork:
>> https://github.com/apache/flink/commit/6d69f99d7cd52b3c2f039cb4d37518859e159b32
>>
>> It might be worth testing with newer AWS SDK & KCL libraries to see if
>> the problem persists.
>>
>> Best,
>>
>> --Scott Kidder
>>
>>
>> On Thu, Nov 3, 2016 at 7:08 AM, Josh <jof...@gmail.com> wrote:
>>
>>> Hi Gordon,
>>>
>>> Thanks for the fast reply!
>>> You're right about the expired iterator exception occurring just before
>>> each spike. I can't see any signs of long GC pauses on the task managers...
>>> CPU has been below 15% the whole time the spikes were occurring, and I can't
>>> see anything unusual in the task manager logs.
>>>
>>> But actually I just noticed that the Flink UI showed no successful
>>> checkpoints during the time of the problem, even though my checkpoint
>>> interval is 15 minutes. So I guess this is probably some kind of Flink
>>> problem rather than a problem with the Kinesis consumer. Unfortunately I
>>> can't find anything useful in the logs, so I'm not sure what happened!
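>>>
>>> (For context, checkpointing is set up roughly like this; just a sketch, not
>>> the exact code. The timeout line is illustrative: it is the knob that
>>> controls how long a checkpoint may stay in progress before being cancelled.)
>>>
>>>     import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>>>
>>>     StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>>>     env.enableCheckpointing(15 * 60 * 1000);                        // 15-minute checkpoint interval
>>>     env.getCheckpointConfig().setCheckpointTimeout(10 * 60 * 1000); // cancel checkpoints stuck longer than this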
>>>
>>> Josh
>>>
>>>
>>>
>>> On Thu, Nov 3, 2016 at 12:44 PM, Tzu-Li (Gordon) Tai <
>>> tzuli...@apache.org> wrote:
>>>
>>>> Hi Josh,
>>>>
>>>> That warning message was added as part of FLINK-4514. It pops up
>>>> whenever a shard iterator is used more than 5 minutes after it was returned
>>>> by Kinesis.
>>>> The only time spent between a shard iterator being returned and it being
>>>> used to fetch the next batch of records is the deserialization and emission
>>>> of the records from the previously fetched batch.
>>>> So unless processing of the last fetched batch took over 5 minutes,
>>>> this normally shouldn’t happen.
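>>>>
>>>> (To illustrate the refresh pattern the consumer falls back to when this
>>>> happens: just a sketch of the pattern, not the connector's actual code,
>>>> and the variable names are made up.)
>>>>
>>>>     import com.amazonaws.services.kinesis.model.ExpiredIteratorException;
>>>>     import com.amazonaws.services.kinesis.model.GetRecordsRequest;
>>>>     import com.amazonaws.services.kinesis.model.GetRecordsResult;
>>>>
>>>>     GetRecordsResult result;
>>>>     try {
>>>>         result = kinesisClient.getRecords(new GetRecordsRequest().withShardIterator(shardIterator));
>>>>     } catch (ExpiredIteratorException e) {
>>>>         // the iterator was held longer than Kinesis' 5-minute validity window,
>>>>         // so request a fresh one at the last consumed sequence number and retry
>>>>         shardIterator = kinesisClient
>>>>                 .getShardIterator(streamName, shardId, "AFTER_SEQUENCE_NUMBER", lastSequenceNumber)
>>>>                 .getShardIterator();
>>>>         result = kinesisClient.getRecords(new GetRecordsRequest().withShardIterator(shardIterator));
>>>>     }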
>>>>
>>>> Have you noticed any signs of long, constant full GCs on your Flink task
>>>> managers? From your description and a check of the code, the only guess I
>>>> can come up with right now is that the source tasks completely ceased
>>>> running for a period of time, and when they came back, the shard iterator
>>>> was unexpectedly found to have expired. Judging from the graph you attached,
>>>> once the iterator was refreshed and the tasks successfully fetched a few
>>>> more batches, the source tasks halted again, and so on.
>>>> So you should see that same warning message right before every small
>>>> peak in the graph.
>>>>
>>>> Best Regards,
>>>> Gordon
>>>>
>>>>
>>>> On November 3, 2016 at 7:46:42 PM, Josh (jof...@gmail.com) wrote:
>>>>
>>>> Hey Gordon,
>>>>
>>>> I've been using Flink 1.2-SNAPSHOT for the past week (with FLINK-4514)
>>>> with no problems, but yesterday the Kinesis consumer started behaving
>>>> strangely... My Kinesis data stream is fairly constant at around 1.5 MB/sec,
>>>> but the Flink Kinesis consumer periodically stopped consuming (see the
>>>> spikes in the attached graph, which shows the data consumed by the Flink
>>>> Kinesis consumer).
>>>>
>>>> Looking at the task manager logs, there are no exceptions, but there is
>>>> this log message, which I believe is related to the problem:
>>>>
>>>> 2016-11-03 09:27:53,782 WARN  org.apache.flink.streaming.connectors.kinesis.internals.ShardConsumer
>>>> - Encountered an unexpected expired iterator
>>>> AAAAAAAAAAF8OJyh+X3yBnbtzUgIfXv+phS7PKppd7q09/tduXG3lOhCmBGPUOlZul24tzSSM6KjHsQ+AbZY8MThKcSvGax/EoOIYoTELYbZmuwY4hgeqUsndxLIM0HL55iejroBV8YFmUmGwHsW8qkHsz//Ci4cxcLrGArHex3n+4E+aoZ9AtgTPEZOBjXY49g+VGsDb0bQN5FJUoUVEfnbupk96ore
>>>> for shard KinesisStreamShard{streamName='stream001', shard='{ShardId:
>>>> shardId-000000000000,HashKeyRange: {StartingHashKey: 0,EndingHashKey:
>>>> 85070511730234615865841151857942042863},SequenceNumberRange:
>>>> {StartingSequenceNumber: 49566542916923648892164247926679091159472198219567464450,}}'};
>>>> refreshing the iterator ...
>>>>
>>>> Having restarted the job from my last savepoint, it's consuming the
>>>> stream fine again with no problems.
>>>>
>>>> Do you have any idea what might be causing this, or anything I should
>>>> do to investigate further?
>>>>
>>>> Cheers,
>>>>
>>>> Josh
>>>>
>>>> On Wed, Oct 5, 2016 at 4:55 AM, Tzu-Li (Gordon) Tai <
>>>> tzuli...@apache.org> wrote:
>>>>
>>>>> Hi Steffen,
>>>>>
>>>>> It turns out that FLINK-4514 just missed Flink 1.1.2 and wasn’t included
>>>>> in the release (I’ll update the fix version in JIRA to 1.1.3, thanks
>>>>> for noticing this!).
>>>>> The Flink community is going to release 1.1.3 as soon as possible, which
>>>>> will include the fix.
>>>>> If you don’t want to wait for the release and want to try the fix now,
>>>>> you can also build from the current “release-1.1” branch, which already
>>>>> has FLINK-4514 merged.
>>>>> Sorry for the inconvenience. Let me know if you bump into any other
>>>>> problems afterwards.
>>>>>
>>>>> Best Regards,
>>>>> Gordon
>>>>>
>>>>>
>>>>> On October 5, 2016 at 2:56:21 AM, Steffen Hausmann (
>>>>> stef...@hausmann-family.de) wrote:
>>>>>
>>>>> Hi there,
>>>>>
>>>>> I'm running a Flink 1.1.2 job on EMR and YARN that is reading events
>>>>> from a Kinesis stream. However, after a while (the exact duration varies
>>>>> and is on the order of minutes) the Kinesis source doesn't emit any
>>>>> further events, and hence Flink doesn't produce any further output.
>>>>> Eventually, an ExpiredIteratorException occurs in one of the tasks,
>>>>> causing the entire job to fail:
>>>>>
>>>>> > com.amazonaws.services.kinesis.model.ExpiredIteratorException:
>>>>> > Iterator expired. The iterator was created at time Mon Oct 03 18:40:30
>>>>> > UTC 2016 while right now it is Mon Oct 03 18:45:33 UTC 2016 which is
>>>>> > further in the future than the tolerated delay of 300000 milliseconds.
>>>>> > (Service: AmazonKinesis; Status Code: 400; Error Code:
>>>>> > ExpiredIteratorException; Request ID: dace9532-9031-54bc-8aa2-3cbfb136d590)
>>>>>
>>>>> This seems to be related to FLINK-4514, which is marked as resolved for
>>>>> Flink 1.1.2. In contrast to what is described in the ticket, the job I'm
>>>>> running isn't suspended but hangs just a few minutes after it has been
>>>>> started.
>>>>>
>>>>> I've attached a log file showing the described behavior.
>>>>>
>>>>> Any idea what may be wrong?
>>>>>
>>>>> Thanks,
>>>>> Steffen
>>>>>
>>>>>
>>>>
>>>
>>
>
