Hi Josson, Thanks for the great investigation and for coming back to us. Aljoscha, could you help us here? It looks like you were involved in the original BEAM-3087 issue.
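If I am reading your description correctly, the interaction boils down to the pattern sketched below (heavily simplified, made-up class names, not the actual Beam/Flink code): the source thread holds the checkpoint lock around emitting (and around everything chained to the emit, including the Redis lookup), while the processing-time timer that would emit the watermark needs the very same lock.

// Heavily simplified sketch of the suspected interaction; not the real
// Beam/Flink classes, just the locking pattern described below in the thread.
public class CheckpointLockStarvationSketch {

    private final Object checkpointLock = new Object();

    // Source thread: the Beam 2.15 UnboundedSourceWrapper takes the checkpoint
    // lock around advance()/emit, and the downstream chained operators
    // (including the Redis lookup) run synchronously inside that emit call.
    void runSourceLoop() {
        while (true) {
            synchronized (checkpointLock) {
                emitRecordAndRunChainedOperators(); // may block on Redis while holding the lock
            }
            // The lock is re-acquired almost immediately on the next iteration.
        }
    }

    // Timer thread (SystemProcessingTimeService): a watermark can only be
    // emitted after acquiring the same checkpoint lock.
    void onProcessingTime(long timestamp) {
        synchronized (checkpointLock) {
            emitWatermark(timestamp);
        }
    }

    private void emitRecordAndRunChainedOperators() { /* placeholder */ }

    private void emitWatermark(long timestamp) { /* placeholder */ }
}

Since Java's intrinsic locks are not fair, a loop that releases and immediately re-acquires the lock can starve the timer thread indefinitely, which would match the missing watermarks you are seeing.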
Best, Piotrek

Fri, 23 Oct 2020 at 07:36 Josson Paul <jossonp...@gmail.com> wrote:
> @Piotr Nowojski <pnowoj...@apache.org> @Nico Kruber <nkru...@apache.org>
>
> An update. I was able to figure out the problem code: a change in the Apache Beam code is causing this problem.
>
> Beam introduced a lock around the "emit" in the unbounded source. The lock is Flink's checkpoint lock, and the same lock is used by Flink's timer service to emit watermarks. The timer service is starved waiting for that lock and, for some reason, never acquires it. The effect is that watermarks are never emitted by Flink's timer service; because no watermarks flow through the system, sliding windows are never closed and data accumulates in the window.
>
> This problem occurs only if external lookup calls (like Redis) happen before the data goes to the sliding window, something like:
>
> KafkaSource -> Transforms (occasional Redis lookup) -> SlidingWindow -> Transforms -> Kafka Sink
>
> https://github.com/apache/beam/blob/60e0a22ea95921636c392b5aae77cb48196dd700/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java#L256
> This is Beam 2.4; you can see that there is no synchronized block at lines 257 and 270.
>
> https://github.com/apache/beam/blob/7931ec055e2da7214c82e368ef7d7fd679faaef1/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java#L264
> This is Beam 2.15; see the synchronized block introduced at lines 264 and 280. We are using Beam 2.15 and Flink 1.8.
>
> Beam introduced this synchronized block because of this bug: https://issues.apache.org/jira/browse/BEAM-3087
>
> After I removed that synchronized keyword, everything started working fine in my application.
>
> What do you think about this? Why does Beam need a synchronized block there?
>
> Beam is using this lock:
> https://github.com/apache/flink/blob/d54807ba10d0392a60663f030f9fe0bfa1c66754/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeService.java#L282
>
> Thanks,
> Josson
>
> On Mon, Sep 14, 2020 at 5:03 AM Piotr Nowojski <pnowoj...@apache.org> wrote:
>
>> Hi Josson,
>>
>> The TM logs that you attached cover only a five-minute period. Are you sure they encompass the period before and after the potential failure? It would also be nice if you could provide logs matching the charts (like the ones in your previous messages), to correlate events (a spike in latency/GC with a timestamp from the logs).
>>
>> I was not necessarily asking you to upgrade to Java 9, but to an updated/bug-fixed version of Java 8 [1].
>>
>> > 1) In the Flink 1.4 setup, the data in the heap is throttled. It never goes out of memory whatever the ingestion rate; our windows are 5-minute windows.
>> > 2) In the Flink 1.8 setup, HeapKeyedStateBackend is never throttled and fills up fast. When old-gen space goes beyond 60-70%, even the mixed GC or full GC doesn't reclaim space.
>>
>> In both cases the backpressure mechanism is the same: if a task's output runs out of buffers to put produced records into, the task will block.
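>> (To put a rough number on it: if I remember the 1.8 defaults correctly, each input gate gets 2 exclusive 32KB network buffers per channel plus a handful of floating buffers, so with parallelism 4 a task can buffer only a few hundred KB per gate before it blocks; the threshold is set by the buffer count, not by how much heap is free. Please double-check against the docs for your exact version.)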
It can be that between 1.4 and 1.8, with credit based flow >> control changes, the amount of available buffers for the tasks on your >> setup has grown, so the tasks are backpressuring later. This in turn can >> sometimes mean that at any point of time there is more data buffered on the >> operator's state, like `WindowOperator`. I'm not sure what's the >> best/easiest way how to check this: >> >> 1. the amount of buffered data might be visible via metrics [2][3] >> 2. if you enable DEBUG logs, it should be visible via: >> >> > LOG.debug("Using a local buffer pool with {}-{} buffers", >> numberOfRequiredMemorySegments, maxNumberOfMemorySegments); >> >> entry logged by >> `org.apache.flink.runtime.io.network.buffer.LocalBufferPool`. >> >> Piotrek >> >> [1] https://en.wikipedia.org/wiki/Java_version_history#Java_8_updates >> [2] >> https://ci.apache.org/projects/flink/flink-docs-release-1.4/monitoring/metrics.html#network >> [3] >> https://ci.apache.org/projects/flink/flink-docs-release-1.8/monitoring/metrics.html#network >> >> pon., 14 wrz 2020 o 05:04 Josson Paul <jossonp...@gmail.com> napisał(a): >> >>> @Piotr Nowojski <pnowoj...@apache.org> @Nico Kruber <nkru...@apache.org> >>> I have attached the Taskmanager/GC/thread dumps in a zip file. >>> >>> I don't see any issues in the TM logs. >>> Tried to upgrade to Java 9. Flink is on top of another platform which >>> threw errors while upgrading to Java 9. I can't do much for now. We will >>> upgrade to Jdk 11 in another 2 months. >>> >>> Regarding the Heap size. The new experiment I did was on 4gb Heap on >>> both Flink 1.4 and Flink 1.8. >>> >>> Questions I am trying to get answered are >>> >>> 1) In Flink 1.4 set up, the data in the Heap is throttled. It never goes >>> out of memory whatever be the ingestion rate. our Windows are 5 >>> minutes windows. >>> 2) In Flink 1.8 set up, HeapKeyedStateBackend is never throttled and >>> fills up fast. When Old-gen space goes beyond 60-70% even the Mixed GC or >>> Full GC doesn't reclaim space. >>> >>> >>> On Fri, Sep 11, 2020 at 12:58 AM Piotr Nowojski <pnowoj...@apache.org> >>> wrote: >>> >>>> Hi Josson, >>>> >>>> Have you checked the logs as Nico suggested? At 18:55 there is a dip in >>>> non-heap memory, just about when the problems started happening. Maybe you >>>> could post the TM logs? >>>> Have you tried updating JVM to a newer version? >>>> Also it looks like the heap size is the same between 1.4 and 1.8, but >>>> in an earlier message you said you increased it by 700MB? >>>> >>>> Piotrek >>>> >>>> pt., 11 wrz 2020 o 05:07 Josson Paul <jossonp...@gmail.com> napisał(a): >>>> >>>>> I have attached two word documents. >>>>> Flink1.4 and Flink1.8 >>>>> I reduced the heap size in the cluster and tried the experiment in >>>>> both Flink 1.4 and Flink 1.8. >>>>> My goal was to simulate ingestion rate of 200 Clients/sec (Not going >>>>> into the details here). >>>>> >>>>> In Flink 1.4 I could reach 200 Clients/Sec and I ran the cluster for 1 >>>>> hour. You can see the details in the attached Flink1.4 document file. You >>>>> can see the GC activity and Cpu. Both are holding good. >>>>> >>>>> In Flin 1.8 I could reach only 160 Clients/Sec and the issue started >>>>> happening. Issue started within 15 minutes of starting the ingestion. >>>>> @Piotr >>>>> Nowojski <pnowoj...@apache.org> , you can see that there is no meta >>>>> space related issue. All the GC related details are available in the doc. >>>>> >>>>> Especially see the difference in Heap dump of 'Biggest Objects' in >>>>> both clusters. 
How Flink 1.4 holds lesser objects in Heap. Is it because >>>>> Flink 1.4 was efficient and 1.8 solved that in efficiency and this problem >>>>> is expected?. >>>>> >>>>> @Nicko, We are not doing the fat jar stuff. >>>>> >>>>> @Piotr Nowojski <pnowoj...@apache.org> , we are in the process of >>>>> upgrading to Java 11 and Flink 1.11. But I need at least 2 months. >>>>> >>>>> >>>>> I am not getting the Finalizer problem in the latest heap dump. Maybe >>>>> it was happening only 1 or 2 times. >>>>> >>>>> Please let me know if you need additional input >>>>> >>>>> >>>>> Thanks, >>>>> Josson >>>>> >>>>> >>>>> On Thu, Sep 10, 2020 at 5:19 AM Nico Kruber <nkru...@apache.org> >>>>> wrote: >>>>> >>>>>> What looks a bit strange to me is that with a running job, the >>>>>> SystemProcessingTimeService should actually not be collected (since >>>>>> it is >>>>>> still in use)! >>>>>> >>>>>> My guess is that something is indeed happening during that time frame >>>>>> (maybe >>>>>> job restarts?) and I would propose to check your logs for anything >>>>>> suspicious >>>>>> in there. >>>>>> >>>>>> >>>>>> When I did experiments with Beam pipelines on our platform [1], I >>>>>> also >>>>>> noticed, that the standard fat jars that Beam creates include Flink >>>>>> runtime >>>>>> classes it shouldn't (at least if you are submitting to a separate >>>>>> Flink >>>>>> cluster). This can cause all sorts of problems and I would recommend >>>>>> removing >>>>>> those from the fat jar as documented in [1]. >>>>>> >>>>>> >>>>>> >>>>>> >>>>>> Nico >>>>>> >>>>>> >>>>>> >>>>>> [1] https://ververica.zendesk.com/hc/en-us/articles/360014323099 >>>>>> >>>>>> On Thursday, 10 September 2020 13:44:32 CEST Piotr Nowojski wrote: >>>>>> > Hi Josson, >>>>>> > >>>>>> > Thanks again for the detailed answer, and sorry that I can not help >>>>>> you >>>>>> > with some immediate answer. I presume that jvm args for 1.8 are the >>>>>> same? >>>>>> > >>>>>> > Can you maybe post what exactly has crashed in your cases a) and b)? >>>>>> > Re c), in the previously attached word document, it looks like >>>>>> Flink was >>>>>> > running without problems for a couple of hours/minutes, everything >>>>>> was >>>>>> > stable, no signs of growing memory consumption, impending problem, >>>>>> until >>>>>> > around 23:15, when the problem started, right? Has something else >>>>>> happened >>>>>> > at that time, something that could explain the spike? A checkpoint? >>>>>> Job >>>>>> > crash/restart? Load spike? >>>>>> > >>>>>> > A couple of other random guesses: >>>>>> > - have you been monitoring other memory pools for Flink 1.4 and >>>>>> 1.8? Like >>>>>> > meta space? Growing meta space size can sometimes cause problems. It >>>>>> > shouldn't be the case here, as you configured XX:MaxMetaspaceSize, >>>>>> but it >>>>>> > might be still worth checking... >>>>>> > - another random idea, have you tried upgrading JDK? Maybe that >>>>>> would solve >>>>>> > the problem? 
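In case it is useful, monitoring those pools does not need anything Flink-specific; a plain JMX loop like the sketch below (an illustrative snippet with a made-up class name, to be run inside the TaskManager JVM or adapted into your own tooling) prints each pool's usage so it can be correlated with the GC charts:

import java.lang.management.ManagementFactory;
import java.lang.management.MemoryPoolMXBean;
import java.lang.management.MemoryUsage;

// Illustrative JMX snippet (not part of Flink): periodically print every JVM
// memory pool, e.g. "G1 Old Gen", "G1 Eden Space", "Metaspace", so the numbers
// can be correlated with the GC/latency spikes.
public class MemoryPoolLogger {
    public static void main(String[] args) throws InterruptedException {
        while (true) {
            for (MemoryPoolMXBean pool : ManagementFactory.getMemoryPoolMXBeans()) {
                MemoryUsage usage = pool.getUsage();
                if (usage == null) {
                    continue; // pool not valid / no usage information
                }
                System.out.printf("%-25s used=%d MB, committed=%d MB, max=%d MB%n",
                        pool.getName(),
                        usage.getUsed() >> 20,
                        usage.getCommitted() >> 20,
                        usage.getMax() >> 20); // max is -1 when undefined
            }
            Thread.sleep(10_000L); // sample every 10 seconds
        }
    }
}

If I remember correctly, Flink also exposes the same numbers via its Status.JVM.Memory.* metrics, in case hooking into the existing metric reporters is easier.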
>>>>>> > >>>>>> > Best regards, >>>>>> > Piotrek >>>>>> > >>>>>> > śr., 9 wrz 2020 o 19:53 Josson Paul <jossonp...@gmail.com> >>>>>> napisał(a): >>>>>> > > Hi Piotr, >>>>>> > > >>>>>> > > *JVM start up for Flink 1.4* >>>>>> > > >>>>>> > > *-------------------------------* >>>>>> > > >>>>>> > > >>>>>> java-server-XX:HeapDumpPath=/opt/maglev/srv/diagnostics/pipelineruntime-ta >>>>>> > > skmgr-assurance-1-77d44cf64-z8gd4.heapdump- >>>>>> > > *Xmx6554m-Xms6554m*-*XX:MaxMetaspaceSize=512m* >>>>>> > > -XX:+HeapDumpOnOutOfMemoryError-*XX:+UseG1GC*-XX:CICompilerCount=4 >>>>>> > > >>>>>> *-XX:MaxGCPauseMillis=1000*-XX:+DisableExplicitGC-*XX:ParallelGCThreads=4* >>>>>> > > -Dsun.net.inetaddr.ttl=60-XX:OnOutOfMemoryError=kill -9 >>>>>> > > %p*-Dio.netty.eventLoopThreads=3* >>>>>> > > >>>>>> -Dlog4j.configurationFile=/opt/maglev/sw/apps/pipelineruntime/resources/lo >>>>>> > > >>>>>> g4j2.xml-Dorg.apache.flink.shaded.netty4.io.netty.eventLoopThreads=3-Dnetw >>>>>> > > orkaddress.cache.ttl=120-Dnum.cores=3- >>>>>> > > >>>>>> *XX:+UseStringDeduplication-Djava.util.concurrent.ForkJoinPool.common.par >>>>>> > > allelism=3-XX:ConcGCThreads=4 * >>>>>> > > >>>>>> -Djava.library.path=/usr/local/lib-Djava.net.preferIPv4Stack=true-Dapp.di >>>>>> > > >>>>>> r=/opt/maglev/sw/apps/pipelineruntime-Dserver.name=pipelineruntime-Dlog.di >>>>>> > > >>>>>> r=/opt/maglev/var/log/pipelineruntime-cp/opt/maglev/sw/apps/javacontainer/ >>>>>> > > >>>>>> resources:/opt/maglev/sw/apps/pipelineruntime/lib/*:/opt/maglev/sw/apps/pi >>>>>> > > >>>>>> pelineruntime/resources:/opt/maglev/sw/apps/javacontainer/lib/*com.cisco.m >>>>>> > > aglev.MaglevServerstartmaglev> >>>>>> > > 1. taskmanager.memory.fraction = 0.7f (This was coming to >>>>>> 4.5 GB. I >>>>>> > > didn't know at that time that we could set memory fraction to >>>>>> zero >>>>>> > > because >>>>>> > > ours is a streaming job. It was picking up the default ) >>>>>> > > 2. Network buffer pool memory was 646MB on the Heap (I >>>>>> think this >>>>>> > > was the default based on some calculations in the Flink 1.4) >>>>>> > > 3. G1GC region size was 4MB (Default) >>>>>> > > >>>>>> > > I tested this setup by reducing the JVM heap by *1GB.* It still >>>>>> worked >>>>>> > > perfectly with some lags here and there. >>>>>> > > >>>>>> > > *JVM start up for Flink 1.8* >>>>>> > > *------------------------------------* >>>>>> > > a) I started with the same configuration as above. Kubenetis POD >>>>>> went out >>>>>> > > of memory. At this point I realized that in Flink 1.8 network >>>>>> buffer >>>>>> > > pools >>>>>> > > are moved to native memory. Based on calculations it was coming >>>>>> to 200MB >>>>>> > > in >>>>>> > > native memory. I increased the overall POD memory to accommodate >>>>>> the >>>>>> > > buffer pool change keeping the *heap the same*. >>>>>> > > >>>>>> > > b) Even after I modified the overall POD memory, the POD still >>>>>> crashed. >>>>>> > > At this point I generated Flame graphs to identify the CPU/Malloc >>>>>> calls >>>>>> > > (Attached as part of the initial email). Realized that cpu usage >>>>>> of G1GC >>>>>> > > is >>>>>> > > significantly different from Flink 1.4. Now I made 2 changes >>>>>> > > >>>>>> > > 1. taskmanager.memory.fraction = 0.01f (This will give more >>>>>> heap for >>>>>> > > user code) >>>>>> > > 2. Increased cpu from 3 to 4 cores. >>>>>> > > >>>>>> > > Above changes helped to hold the cluster a little longer. >>>>>> But it >>>>>> > > >>>>>> > > still crashed after sometime. >>>>>> > > >>>>>> > > c) Now I made the below changes. 
>>>>>> > > >>>>>> > > 1. I came across this -> >>>>>> > > >>>>>> http://mail.openjdk.java.net/pipermail/hotspot-gc-use/2017-February/002 >>>>>> > > 622.html . Now I changed the G1GC region space to *8MB >>>>>> *instead of the >>>>>> > > default 4MB*.* >>>>>> > > 2. -XX:MaxGCPauseMillis=2000 (I even tried higher in later >>>>>> experiments) >>>>>> > > 3. Played around with G1RSetSparseRegionEntries >>>>>> > > >>>>>> > > This helped to avoid the POD going out of memory. But the >>>>>> Old Gen >>>>>> > > >>>>>> > > heap issue was very evident now (Please see the attached word >>>>>> document). >>>>>> > > >>>>>> > > d) Allocated additional heap memory of *700 MB *along with the >>>>>> above >>>>>> > > >>>>>> > > changes. This also didn't help. It just prolonged the crash. Now >>>>>> I need >>>>>> > > help from others to which direction I want to take this to . >>>>>> > > >>>>>> > > My worry is even if I upgrade to flink 1.11 this issue might still >>>>>> > > persist. >>>>>> > > >>>>>> > > I have attached a screenshot from Heap dump to show you the >>>>>> difference >>>>>> > > between Flink 1.4 and 1.8 the way HeapKeyedStateBackend is >>>>>> created. Not >>>>>> > > sure whether this change has something to do with this memory >>>>>> issue that I >>>>>> > > am facing. >>>>>> > > Name Flink-1.4.jpg for the 1.4 and Flink-1.8.jpg for 1.8 >>>>>> > > >>>>>> > > >>>>>> > > Thanks, >>>>>> > > Josson >>>>>> > > >>>>>> > > On Wed, Sep 9, 2020 at 5:44 AM Piotr Nowojski < >>>>>> pnowoj...@apache.org> >>>>>> > > >>>>>> > > wrote: >>>>>> > >> Hi Josson, >>>>>> > >> >>>>>> > >> Thanks for getting back. >>>>>> > >> >>>>>> > >> What are the JVM settings and in particular GC settings that you >>>>>> are >>>>>> > >> using (G1GC?)? >>>>>> > >> It could also be an issue that in 1.4 you were just slightly >>>>>> below the >>>>>> > >> threshold of GC issues, while in 1.8, something is using a bit >>>>>> more >>>>>> > >> memory, >>>>>> > >> causing the GC issues to appear? Have you tried just increasing >>>>>> the heap >>>>>> > >> size? >>>>>> > >> Have you tried to compare on the job start up, what is the usage >>>>>> and size >>>>>> > >> of JVM's memory pools with Flink 1.4 and 1.8? Maybe that can >>>>>> point us in >>>>>> > >> the right direction. >>>>>> > >> >>>>>> > >> > My understanding on back pressure is that it is not based on >>>>>> Heap >>>>>> > >> >>>>>> > >> memory but based on how fast the Network buffers are filled. Is >>>>>> this >>>>>> > >> correct?. >>>>>> > >> >>>>>> > >> > Does Flink use TCP connection to communicate between tasks if >>>>>> the tasks >>>>>> > >> >>>>>> > >> are in the same Task manager?. >>>>>> > >> >>>>>> > >> No, local input channels are being used then, but memory for >>>>>> network >>>>>> > >> buffers is assigned to tasks regardless of the fraction of local >>>>>> input >>>>>> > >> channels in the task. However with just single taskmanager and >>>>>> > >> parallelism >>>>>> > >> of 4, the amount of the memory used by the network stack should >>>>>> be >>>>>> > >> insignificant, at least as long as you have a reasonably sized >>>>>> job graph >>>>>> > >> (32KB * (2 * parallelism + 7) * number of tasks). >>>>>> > >> >>>>>> > >> > What I noticed in Flink 1.4 is that it doesn't read data from >>>>>> Kafka if >>>>>> > >> >>>>>> > >> there is not sufficient heap memory to process data. Somehow >>>>>> this is not >>>>>> > >> happening in Flink 1.8 and it fills the heap soon enough not to >>>>>> get >>>>>> > >> GCed/Finalized. 
Any change around this between Flink 1.4 and >>>>>> Flink 1.8. >>>>>> > >> >>>>>> > >> No, there were no changes in this part as far as I remember. >>>>>> Tasks when >>>>>> > >> producing records are serialising them and putting into the >>>>>> network >>>>>> > >> buffers. If there are no available network buffers, the task is >>>>>> back >>>>>> > >> pressuring and stops processing new records. >>>>>> > >> >>>>>> > >> Best regards, >>>>>> > >> Piotrek >>>>>> > >> >>>>>> > >> wt., 8 wrz 2020 o 21:51 Josson Paul <jossonp...@gmail.com> >>>>>> napisał(a): >>>>>> > >>> Hi Piotr, >>>>>> > >>> >>>>>> > >>> 2) SystemProcessingTimeService holds the >>>>>> HeapKeyedStateBackend and >>>>>> > >>> >>>>>> > >>> HeapKeyedStateBackend has lot of Objects and that is filling >>>>>> the Heap >>>>>> > >>> >>>>>> > >>> 3) I am not using Flink Kafka Connector. But we are using >>>>>> Apache Beam >>>>>> > >>> >>>>>> > >>> kafka connector. There is a change in the Apache Beam version. >>>>>> But the >>>>>> > >>> kafka client we are using is the same as the one which was >>>>>> working in >>>>>> > >>> the >>>>>> > >>> other cluster where Flink was 1.4. >>>>>> > >>> >>>>>> > >>> *There is no change in Hardware/Java/Kafka/Kafka >>>>>> Client/Application >>>>>> > >>> >>>>>> > >>> between the cluster which is working and not working* >>>>>> > >>> >>>>>> > >>> I am aware of the memory changes and network buffer changes >>>>>> between 1.4 >>>>>> > >>> and 1.8. >>>>>> > >>> >>>>>> > >>> Flink 1.4 had network buffers on Heap and 1.8 network buffers >>>>>> are on the >>>>>> > >>> native memory. I modified the Flink 1.8 code to put it back to >>>>>> Heap >>>>>> > >>> memory >>>>>> > >>> but the issue didn't get resolved. >>>>>> > >>> >>>>>> > >>> Mine is a streaming job so we set 'taskmanager.memory.fraction' >>>>>> to very >>>>>> > >>> minimal and that heap is fully available for user data. >>>>>> > >>> >>>>>> > >>> Flink 1.4 was not using Credit based Flow control and Flink 1.8 >>>>>> uses >>>>>> > >>> Credit based Flow control. *Our set up has only 1 task manager >>>>>> and 4 >>>>>> > >>> parallelisms*. According to this video >>>>>> > >>> >>>>>> https://www.youtube.com/watch?v=AbqatHF3tZI&ab_channel=FlinkForward ( >>>>>> > >>> *16:21*) if tasks are in same task manager, Flink doesn't use >>>>>> Credit >>>>>> > >>> Based Flow control. Essentially no change between Flink 1.4 and >>>>>> 1.8 in >>>>>> > >>> *our >>>>>> > >>> set up*. Still I tried to change the Credit Based Flow Control >>>>>> to False >>>>>> > >>> and test my setup. The problem persists. >>>>>> > >>> >>>>>> > >>> What I noticed in Flink 1.4 is that it doesn't read data from >>>>>> Kafka if >>>>>> > >>> there is not sufficient heap memory to process data. Somehow >>>>>> this is not >>>>>> > >>> happening in Flink 1.8 and it fills the heap soon enough not to >>>>>> get >>>>>> > >>> GCed/Finalized. Any change around this between Flink 1.4 and >>>>>> Flink 1.8. >>>>>> > >>> >>>>>> > >>> My understanding on back pressure is that it is not based on >>>>>> Heap memory >>>>>> > >>> but based on how fast the Network buffers are filled. Is this >>>>>> correct?. >>>>>> > >>> Does Flink use TCP connection to communicate between tasks if >>>>>> the tasks >>>>>> > >>> are in the same Task manager?. >>>>>> > >>> >>>>>> > >>> Thanks, >>>>>> > >>> josson >>>>>> > >>> >>>>>> > >>> On Thu, Sep 3, 2020 at 12:35 PM Piotr Nowojski < >>>>>> pnowoj...@apache.org> >>>>>> > >>> >>>>>> > >>> wrote: >>>>>> > >>>> Hi Josson, >>>>>> > >>>> >>>>>> > >>>> 2. 
Are you sure that all/vast majority of those objects are >>>>>> pointing >>>>>> > >>>> towards SystemProcessingTimeService? And is this really the >>>>>> problem of >>>>>> > >>>> those objects? Are they taking that much of the memory? >>>>>> > >>>> 3. It still could be Kafka's problem, as it's likely that >>>>>> between 1.4 >>>>>> > >>>> and 1.8.x we bumped Kafka dependencies. >>>>>> > >>>> >>>>>> > >>>> Frankly if that's not some other external dependency issue, I >>>>>> would >>>>>> > >>>> expect that the problem might lie somewhere completely else. >>>>>> Flink's >>>>>> > >>>> code >>>>>> > >>>> relaying on the finalisation hasn't changed since 2015/2016. >>>>>> On the >>>>>> > >>>> other >>>>>> > >>>> hand there were quite a bit of changes between 1.4 and 1.8.x, >>>>>> some of >>>>>> > >>>> them >>>>>> > >>>> were affecting memory usage. Have you read release notes for >>>>>> versions >>>>>> > >>>> 1.5, >>>>>> > >>>> 1.6, 1.7 and 1.8? In particular both 1.5 [1] and 1.8 [2] have >>>>>> memory >>>>>> > >>>> related notes that could be addressed via configuration >>>>>> changes. >>>>>> > >>>> >>>>>> > >>>> Thanks, >>>>>> > >>>> Piotrek >>>>>> > >>>> >>>>>> > >>>> [1] >>>>>> > >>>> >>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.5/release-not >>>>>> > >>>> es/flink-1.5.html [2] >>>>>> > >>>> >>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.8/release-not >>>>>> > >>>> es/flink-1.8.html>>>> >>>>>> > >>>> czw., 3 wrz 2020 o 18:50 Josson Paul <jossonp...@gmail.com> >>>>>> napisał(a): >>>>>> > >>>>> 1) We are in the process of migrating to Flink 1.11. But it >>>>>> is going >>>>>> > >>>>> to take a while before we can make everything work with the >>>>>> latest >>>>>> > >>>>> version. >>>>>> > >>>>> Meanwhile since this is happening in production I am trying >>>>>> to solve >>>>>> > >>>>> this. >>>>>> > >>>>> 2) Finalizae class is pointing >>>>>> > >>>>> to >>>>>> > >>>>> >>>>>> org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService >>>>>> > >>>>> . >>>>>> > >>>>> This class has a finalize method. I have attached spreadsheet >>>>>> ( >>>>>> > >>>>> *Object-explorer.csv*) to give you a high level view >>>>>> > >>>>> 3) The difference between working cluster and NON working >>>>>> cluster is >>>>>> > >>>>> only on Beam and Flink. Hardware, Input message rate, >>>>>> Application >>>>>> > >>>>> jars, >>>>>> > >>>>> Kafka are all the same between those 2 clusters. Working >>>>>> cluster was >>>>>> > >>>>> with >>>>>> > >>>>> Flink 1.4 and Beam 2.4.0 >>>>>> > >>>>> >>>>>> > >>>>> Any insights into this will help me to debug further >>>>>> > >>>>> >>>>>> > >>>>> Thanks, >>>>>> > >>>>> Josson >>>>>> > >>>>> >>>>>> > >>>>> >>>>>> > >>>>> On Thu, Sep 3, 2020 at 3:34 AM Piotr Nowojski < >>>>>> pnowoj...@apache.org> >>>>>> > >>>>> >>>>>> > >>>>> wrote: >>>>>> > >>>>>> Hi, >>>>>> > >>>>>> >>>>>> > >>>>>> Have you tried using a more recent Flink version? 1.8.x is >>>>>> no longer >>>>>> > >>>>>> supported, and latest versions might not have this issue >>>>>> anymore. >>>>>> > >>>>>> >>>>>> > >>>>>> Secondly, have you tried backtracking those references to the >>>>>> > >>>>>> Finalizers? Assuming that Finalizer is indeed the class >>>>>> causing >>>>>> > >>>>>> problems. >>>>>> > >>>>>> >>>>>> > >>>>>> Also it may well be a non Flink issue [1]. 
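As background on why a finalize() method matters here: objects of a finalizable class are not reclaimed as soon as they become unreachable, but only after the single finalizer thread has run finalize() on them, so they can accumulate behind a slow or starved finalizer thread. A toy, purely illustrative sketch of that effect (the class name is made up; this is not Flink code):

// Toy illustration of how a class with a finalize() method can pile up on the
// heap: every unreachable instance stays strongly reachable from the JVM's
// internal finalizer queue until the finalizer thread has executed finalize()
// on it, so a slow or starved finalizer thread keeps the whole backlog alive
// across full GCs.
public class FinalizerBacklogDemo {

    private final byte[] payload = new byte[1024 * 1024]; // 1 MB per instance

    @Override
    protected void finalize() throws Throwable {
        Thread.sleep(100); // simulate a slow finalizer; the queue grows faster than it drains
    }

    public static void main(String[] args) {
        for (long i = 0; ; i++) {
            new FinalizerBacklogDemo(); // unreachable immediately, but must wait to be finalized
            if (i % 1_000 == 0) {
                System.out.println("allocated " + i + " instances");
            }
        }
    }
}

That backlog is the usual reason a heap dump ends up dominated by java.lang.ref.Finalizer entries.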
>>>>>> > >>>>>> >>>>>> > >>>>>> Best regards, >>>>>> > >>>>>> Piotrek >>>>>> > >>>>>> >>>>>> > >>>>>> [1] https://issues.apache.org/jira/browse/KAFKA-8546 >>>>>> > >>>>>> >>>>>> > >>>>>> czw., 3 wrz 2020 o 04:47 Josson Paul <jossonp...@gmail.com> >>>>>> > >>>>>> >>>>>> > >>>>>> napisał(a): >>>>>> > >>>>>>> Hi All, >>>>>> > >>>>>>> >>>>>> > >>>>>>> *ISSUE* >>>>>> > >>>>>>> ------ >>>>>> > >>>>>>> Flink application runs for sometime and suddenly the CPU >>>>>> shoots up >>>>>> > >>>>>>> and touches the peak, POD memory reaches to the peak, GC >>>>>> count >>>>>> > >>>>>>> increases, >>>>>> > >>>>>>> Old-gen spaces reach close to 100%. Full GC doesn't clean >>>>>> up heap >>>>>> > >>>>>>> space. At >>>>>> > >>>>>>> this point I stopped sending the data and cancelled the >>>>>> Flink Jobs. >>>>>> > >>>>>>> Still >>>>>> > >>>>>>> the Old-Gen space doesn't come down. I took a heap dump and >>>>>> can see >>>>>> > >>>>>>> that >>>>>> > >>>>>>> lot of Objects in the java.lang.Finalizer class. I have >>>>>> attached the >>>>>> > >>>>>>> details in a word document. I do have the heap dump but it >>>>>> is close >>>>>> > >>>>>>> to 2GB >>>>>> > >>>>>>> of compressed size. Is it safe to upload somewhere and >>>>>> share it >>>>>> > >>>>>>> here?. >>>>>> > >>>>>>> >>>>>> > >>>>>>> This issue doesn't happen in Flink: 1.4.0 and Beam: >>>>>> release-2.4.0 >>>>>> > >>>>>>> >>>>>> > >>>>>>> *WORKING CLUSTER INFO* (Flink: 1.4.0 and Beam: >>>>>> release-2.4.0) >>>>>> > >>>>>>> ---------------------------------------------------- >>>>>> > >>>>>>> >>>>>> > >>>>>>> Application reads from Kafka and does aggregations and >>>>>> writes into >>>>>> > >>>>>>> Kafka. Application has 5 minutes windows. Application uses >>>>>> Beam >>>>>> > >>>>>>> constructs >>>>>> > >>>>>>> to build the pipeline. To read and write we use Beam >>>>>> connectors. >>>>>> > >>>>>>> >>>>>> > >>>>>>> Flink version: 1.4.0 >>>>>> > >>>>>>> Beam version: release-2.4.0 >>>>>> > >>>>>>> Backend State: State backend is in the Heap and check >>>>>> pointing >>>>>> > >>>>>>> happening to the distributed File System. >>>>>> > >>>>>>> >>>>>> > >>>>>>> No of task Managers: 1 >>>>>> > >>>>>>> Heap: 6.4 GB >>>>>> > >>>>>>> CPU: 4 Cores >>>>>> > >>>>>>> Standalone cluster deployment on a Kubernetes pod >>>>>> > >>>>>>> >>>>>> > >>>>>>> *NOT WORKING CLUSTER INFO* (Flink version: 1.8.3 and Beam >>>>>> version: >>>>>> > >>>>>>> release-2.15.0) >>>>>> > >>>>>>> ---------- >>>>>> > >>>>>>> Application details are same as above >>>>>> > >>>>>>> >>>>>> > >>>>>>> *No change in application and the rate at which data is >>>>>> injected. >>>>>> > >>>>>>> But change in Flink and Beam versions* >>>>>> > >>>>>>> >>>>>> > >>>>>>> >>>>>> > >>>>>>> Flink version: 1.8.3 >>>>>> > >>>>>>> Beam version: release-2.15.0 >>>>>> > >>>>>>> Backend State: State backend is in the Heap and check >>>>>> pointing >>>>>> > >>>>>>> happening to the distributed File System. >>>>>> > >>>>>>> >>>>>> > >>>>>>> No of task Managers: 1 >>>>>> > >>>>>>> Heap: 6.5 GB >>>>>> > >>>>>>> CPU: 4 Cores >>>>>> > >>>>>>> >>>>>> > >>>>>>> Deployment: Standalone cluster deployment on a Kubernetes >>>>>> pod >>>>>> > >>>>>>> >>>>>> > >>>>>>> My Observations >>>>>> > >>>>>>> ------------- >>>>>> > >>>>>>> >>>>>> > >>>>>>> 1) CPU flame graph shows that in the working version, the >>>>>> cpu time >>>>>> > >>>>>>> on GC is lesser compared to non-working version (Please see >>>>>> the >>>>>> > >>>>>>> attached >>>>>> > >>>>>>> Flame Graph. 
*CPU-flame-WORKING.svg* for working cluster and >>>>>> > >>>>>>> *CPU-flame-NOT-working.svg*) >>>>>> > >>>>>>> >>>>>> > >>>>>>> 2) I have attached the flame graph for native memory MALLOC >>>>>> calls >>>>>> > >>>>>>> when the issue was happening. Please find the attached SVG >>>>>> image ( >>>>>> > >>>>>>> *malloc-NOT-working.svg*). The POD memory peaks when this >>>>>> issue >>>>>> > >>>>>>> happens. For me, it looks like the GC process is requesting >>>>>> a lot of >>>>>> > >>>>>>> native >>>>>> > >>>>>>> memory. >>>>>> > >>>>>>> >>>>>> > >>>>>>> 3) When the issue is happening the GC cpu usage is very >>>>>> high. Please >>>>>> > >>>>>>> see the flame graph (*CPU-graph-at-issuetime.svg*) >>>>>> > >>>>>>> >>>>>> > >>>>>>> Note: SVG file can be opened using any browser and it is >>>>>> clickable >>>>>> > >>>>>>> while opened. >>>>>> > >>>>>>> -- >>>>>> > >>>>>>> Thanks >>>>>> > >>>>>>> Josson >>>>>> > >>>>> >>>>>> > >>>>> -- >>>>>> > >>>>> Thanks >>>>>> > >>>>> Josson >>>>>> > >>> >>>>>> > >>> -- >>>>>> > >>> Thanks >>>>>> > >>> Josson >>>>>> > > >>>>>> > > -- >>>>>> > > Thanks >>>>>> > > Josson >>>>>> >>>>>> >>>>>> >>>>>> >>>>>> >>>>> >>>>> -- >>>>> Thanks >>>>> Josson >>>>> >>>> >>> >>> -- >>> Thanks >>> Josson >>> >> > > -- > Thanks > Josson >