No problem, and it’s good to hear that you managed to solve the issue.

Piotrek 

> On 23 Mar 2019, at 12:49, Padarn Wilson <pad...@gmail.com> wrote:
> 
> Well.. it turned out I was registering millions of timers by accident, which 
> was why garbage collection was blowing up. Oops. Thanks for your help again.
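> 
> In case anyone hits the same thing later: one way to keep the number of timers 
> down is to coalesce them, so each key has at most one pending timer per interval 
> instead of one per record. A rough sketch (the Event type and the one-minute 
> interval are just placeholders, not exactly what my job uses):
> 
> import org.apache.flink.streaming.api.functions.KeyedProcessFunction
> import org.apache.flink.util.Collector
> 
> case class Event(key: String, time: Long)
> 
> class CoalescedTimers extends KeyedProcessFunction[String, Event, Event] {
>   override def processElement(value: Event,
>                               ctx: KeyedProcessFunction[String, Event, Event]#Context,
>                               out: Collector[Event]): Unit = {
>     val interval = 60 * 1000L
>     // Round the timer up to the next minute boundary. Registering the same
>     // timestamp again for the same key is deduplicated by Flink, so each key
>     // ends up with at most one timer per minute rather than one per record.
>     ctx.timerService().registerEventTimeTimer((value.time / interval + 1) * interval)
>     out.collect(value)
>   }
> }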
> 
> On Wed, Mar 6, 2019 at 9:44 PM Padarn Wilson <pad...@gmail.com> wrote:
> Thanks a lot for your suggestion. I’ll dig into it and update for the mailing 
> list if I find anything useful.
> 
> Padarn
> 
> On Wed, 6 Mar 2019 at 6:03 PM, Piotr Nowojski <pi...@ververica.com> wrote:
> Re-adding user mailing list.
> 
> 
> Hi,
> 
> If it is a GC issue, only GC logs or some JVM memory profilers (like Oracle’s 
> Mission Control) can lead you to the solution. Once you confirm that it’s a 
> GC issue, there are numerous resources online on how to analyse the cause of 
> the problem. CPU profiling/Flink metrics are of limited use for that, since 
> GC issues caused by one thread can create performance bottlenecks in other, 
> unrelated places.
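> 
> For reference, one way to get GC logs out of the JobManager/TaskManager JVMs 
> (assuming JDK 8) is something like the following in flink-conf.yaml - the log 
> path is just a placeholder:
> 
> env.java.opts: "-XX:+PrintGCDetails -XX:+PrintGCDateStamps -Xloggc:/tmp/flink-gc.log"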
> 
> If that’s not a GC issue, you can use Flink metrics (like the amount of 
> buffered input/output data) to find the task that’s causing a bottleneck. 
> Then you can use a CPU profiler to analyse why that is happening.
> 
> Piotrek
> 
>> On 6 Mar 2019, at 02:52, Padarn Wilson <pad...@gmail.com> wrote:
>> 
>> Hi Piotr,
>> 
>> Thanks for your feedback. Makes sense about the checkpoint barriers - this 
>> definitely could be the cause of a problem.
>> 
>> I would advise profiling your job to find out what’s going on.
>> 
>> Agreed. Outside of inspecting the Flink metrics, do you have suggestions for 
>> tools with which to do this?
>> 
>> The main thing I'm trying to pin down is:
>> 1) Is it the downstream processing from the expansion of records that causes 
>> a problem, or 
>> 2) Is it the shuffle of all the records after the expansion which is taking a 
>> long time - and if so, is there anything I can do to mitigate this other than 
>> trying to reduce the shuffle?
>> 
>> Thanks,
>> Padarn
>> 
>> 
>> On Tue, Mar 5, 2019 at 7:08 PM Piotr Nowojski <pi...@ververica.com> wrote:
>> Hi,
>> 
>>> Do you mind elaborating on this? What technology would you propose as an 
>>> alternative, and why would this increase checkpointing time? 
>> 
>> The problem is that when Flink starts a checkpoint and injects the checkpoint 
>> barriers, those checkpoint barriers travel through the job graph. The quicker 
>> they can do that, the better. How long that takes depends on the amount of 
>> data buffered ahead of the checkpoint barriers (currently all of those records 
>> must be processed before the checkpoint barrier is passed downstream). The 
>> more buffered records there are, and the more time it takes to process them, 
>> the longer the checkpoint takes. Obviously, if one stage in the job is 
>> multiplying the number of records, it can in a way multiply the amount of 
>> “buffered work” that needs to be processed before the checkpoint barriers 
>> pass through.
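>> 
>> If it does turn out that the buffered work ahead of the barriers is the issue, 
>> one thing you can tune while investigating is how much headroom the checkpoints 
>> get, e.g. (the intervals are just placeholders):
>> 
>> import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
>> 
>> val env = StreamExecutionEnvironment.getExecutionEnvironment
>> env.enableCheckpointing(60 * 1000)                               // checkpoint every 60s
>> env.getCheckpointConfig.setMinPauseBetweenCheckpoints(30 * 1000) // let buffered work drain
>> env.getCheckpointConfig.setCheckpointTimeout(10 * 60 * 1000)     // tolerate slow alignment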
>> 
>> However, that might not be the case for you. To analyse what’s going on you 
>> would need to look at various Flink metrics, like checkpoint times, 
>> back-pressured tasks, the state of the tasks’ output/input buffers, etc. 
>> Those are secondary issues, though. First of all, you should try to pinpoint 
>> the cause of the long GC pauses. If it comes from your code, you should fix 
>> that first. If that either isn’t the issue or doesn’t solve it, generally 
>> speaking I would advise profiling your job to find out what’s going on.
>> 
>> Piotrek
>> 
>>> On 5 Mar 2019, at 02:00, Padarn Wilson <pad...@gmail.com> wrote:
>>> 
>>> Hi Piotr,
>>> 
>>> Thanks for your response. I am using Flink 1.7.1 (intend to upgrade to 
>>> 1.7.2 shortly - there is some S3 fix I'd like to take advantage of).
>>> 
>>> Generally speaking, Flink might not be the best fit if your records fan out, 
>>> as this may significantly increase checkpointing time. 
>>> 
>>> Do you mind elaborating on this? What technology would you propose as an 
>>> alternative, and why would this increase checkpointing time? 
>>> 
>>> However, you might want to first identify what’s causing the long GC times.
>>> 
>>> My current plan is to try and enable GC logs and see if I can get something 
>>> meaningful from them. 
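>>> 
>>> In the meantime, a quick way to watch the old generation from the command 
>>> line (before the full GC logs are in place) is jstat against the TaskManager 
>>> PID, e.g.:
>>> 
>>> jstat -gcutil <taskmanager-pid> 5000
>>> 
>>> which prints old-gen occupancy (O) and full-GC time (FGCT) every 5 seconds.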
>>> 
>>> Thanks a lot,
>>> Padarn 
>>> 
>>> 
>>> On Mon, Mar 4, 2019 at 7:16 PM Piotr Nowojski <pi...@ververica.com> wrote:
>>> Hi,
>>> 
>>> What Flink version are you using?
>>> 
>>> Generally speaking, Flink might not be the best fit if your records fan out, 
>>> as this may significantly increase checkpointing time. 
>>> 
>>> However, you might want to first identify what’s causing the long GC times. 
>>> If there are long GC pauses, they should be the first thing to fix.
>>> 
>>> Piotrek
>>> 
>>>> On 2 Mar 2019, at 08:19, Padarn Wilson <pad...@gmail.com> wrote:
>>>> 
>>>> Hi all again - following up on this, I think I've identified my problem as 
>>>> being something else, but I would appreciate it if anyone can offer advice.
>>>> 
>>>> After running my stream for some time, I see that garbage collection for the 
>>>> old generation starts to take a very long time:
>>>> <Screen Shot 2019-03-02 at 3.01.57 PM.png>
>>>> Here the purple line is young-generation GC time, which is ever increasing 
>>>> but grows slowly, while the blue line is the old generation.
>>>> This in itself is not a problem, but as soon as the next checkpoint is 
>>>> triggered after this happens you see the following:
>>>> <Screen Shot 2019-03-02 at 3.02.48 PM.png>
>>>> It looks like the checkpoint hits a cap, but this is only because the 
>>>> checkpoints start to time out and fail (these are the alignment times per 
>>>> operator).
>>>> 
>>>> I do notice that my state is growing quite a lot larger over time, but I 
>>>> don't have a good understanding of what would cause this behaviour in the 
>>>> JVM old-generation metric, which appears to be the leading indicator before 
>>>> a problem is noticed. Other metrics, such as network buffers, also show that 
>>>> at checkpoint time things start to go haywire and the situation never 
>>>> recovers.
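>>>> 
>>>> If the growing state is what is filling the old generation, one mitigation 
>>>> would be to move the state off-heap with the RocksDB state backend (assuming 
>>>> a heap-based backend is in use now), so it no longer contributes to GC at 
>>>> all. A rough sketch (the checkpoint path is a placeholder, and it needs the 
>>>> flink-statebackend-rocksdb dependency):
>>>> 
>>>> import org.apache.flink.contrib.streaming.state.RocksDBStateBackend
>>>> 
>>>> // env is the job's StreamExecutionEnvironment; "true" enables incremental checkpoints
>>>> env.setStateBackend(new RocksDBStateBackend("s3://my-bucket/checkpoints", true))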
>>>> 
>>>> Thanks
>>>> 
>>>> On Thu, Feb 28, 2019 at 5:50 PM Padarn Wilson <pad...@gmail.com> wrote:
>>>> Hi all,
>>>> 
>>>> I'm trying to process many records, and I have an expensive operation I'm 
>>>> trying to optimize. Simplified, it is something like:
>>>> 
>>>> Data: (key1, count, time)
>>>> 
>>>> Source -> Map(x -> (x, newKeyList(x.key1)))
>>>>             -> FlatMap(x -> x._2.map(y => (y, x._1.count, x._1.time)))
>>>>             -> KeyBy(_.key1).TumblingWindow().apply(...)
>>>>             -> Sink
>>>> 
>>>> In the Map -> FlatMap, what is happening is that each key is mapped to a set 
>>>> of keys, and then each of these is set as the new key. This effectively 
>>>> increases the size of the stream by 16x.
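>>>> 
>>>> Concretely, the pipeline looks roughly like this (with hypothetical types, a 
>>>> placeholder aggregation, and the Map and FlatMap collapsed into one flatMap 
>>>> for brevity):
>>>> 
>>>> import org.apache.flink.streaming.api.scala._
>>>> import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
>>>> import org.apache.flink.streaming.api.windowing.time.Time
>>>> 
>>>> case class Record(key1: String, count: Long, time: Long)
>>>> 
>>>> // newKeyList is the ~16-way key expansion; source is a DataStream[Record]
>>>> // with event time and watermarks already set up.
>>>> val result: DataStream[Record] = source
>>>>   .flatMap(r => newKeyList(r.key1).map(k => Record(k, r.count, r.time)))
>>>>   .keyBy(_.key1)
>>>>   .window(TumblingEventTimeWindows.of(Time.minutes(1)))
>>>>   .reduce((a, b) => Record(a.key1, a.count + b.count, math.max(a.time, b.time)))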
>>>> 
>>>> What I am trying to figure out is how to set the parallelism of my 
>>>> operators. I see in some comments that people suggest your source, sink 
>>>> and aggregation should have different parallelism, but I'm not clear on 
>>>> exactly why, or what this means for CPU utilization. 
>>>> (see for example 
>>>> https://stackoverflow.com/questions/48317438/unable-to-achieve-high-cpu-utilization-with-flink-and-gelly)
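>>>> 
>>>> For concreteness, by setting the parallelism of my operators I mean choosing 
>>>> the numbers in something like this (the values here are placeholders - they 
>>>> are exactly what I am unsure how to pick; expand and merge stand in for the 
>>>> flatMap and reduce logic from the sketch above):
>>>> 
>>>> val expand = (r: Record) => newKeyList(r.key1).map(k => Record(k, r.count, r.time))
>>>> val merge  = (a: Record, b: Record) => Record(a.key1, a.count + b.count, math.max(a.time, b.time))
>>>> 
>>>> source.setParallelism(4)               // assuming source comes straight from env.addSource(...)
>>>>   .flatMap(expand).setParallelism(32)  // heavier after the ~16x fan-out
>>>>   .keyBy(_.key1)
>>>>   .window(TumblingEventTimeWindows.of(Time.minutes(1)))
>>>>   .reduce(merge).setParallelism(32)
>>>>   .addSink(sink).setParallelism(8)     // sink is a placeholder SinkFunction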
>>>> 
>>>> Also, it isn't clear to me what the best way is to handle this increase in 
>>>> data within the stream itself.
>>>> 
>>>> Thanks
>>> 
>> 
> 
