Thanks a lot for your suggestion. I'll dig into it and update the mailing list if I find anything useful.
Padarn

On Wed, 6 Mar 2019 at 6:03 PM, Piotr Nowojski <pi...@ververica.com> wrote:

> Re-adding user mailing list.
>
> Hi,
>
> If it is a GC issue, only GC logs or a JVM memory profiler (like Oracle's Mission Control) can lead you to the solution. Once you confirm that it is a GC issue, there are numerous resources online on how to analyse the cause of the problem. For that, it is difficult to use CPU profiling or Flink metrics, since GC issues caused by one thread can cause performance bottlenecks in other, unrelated places.
>
> If it is not a GC issue, you can use Flink metrics (like the number of buffered input/output records) to find the task that is causing a bottleneck. Then you can use a CPU profiler to analyse why that is happening.
>
> Piotrek
>
> On 6 Mar 2019, at 02:52, Padarn Wilson <pad...@gmail.com> wrote:
>
> Hi Piotr,
>
> Thanks for your feedback. Makes sense about the checkpoint barriers - this definitely could be the cause of a problem.
>
>> I would advise profiling your job to find out what's going on.
>
> Agreed. Outside of inspecting the Flink metrics, do you have suggestions for tools with which to do this?
>
> The main thing I'm trying to pin down is:
> 1) Is it the downstream processing from the expansion of records that causes a problem, or
> 2) Is it the shuffle of all the records after the expansion that takes a long time - and if so, is there anything I can do to mitigate this other than trying to shuffle less?
>
> Thanks,
> Padarn
>
> On Tue, Mar 5, 2019 at 7:08 PM Piotr Nowojski <pi...@ververica.com> wrote:
>
>> Hi,
>>
>>> Do you mind elaborating on this? What technology would you propose as an alternative, and why would this increase checkpointing time?
>>
>> The problem is that when Flink starts a checkpoint and injects the checkpoint barriers, those barriers travel through the job graph, and the quicker they can do that the better. How long that takes depends on the amount of data buffered ahead of the checkpoint barriers (currently all such records must be processed before the checkpoint barrier is passed downstream). The more buffered records, and the more time it takes to process them, the longer the checkpoint takes. Obviously, if one stage in the job multiplies the number of records, it can in effect multiply the amount of "buffered work" that has to be processed before the checkpoint barriers pass through.
>>
>> However, that might not be the case for you. To analyse what is going on you would need to look at various Flink metrics, such as checkpoint times, back-pressured tasks, and the state of the tasks' input/output buffers. Those are secondary issues, though: first of all you should try to pinpoint the cause of the long GC pauses. If they come from your code, you should fix that first. If that is not the issue, or fixing it does not help, then generally speaking I would advise profiling your job to find out what is going on.
>>
>> Piotrek
>>
>> On 5 Mar 2019, at 02:00, Padarn Wilson <pad...@gmail.com> wrote:
>>
>> Hi Piotr,
>>
>> Thanks for your response. I am using Flink 1.7.1 (I intend to upgrade to 1.7.2 shortly - there is an S3 fix I'd like to take advantage of).
>>
>>> Generally speaking, Flink might not be the best fit if you have records fan out; this may significantly increase checkpointing time.
>>
>> Do you mind elaborating on this? What technology would you propose as an alternative, and why would this increase checkpointing time?
>>
>>> However, you might want to first identify what's causing the long GC times.
>>
>> My current plan is to try to enable GC logs and see if I can get something meaningful from them.
>>
>> Thanks a lot,
>> Padarn
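For reference, a minimal way to get those GC logs, assuming a Java 8 JVM and a cluster configured through flink-conf.yaml, would be to pass the standard HotSpot GC-logging flags via the JVM options, e.g.:

    env.java.opts: "-XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+PrintGCApplicationStoppedTime -Xloggc:/path/to/gc.log"

The log path is only a placeholder; env.java.opts.taskmanager can be used instead if the logs are wanted only on the task managers.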
>>
>> On Mon, Mar 4, 2019 at 7:16 PM Piotr Nowojski <pi...@ververica.com> wrote:
>>
>>> Hi,
>>>
>>> What Flink version are you using?
>>>
>>> Generally speaking, Flink might not be the best fit if you have records fan out; this may significantly increase checkpointing time.
>>>
>>> However, you might want to first identify what's causing the long GC times. If there are long GC pauses, that should be the first thing to fix.
>>>
>>> Piotrek
>>>
>>> On 2 Mar 2019, at 08:19, Padarn Wilson <pad...@gmail.com> wrote:
>>>
>>> Hi all again - following up on this, I think I've identified my problem as being something else, but I would appreciate it if anyone can offer advice.
>>>
>>> After running my stream for some time, I see that my garbage collector for the old generation starts to take a very long time:
>>> <Screen Shot 2019-03-02 at 3.01.57 PM.png>
>>> Here the *purple line is young generation time*, which is ever increasing but grows slowly, while the *blue line is old generation time*.
>>> This in itself is not a problem, but as soon as the next checkpoint is triggered after this happens, you see the following:
>>> <Screen Shot 2019-03-02 at 3.02.48 PM.png>
>>> It looks like the checkpoint duration hits a cap, but this is only because the checkpoints start to time out and fail (these are the alignment times per operator).
>>>
>>> I do notice that my state is growing quite a bit larger over time, but I don't have a good understanding of what would cause this behaviour in the JVM old generation metric, which appears to be the leading indicator before a problem is noticed. Other metrics, such as the network buffers, also show that at checkpoint time things start to go haywire and the situation never recovers.
>>>
>>> Thanks
>>>
>>> On Thu, Feb 28, 2019 at 5:50 PM Padarn Wilson <pad...@gmail.com> wrote:
>>>
>>>> Hi all,
>>>>
>>>> I'm trying to process many records, and I have an expensive operation I'm trying to optimize. Simplified, it is something like:
>>>>
>>>> Data: (key1, count, time)
>>>>
>>>> Source -> Map(x -> (x, newKeyList(x.key1)))
>>>>        -> Flatmap(x -> x._2.map(y => (y, x._1.count, x._1.time)))
>>>>        -> Keyby(_.key1).TumblingWindow().apply(...)
>>>>        -> Sink
>>>>
>>>> In the Map -> Flatmap step, each key is mapped to a set of keys, and each of these is then set as the new key. This effectively increases the size of the stream by 16x.
>>>>
>>>> What I am trying to figure out is how to set the parallelism of my operators. I see in some comments that people suggest your source, sink and aggregation should have different parallelism, but I'm not clear on exactly why, or what this means for CPU utilization (see for example https://stackoverflow.com/questions/48317438/unable-to-achieve-high-cpu-utilization-with-flink-and-gelly).
>>>>
>>>> Also, it isn't clear to me what the best way is to handle this increase in data within the stream itself.
>>>>
>>>> Thanks
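As a rough illustration of the parallelism question in the original message above, here is a minimal sketch of such a fan-out pipeline in the Flink 1.7 Scala DataStream API. The source, the key-expansion function, the window logic and all of the parallelism values below are placeholders rather than the actual job:

    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.streaming.api.windowing.time.Time

    object FanOutParallelismSketch {

      case class Record(key1: String, count: Long, time: Long)

      // Hypothetical stand-in for the real key expansion (1 key -> ~16 keys).
      def newKeyList(key: String): Seq[String] =
        (0 until 16).map(i => s"$key-$i")

      def main(args: Array[String]): Unit = {
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        env.setParallelism(4) // default parallelism, e.g. for the source

        val source: DataStream[Record] =
          env.fromElements(Record("a", 1L, 0L), Record("b", 2L, 0L)) // placeholder source

        source
          // The Map + Flatmap of the original pseudocode collapsed into one flatMap:
          // each record is re-emitted once per expanded key (~16x fan-out).
          .flatMap(r => newKeyList(r.key1).map(k => Record(k, r.count, r.time)))
          .setParallelism(16) // the fan-out stage gets more slots than the source
          .keyBy(_.key1)
          .timeWindow(Time.minutes(5)) // tumbling window; the real window function is not shown in the thread
          .reduce((a, b) => a.copy(count = a.count + b.count)) // stand-in aggregation
          .setParallelism(16) // window/aggregation parallelism, independent of source and sink
          .print() // placeholder sink
          .setParallelism(2)

        env.execute("fan-out parallelism sketch")
      }
    }

The relevant point is that setParallelism can be set per operator, so the source, the fan-out/aggregation stages and the sink do not have to share a single parallelism value.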
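Since the 2 Mar message above describes checkpoints starting to time out and fail during alignment, here is a similarly hedged sketch of the checkpoint settings that discussion touches on (Flink 1.7 Scala API; the interval, timeout and pause values are placeholders only):

    import org.apache.flink.streaming.api.CheckpointingMode
    import org.apache.flink.streaming.api.scala._

    object CheckpointConfigSketch extends App {
      val env = StreamExecutionEnvironment.getExecutionEnvironment

      env.enableCheckpointing(60 * 1000) // checkpoint every 60s (placeholder)

      val conf = env.getCheckpointConfig
      conf.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
      conf.setCheckpointTimeout(10 * 60 * 1000)     // fail a slow checkpoint after 10 min instead of letting it hang
      conf.setMinPauseBetweenCheckpoints(30 * 1000) // leave time between checkpoints for buffered records to drain
      conf.setMaxConcurrentCheckpoints(1)

      // ... job definition and env.execute(...) would follow here
    }

None of these settings fixes a GC or backpressure problem by itself; they only control how long Flink waits before declaring a checkpoint failed and how much breathing room the job gets between checkpoints.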