Well... it turned out I was registering millions of timers by accident, which was why garbage collection was blowing up. Oops. Thanks again for your help.
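In case anyone finds this thread later, here is a minimal sketch of the kind of pattern that causes this, plus the usual fix of coalescing timers. It assumes a KeyedProcessFunction with event-time timers; the Event type, the one-minute interval and the names are illustrative, not the actual job:

import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.util.Collector

// Illustrative record type, not the real schema.
case class Event(key1: String, count: Long, time: Long)

class TimerExample extends KeyedProcessFunction[String, Event, Event] {

  override def processElement(
      value: Event,
      ctx: KeyedProcessFunction[String, Event, Event]#Context,
      out: Collector[Event]): Unit = {
    // BAD: one timer per record. With millions of records this registers
    // millions of pending timers, which sit in the timer service (heap by
    // default) until they fire and put pressure on the old generation.
    // ctx.timerService().registerEventTimeTimer(value.time + 60000L)

    // BETTER: round the timestamp up to a fixed granularity. Flink keeps at
    // most one timer per key and timestamp, so this bounds the timer count.
    val interval = 60 * 1000L // one minute, illustrative
    val coalesced = (value.time / interval + 1) * interval
    ctx.timerService().registerEventTimeTimer(coalesced)

    out.collect(value)
  }

  override def onTimer(
      timestamp: Long,
      ctx: KeyedProcessFunction[String, Event, Event]#OnTimerContext,
      out: Collector[Event]): Unit = {
    // Periodic per-key work (e.g. state clean-up) goes here.
  }
}

Flink de-duplicates timers registered for the same key and timestamp, so rounding to a fixed interval keeps the number of pending timers bounded regardless of the record rate.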
On Wed, Mar 6, 2019 at 9:44 PM Padarn Wilson <pad...@gmail.com> wrote:

> Thanks a lot for your suggestion. I'll dig into it and update the mailing list if I find anything useful.
>
> Padarn
>
> On Wed, 6 Mar 2019 at 6:03 PM, Piotr Nowojski <pi...@ververica.com> wrote:
>
>> Re-adding user mailing list.
>>
>> Hi,
>>
>> If it is a GC issue, only GC logs or a JVM memory profiler (like Oracle's Mission Control) can lead you to the solution. Once you confirm that it is a GC issue, there are numerous resources online on how to analyse the cause of the problem. It is difficult to use CPU profiling or Flink metrics for that, since GC issues caused by one thread can create performance bottlenecks in other, unrelated places.
>>
>> If it is not a GC issue, you can use Flink metrics (like the amount of buffered input/output data) to find the task that is causing a bottleneck. Then you can use a CPU profiler to analyse why that is happening.
>>
>> Piotrek
>>
>> On 6 Mar 2019, at 02:52, Padarn Wilson <pad...@gmail.com> wrote:
>>
>> Hi Piotr,
>>
>> Thanks for your feedback. Makes sense about the checkpoint barriers - this definitely could be the cause of a problem.
>>
>> I would advise profiling your job to find out what's going on.
>>
>> Agreed. Outside of inspecting the Flink metrics, do you have suggestions for tools with which to do this?
>>
>> The main thing I'm trying to pin down is:
>> 1) Is it the downstream processing from the expansion of records that causes a problem, or
>> 2) Is it the shuffle of all the records after the expansion that takes so long - and if so, is there anything I can do to mitigate this other than trying to reduce the shuffle?
>>
>> Thanks,
>> Padarn
>>
>> On Tue, Mar 5, 2019 at 7:08 PM Piotr Nowojski <pi...@ververica.com> wrote:
>>
>>> Hi,
>>>
>>> Do you mind elaborating on this? What technology would you propose as an alternative, and why would this increase checkpointing time?
>>>
>>> The problem is that when Flink starts a checkpoint and injects checkpoint barriers, those barriers travel through the job graph, and the quicker they can do that the better. How long it takes depends on the amount of data buffered ahead of the checkpoint barriers (currently all such records must be processed before the barrier is passed downstream). The more buffered records there are, and the more time it takes to process them, the longer the checkpoint takes. Obviously, if one stage in the job multiplies the number of records, it can in a way multiply the amount of "buffered work" that needs to be processed before the checkpoint barriers pass through.
>>>
>>> However, it might not be the case for you. To analyse what's going on you would need to look at various Flink metrics, like checkpoint times, back-pressured tasks, the state of the tasks' input/output buffers, etc. Those are secondary issues, though. First of all you should try to pinpoint the cause of the long GC pauses. If it comes from your code, you should fix that first. If that either isn't the issue or doesn't solve it, then generally speaking I would advise profiling your job to find out what's going on.
>>>
>>> Piotrek
>>>
>>> On 5 Mar 2019, at 02:00, Padarn Wilson <pad...@gmail.com> wrote:
>>>
>>> Hi Piotr,
>>>
>>> Thanks for your response. I am using Flink 1.7.1 (I intend to upgrade to 1.7.2 shortly - there is an S3 fix I'd like to take advantage of).
>>>
>>>> Generally speaking Flink might not be the best if you have records fan out; this may significantly increase checkpointing time.
>>>
>>> Do you mind elaborating on this? What technology would you propose as an alternative, and why would this increase checkpointing time?
>>>
>>>> However you might want to first identify what's causing the long GC times.
>>>
>>> My current plan is to try to enable GC logs and see if I can get something meaningful from them.
>>>
>>> Thanks a lot,
>>> Padarn
>>>
>>> On Mon, Mar 4, 2019 at 7:16 PM Piotr Nowojski <pi...@ververica.com> wrote:
>>>
>>>> Hi,
>>>>
>>>> What Flink version are you using?
>>>>
>>>> Generally speaking Flink might not be the best if you have records fan out; this may significantly increase checkpointing time.
>>>>
>>>> However, you might want to first identify what's causing the long GC times. If there are long GC pauses, that should be the first thing to fix.
>>>>
>>>> Piotrek
>>>>
>>>> On 2 Mar 2019, at 08:19, Padarn Wilson <pad...@gmail.com> wrote:
>>>>
>>>> Hi all again - following up on this, I think I've identified my problem as being something else, but I would appreciate it if anyone can offer advice.
>>>>
>>>> After running my stream for some time, I see that my garbage collector for the old generation starts to take a very long time:
>>>> <Screen Shot 2019-03-02 at 3.01.57 PM.png>
>>>> Here the purple line is young-generation time - it is ever increasing, but grows slowly - while the blue is old generation. This in itself is not a problem, but as soon as the next checkpoint is triggered after this happens, you see the following:
>>>> <Screen Shot 2019-03-02 at 3.02.48 PM.png>
>>>> It looks like the checkpoint time hits a cap, but this is only because the checkpoints start to time out and fail (these are the alignment times per operator).
>>>>
>>>> I do notice that my state is growing quite large over time, but I don't have a good understanding of what would cause this to show up in the JVM old-generation metric, which appears to be the leading metric before a problem is noticed. Other metrics, such as network buffers, also show that at checkpoint time things start to go haywire and the situation never recovers.
>>>>
>>>> Thanks
>>>>
>>>> On Thu, Feb 28, 2019 at 5:50 PM Padarn Wilson <pad...@gmail.com> wrote:
>>>>
>>>>> Hi all,
>>>>>
>>>>> I'm trying to process many records, and I have an expensive operation I'm trying to optimize. Simplified, it is something like:
>>>>>
>>>>> Data: (key1, count, time)
>>>>>
>>>>> Source -> Map(x -> (x, newKeyList(x.key1)))
>>>>>        -> FlatMap(x -> x._2.map(y => (y, x._1.count, x._1.time)))
>>>>>        -> KeyBy(_.key1).TumblingWindow().apply(...)
>>>>>        -> Sink
>>>>>
>>>>> In the Map -> FlatMap, what is happening is that each key is mapped to a set of keys, and then each of these is set as the new key. This effectively increases the size of the stream by 16x.
>>>>>
>>>>> What I am trying to figure out is how to set the parallelism of my operators. I see in some comments that people suggest the source, sink and aggregation should have different parallelism, but I'm not clear on exactly why, or what this means for CPU utilization. (See for example https://stackoverflow.com/questions/48317438/unable-to-achieve-high-cpu-utilization-with-flink-and-gelly)
>>>>>
>>>>> Also, it isn't clear to me the best way to handle this increase in data within the stream itself. (A sketch of this pipeline shape with per-operator parallelism set explicitly is included at the end of this thread.)
>>>>> Thanks
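For reference, here is a rough, self-contained sketch of the pipeline shape discussed above, with per-operator parallelism set explicitly. The Record type, the key-expansion function, the window size, the aggregation and the parallelism values are all illustrative assumptions, not the original job:

import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time

object FanOutPipelineSketch {

  // Illustrative record shape: (key1, count, time).
  case class Record(key1: String, count: Long, time: Long)

  // Hypothetical expansion of one key into a set of keys (the 16x fan-out).
  def newKeyList(key1: String): Seq[String] =
    (0 until 16).map(i => s"$key1-$i")

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    val source: DataStream[Record] = env
      .fromElements(Record("a", 1L, 0L), Record("b", 2L, 60001L)) // placeholder source
      .assignAscendingTimestamps(_.time)

    source
      // Fan-out stage: each input record becomes ~16 output records.
      .flatMap(x => newKeyList(x.key1).map(k => Record(k, x.count, x.time)))
      .setParallelism(4) // cheap, CPU-light stage: modest parallelism
      // The shuffle happens here: keyBy repartitions the expanded stream.
      .keyBy(_.key1)
      .window(TumblingEventTimeWindows.of(Time.minutes(1)))
      .reduce((a, b) => Record(a.key1, a.count + b.count, a.time.max(b.time)))
      .setParallelism(16) // the expensive keyed aggregation gets more slots
      .print()
      .setParallelism(1) // sink scaled independently of the aggregation

    env.execute("fan-out parallelism sketch")
  }
}

The point of the explicit setParallelism calls is only that the cheap expansion stage, the keyed aggregation and the sink can be scaled independently; the actual values would have to come from measuring back pressure, GC behaviour and checkpoint alignment times on the real job.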