Thanks a lot for your suggestion. I’ll dig into it and update the
mailing list if I find anything useful.

Padarn

On Wed, 6 Mar 2019 at 6:03 PM, Piotr Nowojski <pi...@ververica.com> wrote:

> Re-adding user mailing list.
>
>
> Hi,
>
> If it is a GC issue, only GC logs or some JVM memory profilers (like
> Oracle’s Mission Control) can lead you to the solution. Once you confirm
> that it’s a GC issue, there are numerous resources online on how to
> analyse the cause of the problem. CPU profiling/Flink metrics are of
> limited use for that, since GC issues caused by one thread can create
> performance bottlenecks in other, unrelated places.
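>
> For example (only a sketch: the flags below assume a Java 8 / HotSpot JVM,
> and the log path is hypothetical), GC logging can be switched on by passing
> JVM options through flink-conf.yaml:
>
> # Example only: enable GC logging for the Flink JVMs (Java 8 / HotSpot flags)
> env.java.opts: "-XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+PrintGCApplicationStoppedTime -Xloggc:/tmp/flink-gc.log"
>
> On Java 9+ the unified logging flag -Xlog:gc* replaces these options.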
>
> If that’s not a GC issue, you can use Flink metrics (like the amount of
> buffered input/output data) to find the task that’s causing a bottleneck.
> Then you can use a CPU profiler to analyse why that is happening.
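>
> As a starting point (assuming the default REST port 8081 and an otherwise
> standard setup), the back-pressure view and per-task buffer metrics can
> also be queried directly from the JobManager REST API, for example:
>
> # Back-pressure sampling for one job vertex (ids come from /jobs and /jobs/<job-id>)
> curl http://<jobmanager-host>:8081/jobs/<job-id>/vertices/<vertex-id>/backpressure
>
> # Buffer usage metrics for the same vertex (metric names can differ slightly between Flink versions)
> curl "http://<jobmanager-host>:8081/jobs/<job-id>/vertices/<vertex-id>/metrics?get=0.buffers.inPoolUsage,0.buffers.outPoolUsage"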
>
> Piotrek
>
> On 6 Mar 2019, at 02:52, Padarn Wilson <pad...@gmail.com> wrote:
>
> Hi Piotr,
>
> Thanks for your feedback. Makes sense about the checkpoint barriers - this
> definitely could be the cause of a problem.
>
> I would advise profiling your job to find out what’s going on.
>
>
> Agreed. Outside of inspecting the Flink metrics, do you have suggestions
> for tools with which to do this?
>
> The main thing I'm trying to pin down is:
> 1) Is it the downstream processing from the expansion of records that
> causes the problem, or
> 2) Is it the shuffle of all the records after the expansion that is
> taking a long time - if so, is there anything I can do to mitigate this
> other than trying to reduce the shuffle?
>
> Thanks,
> Padarn
>
>
> On Tue, Mar 5, 2019 at 7:08 PM Piotr Nowojski <pi...@ververica.com> wrote:
>
>> Hi,
>>
>> Do you mind elaborating on this? What technology would you propose as an
>> alternative, and why would this increase checkpointing time?
>>
>>
>> The problem is that when Flink starts a checkpoint and injects checkpoint
>> barriers, those barriers travel through the job graph. The quicker they
>> can do that the better. How long that takes depends on the amount of data
>> buffered ahead of the checkpoint barriers (currently all of those records
>> must be processed before the barrier is passed downstream). The more
>> buffered records there are, and the more time it takes to process them,
>> the longer the checkpoint takes. Obviously, if one stage in the job
>> multiplies the number of records, it can in effect multiply the amount of
>> “buffered work” that needs to be processed before the checkpoint barriers
>> pass through.
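>>
>> For what it’s worth, the relevant knobs can be tuned from the job itself;
>> a minimal Scala sketch (the interval and timeout values are purely
>> illustrative, not recommendations):
>>
>> import org.apache.flink.streaming.api.CheckpointingMode
>> import org.apache.flink.streaming.api.scala._
>>
>> val env = StreamExecutionEnvironment.getExecutionEnvironment
>> // Checkpoint every 60 s with exactly-once semantics.
>> env.enableCheckpointing(60 * 1000, CheckpointingMode.EXACTLY_ONCE)
>> // Fail checkpoints that cannot complete within 10 minutes.
>> env.getCheckpointConfig.setCheckpointTimeout(10 * 60 * 1000)
>> // Leave time between checkpoints for buffered records to drain.
>> env.getCheckpointConfig.setMinPauseBetweenCheckpoints(30 * 1000)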
>>
>> However, that might not be the case for you. To analyse what’s going on
>> you would need to look at various Flink metrics: checkpoint times,
>> back-pressured tasks, the state of the tasks’ input/output buffers, etc.
>> Those are secondary issues, though. First of all you should try to
>> pinpoint the cause of the long GC pauses. If it comes from your code, you
>> should fix that first. If that either isn’t the issue or doesn’t solve
>> it, then generally speaking I would advise profiling your job to find out
>> what’s going on.
>>
>> Piotrek
>>
>> On 5 Mar 2019, at 02:00, Padarn Wilson <pad...@gmail.com> wrote:
>>
>> Hi Piotr,
>>
>> Thanks for your response. I am using Flink 1.7.1 (intend to upgrade to
>> 1.7.2 shortly - there is some S3 fix I'd like to take advantage of).
>>
>> Generally speaking, Flink might not be the best fit if your records fan out;
>>> this may significantly increase checkpointing time.
>>
>>
>> Do you mind elaborating on this? What technology would you propose as an
>> alternative, and why would this increase checkpointing time?
>>
>> However you might want to first identify what’s causing long GC times.
>>>
>>
>> My current plan is to try and enable GC logs and see if I can get
>> something meaningful from them.
>>
>> Thanks a lot,
>> Padarn
>>
>>
>> On Mon, Mar 4, 2019 at 7:16 PM Piotr Nowojski <pi...@ververica.com>
>> wrote:
>>
>>> Hi,
>>>
>>> What Flink version are you using?
>>>
>>> Generally speaking, Flink might not be the best fit if your records fan out;
>>> this may significantly increase checkpointing time.
>>>
>>> However you might want to first identify what’s causing long GC times.
>>> If there are long GC pauses, that should be the first thing to fix.
>>>
>>> Piotrek
>>>
>>> On 2 Mar 2019, at 08:19, Padarn Wilson <pad...@gmail.com> wrote:
>>>
>>> Hi all again - following up on this I think I've identified my problem
>>> as being something else, but would appreciate if anyone can offer advice.
>>>
>>> After running my stream for some time, I see that garbage collection for
>>> the old generation starts to take a very long time:
>>> <Screen Shot 2019-03-02 at 3.01.57 PM.png>
>>> Here the *purple line* is young-generation time; it is ever increasing,
>>> but grows slowly, while the *blue line* is old-generation time.
>>> This in itself is not a problem, but as soon as the next checkpoint is
>>> triggered after this happens you see the following:
>>> <Screen Shot 2019-03-02 at 3.02.48 PM.png>
>>> It looks like the checkpoints hit a cap, but this is only because the
>>> checkpoints start to time out and fail (these are the alignment times per
>>> operator).
>>>
>>> I do notice that my state is growing quite a lot larger over time, but I
>>> don't have a good understanding of what would cause this behaviour in the
>>> JVM old-generation metric, which appears to be the leading indicator
>>> before a problem is noticed. Other metrics, such as network buffers, also
>>> show that at checkpoint time things start to go haywire and the situation
>>> never recovers.
>>>
>>> Thanks
>>>
>>> On Thu, Feb 28, 2019 at 5:50 PM Padarn Wilson <pad...@gmail.com> wrote:
>>>
>>>> Hi all,
>>>>
>>>> I'm trying to process many records, and I have an expensive operation
>>>> I'm trying to optimize. Simplified, it is something like:
>>>>
>>>> Data: (key1, count, time)
>>>>
>>>> Source -> Map(x => (x, newKeyList(x.key1)))
>>>>             -> FlatMap(x => x._2.map(y => (y, x._1.count, x._1.time)))
>>>>             -> keyBy(_.key1).TumblingWindow().apply(...)
>>>>             -> Sink
>>>>
>>>> In the Map -> FlatMap, what is happening is that each key maps to a set
>>>> of keys, and each of those is then used as the new key. This effectively
>>>> increases the size of the stream by 16x.
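>>>>
>>>> For concreteness, here is a minimal, self-contained Scala sketch of that
>>>> shape (the record type and newKeyList are made-up placeholders, the real
>>>> window/aggregation is more involved, and the Map + FlatMap pair is
>>>> collapsed into a single flatMap):
>>>>
>>>> import org.apache.flink.streaming.api.scala._
>>>> import org.apache.flink.streaming.api.windowing.time.Time
>>>>
>>>> case class Record(key1: String, count: Long, time: Long)
>>>>
>>>> // Hypothetical expansion: one key fans out to 16 derived keys.
>>>> def newKeyList(key: String): Seq[String] = (0 until 16).map(i => s"$key-$i")
>>>>
>>>> val env = StreamExecutionEnvironment.getExecutionEnvironment
>>>> val source: DataStream[Record] =
>>>>   env.fromElements(Record("a", 1L, 1000L), Record("b", 2L, 2000L))
>>>>
>>>> source
>>>>   .flatMap(r => newKeyList(r.key1).map(k => Record(k, r.count, r.time)))
>>>>   .keyBy(_.key1)
>>>>   .timeWindow(Time.minutes(5))
>>>>   .reduce((a, b) => Record(a.key1, a.count + b.count, math.max(a.time, b.time)))
>>>>   .print()
>>>>
>>>> env.execute("fan-out sketch")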
>>>>
>>>> What I am trying to figure out is how to set the parallelism of my
>>>> operators. I see in some comments that people suggest your source, sink and
>>>> aggregation should have different parallelism, but I'm not clear on exactly
>>>> why, or what this means for CPU utilization.
>>>> (see for example
>>>> https://stackoverflow.com/questions/48317438/unable-to-achieve-high-cpu-utilization-with-flink-and-gelly
>>>> )
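>>>>
>>>> Mechanically I know I can set per-operator parallelism, e.g. continuing
>>>> the sketch above (the numbers here are made up, which is exactly what
>>>> I'm unsure how to choose):
>>>>
>>>> val expanded = source
>>>>   .flatMap(r => newKeyList(r.key1).map(k => Record(k, r.count, r.time)))
>>>>   .setParallelism(8)     // fan-out stage
>>>>
>>>> expanded
>>>>   .keyBy(_.key1)
>>>>   .timeWindow(Time.minutes(5))
>>>>   .reduce((a, b) => Record(a.key1, a.count + b.count, math.max(a.time, b.time)))
>>>>   .setParallelism(32)    // heavier stage after the ~16x expansion
>>>>   .print()
>>>>   .setParallelism(1)     // sink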
>>>>
>>>> Also, I'm not clear on the best way to handle this increase in data
>>>> within the stream itself.
>>>>
>>>> Thanks
>>>>
>>>
>>>
>>
>
