No problem, and it's good to hear that you managed to solve the problem.

Piotrek
> On 23 Mar 2019, at 12:49, Padarn Wilson <pad...@gmail.com> wrote:
>
> Well.. it turned out I was registering millions of timers by accident, which was why garbage collection was blowing up. Oops. Thanks for your help again.
>
> On Wed, Mar 6, 2019 at 9:44 PM Padarn Wilson <pad...@gmail.com> wrote:
> Thanks a lot for your suggestion. I'll dig into it and update the mailing list if I find anything useful.
>
> Padarn
>
> On Wed, 6 Mar 2019 at 6:03 PM, Piotr Nowojski <pi...@ververica.com> wrote:
> Re-adding user mailing list.
>
> Hi,
>
> If it is a GC issue, only GC logs or some JVM memory profilers (like Oracle's Mission Control) can lead you to the solution. Once you confirm that it's a GC issue, there are numerous resources online on how to analyse the cause of the problem. It is difficult to use CPU profiling or Flink metrics for that, since GC issues caused by one thread can create performance bottlenecks in other, unrelated places.
>
> If it's not a GC issue, you can use Flink metrics (like the amount of buffered input/output data) to find the task that's causing the bottleneck. Then you can use a CPU profiler to analyse why that is happening.
>
> Piotrek
>
>> On 6 Mar 2019, at 02:52, Padarn Wilson <pad...@gmail.com> wrote:
>>
>> Hi Piotr,
>>
>> Thanks for your feedback. Makes sense about the checkpoint barriers - this definitely could be the cause of a problem.
>>
>> I would advise profiling your job to find out what's going on.
>>
>> Agreed. Outside of inspecting the Flink metrics, do you have suggestions for tools with which to do this?
>>
>> The main thing I'm trying to pin down is:
>> 1) Is it the downstream processing from the expansion of records that causes a problem, or
>> 2) Is it the shuffle of all the records after the expansion that is taking a long time - and if so, is there anything I can do to mitigate this other than trying to ensure less shuffling?
>>
>> Thanks,
>> Padarn
>>
>> On Tue, Mar 5, 2019 at 7:08 PM Piotr Nowojski <pi...@ververica.com> wrote:
>> Hi,
>>
>>> Do you mind elaborating on this? What technology would you propose as an alternative, and why would this increase checkpointing time?
>>
>> The problem is that when Flink starts a checkpoint and injects the checkpoint barriers, those barriers travel through the job graph, and the quicker they can do that the better. How long it takes depends on the amount of data buffered ahead of the checkpoint barriers (currently all of those records must be processed before the barrier is passed downstream). The more buffered records there are, and the more time it takes to process them, the longer the checkpoint takes. Obviously, if one stage in the job multiplies the number of records, it can in effect multiply the amount of "buffered work" that needs to be processed before the checkpoint barriers pass through.
>>
>> However, that might not be the case for you. To analyse what's going on you would need to look at various Flink metrics, like checkpoint times, back-pressured tasks, the state of the tasks' input/output buffers, etc. Those are secondary issues, though. First of all you should try to pinpoint the cause of the long GC pauses. If it comes from your code, you should fix that first. If that either isn't the issue or doesn't solve it, then generally speaking I would advise profiling your job to find out what's going on.
>>
>> Piotrek
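As a concrete reference for the checkpoint behaviour described above: the relevant timing settings live on Flink's CheckpointConfig. A minimal Scala sketch follows; the interval, timeout, and pause values are purely illustrative assumptions, not recommendations from this thread:

    import org.apache.flink.streaming.api.CheckpointingMode
    import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

    object CheckpointTuningSketch {
      def main(args: Array[String]): Unit = {
        val env = StreamExecutionEnvironment.getExecutionEnvironment

        // Trigger a checkpoint every 5 minutes (illustrative value).
        env.enableCheckpointing(5 * 60 * 1000, CheckpointingMode.EXACTLY_ONCE)

        val checkpointConf = env.getCheckpointConfig
        // Give barriers more time to travel through a back-pressured graph
        // before the checkpoint is declared failed.
        checkpointConf.setCheckpointTimeout(15 * 60 * 1000)
        // Leave a gap between checkpoints so buffered records can drain.
        checkpointConf.setMinPauseBetweenCheckpoints(2 * 60 * 1000)
        // Never run overlapping checkpoints.
        checkpointConf.setMaxConcurrentCheckpoints(1)
      }
    }

None of these settings remove the underlying back pressure; they only buy the barriers more time, so the GC and fan-out issues discussed in the thread still need to be addressed first.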
>>> On 5 Mar 2019, at 02:00, Padarn Wilson <pad...@gmail.com> wrote:
>>>
>>> Hi Piotr,
>>>
>>> Thanks for your response. I am using Flink 1.7.1 (I intend to upgrade to 1.7.2 shortly - there is an S3 fix I'd like to take advantage of).
>>>
>>> Generally speaking, Flink might not be the best fit if you have records fanning out; this may significantly increase checkpointing time.
>>>
>>> Do you mind elaborating on this? What technology would you propose as an alternative, and why would this increase checkpointing time?
>>>
>>> However, you might want to first identify what's causing the long GC times.
>>>
>>> My current plan is to try to enable GC logs and see if I can get something meaningful from them.
>>>
>>> Thanks a lot,
>>> Padarn
>>>
>>> On Mon, Mar 4, 2019 at 7:16 PM Piotr Nowojski <pi...@ververica.com> wrote:
>>> Hi,
>>>
>>> What Flink version are you using?
>>>
>>> Generally speaking, Flink might not be the best fit if you have records fanning out; this may significantly increase checkpointing time.
>>>
>>> However, you might want to first identify what's causing the long GC times. If there are long GC pauses, this should be the first thing to fix.
>>>
>>> Piotrek
>>>
>>>> On 2 Mar 2019, at 08:19, Padarn Wilson <pad...@gmail.com> wrote:
>>>>
>>>> Hi all again - following up on this, I think I've identified my problem as being something else, but I would appreciate it if anyone can offer advice.
>>>>
>>>> After running my stream for some time, I see that my garbage collector for the old generation starts to take a very long time:
>>>> <Screen Shot 2019-03-02 at 3.01.57 PM.png>
>>>> Here the purple line is young generation time - it is ever increasing, but grows slowly - while the blue is old generation.
>>>> This in itself is not a problem, but as soon as the next checkpoint is triggered after this happens, you see the following:
>>>> <Screen Shot 2019-03-02 at 3.02.48 PM.png>
>>>> It looks like the checkpoint hits a cap, but this is only because the checkpoints start to time out and fail (these are the alignment times per operator).
>>>>
>>>> I do notice that my state is growing quite large over time, but I don't have a good understanding of what would cause this to happen to the JVM old generation metric, which appears to be the leading metric before a problem is noticed. Other metrics such as network buffers also show that at checkpoint time things start to go haywire and the situation never recovers.
>>>>
>>>> Thanks
>>>>
>>>> On Thu, Feb 28, 2019 at 5:50 PM Padarn Wilson <pad...@gmail.com> wrote:
>>>> Hi all,
>>>>
>>>> I'm trying to process many records, and I have an expensive operation I'm trying to optimize. Simplified, it is something like:
>>>>
>>>> Data: (key1, count, time)
>>>>
>>>> Source -> Map(x -> (x, newKeyList(x.key1)))
>>>>        -> FlatMap(x -> x._2.map(y => (y, x._1.count, x._1.time)))
>>>>        -> KeyBy(_.key1).TumblingWindow().apply...
>>>>        -> Sink
>>>>
>>>> In the Map -> FlatMap step, each key is mapped to a set of keys, and each of those becomes the new key. This effectively increases the size of the stream by 16x.
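To make the pseudocode above concrete, here is a rough Scala DataStream sketch of a job with this shape. The Record type, the 16-way newKeyList expansion, the window size, the aggregation, and the parallelism values are all made-up placeholders rather than the actual job from this thread; it mainly illustrates that parallelism can be set per operator with setParallelism, which is what the question below is about:

    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.streaming.api.windowing.time.Time

    object FanOutPipelineSketch {

      // Placeholder record type mirroring (key1, count, time).
      case class Record(key1: String, count: Long, time: Long)

      // Placeholder for newKeyList: expands one key into 16 derived keys.
      def newKeyList(key1: String): Seq[String] =
        (0 until 16).map(i => s"$key1-$i")

      def main(args: Array[String]): Unit = {
        val env = StreamExecutionEnvironment.getExecutionEnvironment

        // Stand-in source; the real job reads from its actual source instead.
        val source: DataStream[Record] =
          env.fromElements(Record("a", 1L, 1000L), Record("b", 2L, 2000L))

        // Fan-out stage: one input record becomes 16 output records.
        val expanded = source
          .flatMap(r => newKeyList(r.key1).map(k => Record(k, r.count, r.time)))
          .setParallelism(8) // illustrative; this stage can be scaled on its own

        // Keyed tumbling window, with a simple reduce standing in for apply().
        val aggregated = expanded
          .keyBy(_.key1)
          .timeWindow(Time.minutes(1))
          .reduce((a, b) => Record(a.key1, a.count + b.count, math.max(a.time, b.time)))
          .setParallelism(16) // illustrative; the heavier aggregation gets more slots

        // Stand-in sink.
        aggregated.print().setParallelism(1)

        env.execute("fan-out sketch")
      }
    }

Whether the source, the fan-out flatMap, and the windowed aggregation actually deserve different parallelism depends on where the bottleneck is, which is what the metrics and profiling suggested earlier in the thread are meant to reveal.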
>>>> What I am trying to figure out is how to set the parallelism of my operators. I see in some comments that people suggest your source, sink and aggregation should have different parallelism, but I'm not clear on exactly why, or what this means for CPU utilization (see for example https://stackoverflow.com/questions/48317438/unable-to-achieve-high-cpu-utilization-with-flink-and-gelly).
>>>>
>>>> Also, it isn't clear to me what the best way is to handle this increase in data within the stream itself.
>>>>
>>>> Thanks
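Finally, since the resolution at the top of this thread turned out to be millions of accidentally registered timers, here is a minimal sketch of coalescing timers in a keyed ProcessFunction so that each key registers at most one timer per interval instead of one per record. The Record type and the one-minute interval are illustrative assumptions; whether coalescing applies at all depends on where the real job registered its timers:

    import org.apache.flink.streaming.api.functions.ProcessFunction
    import org.apache.flink.util.Collector

    // Placeholder record type mirroring (key1, count, time).
    case class Record(key1: String, count: Long, time: Long)

    // Intended for a keyed stream with event-time timestamps assigned, e.g.
    //   stream.keyBy(_.key1).process(new CoalescedTimerFunction)
    class CoalescedTimerFunction extends ProcessFunction[Record, Record] {

      private val IntervalMs = 60 * 1000L // coalesce timers to one-minute boundaries

      override def processElement(value: Record,
                                  ctx: ProcessFunction[Record, Record]#Context,
                                  out: Collector[Record]): Unit = {
        // Round the firing time up to the next interval boundary. Registering the
        // same timestamp twice for the same key is a no-op, so many records share
        // a single timer instead of creating one timer each.
        val target = (ctx.timestamp() / IntervalMs + 1) * IntervalMs
        ctx.timerService().registerEventTimeTimer(target)
        out.collect(value)
      }

      override def onTimer(timestamp: Long,
                           ctx: ProcessFunction[Record, Record]#OnTimerContext,
                           out: Collector[Record]): Unit = {
        // Periodic per-key work goes here; it runs once per key per interval.
      }
    }

If the timers come from a built-in operator such as a window rather than from user code, the window and trigger configuration is the place to look instead.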