Hi Josson,

Thanks for great investigation and coming back to use. Aljoscha, could you
help us here? It looks like you were involved in this original BEAM-3087
issue.

Best,
Piotrek

pt., 23 paź 2020 o 07:36 Josson Paul <jossonp...@gmail.com> napisał(a):

> @Piotr Nowojski <pnowoj...@apache.org>  @Nico Kruber <nkru...@apache.org>
>
> An update.
>
> I am able to figure out the problem code. A change in the Apache Beam code
> is causing this problem.
>
>
>
>
>
> Beam introduced a lock on the “emit” in Unbounded Source. The lock is on
> the Flink’s check point lock. Now the same lock is used by Flink’s timer
> service to emit the Watermarks. Flink’s timer service is starved to get
> hold of the lock and for some reason it never gets that lock. Aftereffect
>  of this situation is that the ‘WaterMark’ is never emitted by Flink’s
> timer service.  Because there is no Watermarks flowing through the system,
> Sliding Windows are never closed. Data gets accumulated in the Window.
>
>
>
> This problem occurs only if we have external lookup calls (like Redis)
> happen before the data goes to Sliding Window. Something like below.
>
>
>
> KafkaSource à Transforms (Occasional Redis
> lookup)->SlidingWindow->Transforms->Kafka Sink
>
>
>
>
>
>
> https://github.com/apache/beam/blob/60e0a22ea95921636c392b5aae77cb48196dd700/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java#L256
> . This is Beam 2.4 and you can see that there is no synchronized block at
> line 257 and 270.
>
>
>
>
> https://github.com/apache/beam/blob/7931ec055e2da7214c82e368ef7d7fd679faaef1/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java#L264
> . This is Beam 2.15. See the synchronized block introduced in line 264 and
> 280. We are using Beam 2.15 and Flink 1.8.
>
>
>
> Beam introduced this synchronized block because of this bug.
> https://issues.apache.org/jira/browse/BEAM-3087
>
>
>
> After I removed that synchronized keyword everything started working fine
> in my application.
>
>
>
> What do you guys think about this?. Why does Beam need a Synchronized
> block there?
>
>
>
> Beam is using this lock ->
>
>
> https://github.com/apache/flink/blob/d54807ba10d0392a60663f030f9fe0bfa1c66754/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeService.java#L282
>
>
>
> Thanks,
>
> Josson
>
> On Mon, Sep 14, 2020 at 5:03 AM Piotr Nowojski <pnowoj...@apache.org>
> wrote:
>
>> Hi Josson,
>>
>> The TM logs that you attached are only from a 5 minutes time period. Are
>> you sure they are encompassing the period before the potential failure and
>> after the potential failure? It would be also nice if you would provide the
>> logs matching to the charts (like the one you were providing in the
>> previous messages), to correlate events (spike in latency/GC with some
>> timestamp from the logs).
>>
>> I was not asking necessarily to upgrade to Java9, but an updated/bug
>> fixed version of Java8 [1].
>>
>> > 1) In Flink 1.4 set up, the data in the Heap is throttled. It never
>> goes out of memory whatever be the ingestion rate. our Windows are 5
>> minutes windows.
>> > 2) In Flink 1.8 set up, HeapKeyedStateBackend is never throttled and
>> fills up fast. When Old-gen space goes beyond 60-70% even the Mixed GC or
>> Full GC doesn't reclaim space.
>>
>> In both cases there is the same mechanism for the backpressure. If a
>> task's output runs out of buffers to put produced records, it will block
>> the task. It can be that between 1.4 and 1.8, with credit based flow
>> control changes, the amount of available buffers for the tasks on your
>> setup has grown, so the tasks are backpressuring later. This in turn can
>> sometimes mean that at any point of time there is more data buffered on the
>> operator's state, like `WindowOperator`. I'm not sure what's the
>> best/easiest way how to check this:
>>
>> 1. the amount of buffered data might be visible via metrics [2][3]
>> 2. if you enable DEBUG logs, it should be visible via:
>>
>> > LOG.debug("Using a local buffer pool with {}-{} buffers",
>> numberOfRequiredMemorySegments, maxNumberOfMemorySegments);
>>
>> entry logged by
>> `org.apache.flink.runtime.io.network.buffer.LocalBufferPool`.
>>
>> Piotrek
>>
>> [1] https://en.wikipedia.org/wiki/Java_version_history#Java_8_updates
>> [2]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.4/monitoring/metrics.html#network
>> [3]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.8/monitoring/metrics.html#network
>>
>> pon., 14 wrz 2020 o 05:04 Josson Paul <jossonp...@gmail.com> napisał(a):
>>
>>> @Piotr Nowojski <pnowoj...@apache.org> @Nico Kruber <nkru...@apache.org>
>>> I have attached the  Taskmanager/GC/thread dumps in a zip file.
>>>
>>> I don't see any issues in the TM logs.
>>> Tried to upgrade to Java 9. Flink is on top of another platform which
>>> threw errors while upgrading to Java 9. I can't do much for now. We will
>>> upgrade to Jdk 11 in another 2 months.
>>>
>>> Regarding the Heap size. The new experiment I did was on 4gb Heap on
>>> both Flink 1.4 and Flink 1.8.
>>>
>>> Questions I am trying to get answered are
>>>
>>> 1) In Flink 1.4 set up, the data in the Heap is throttled. It never goes
>>> out of memory whatever be the ingestion rate. our Windows are 5
>>> minutes windows.
>>> 2) In Flink 1.8 set up, HeapKeyedStateBackend is never throttled and
>>> fills up fast. When Old-gen space goes beyond 60-70% even the Mixed GC or
>>> Full GC doesn't reclaim space.
>>>
>>>
>>> On Fri, Sep 11, 2020 at 12:58 AM Piotr Nowojski <pnowoj...@apache.org>
>>> wrote:
>>>
>>>> Hi Josson,
>>>>
>>>> Have you checked the logs as Nico suggested? At 18:55 there is a dip in
>>>> non-heap memory, just about when the problems started happening. Maybe you
>>>> could post the TM logs?
>>>> Have you tried updating JVM to a newer version?
>>>> Also it looks like the heap size is the same between 1.4 and 1.8, but
>>>> in an earlier message you said you increased it by 700MB?
>>>>
>>>> Piotrek
>>>>
>>>> pt., 11 wrz 2020 o 05:07 Josson Paul <jossonp...@gmail.com> napisał(a):
>>>>
>>>>> I have attached two word documents.
>>>>> Flink1.4 and Flink1.8
>>>>> I reduced the heap size in the cluster and tried the experiment in
>>>>> both Flink 1.4 and Flink 1.8.
>>>>> My goal was to simulate ingestion rate of 200 Clients/sec (Not going
>>>>> into the details here).
>>>>>
>>>>> In Flink 1.4 I could reach 200 Clients/Sec and I ran the cluster for 1
>>>>> hour. You can see the details in the attached Flink1.4 document file. You
>>>>> can see the GC activity and Cpu. Both are holding good.
>>>>>
>>>>> In Flin 1.8 I could reach only 160 Clients/Sec and the issue started
>>>>> happening. Issue started within 15 minutes of starting the ingestion. 
>>>>> @Piotr
>>>>> Nowojski <pnowoj...@apache.org> , you can see that there is no meta
>>>>> space related issue. All the GC related details are available in the doc.
>>>>>
>>>>> Especially see the difference in Heap dump of 'Biggest Objects' in
>>>>> both clusters. How Flink 1.4 holds lesser objects in Heap. Is it because
>>>>> Flink 1.4 was efficient and 1.8 solved that in efficiency and this problem
>>>>> is expected?.
>>>>>
>>>>> @Nicko, We are not doing the fat jar stuff.
>>>>>
>>>>> @Piotr Nowojski <pnowoj...@apache.org> , we are in the process of
>>>>> upgrading to Java 11 and Flink 1.11. But I need at least 2 months.
>>>>>
>>>>>
>>>>> I am not getting the Finalizer problem in the latest heap dump. Maybe
>>>>> it was happening only 1 or 2 times.
>>>>>
>>>>> Please let me know if you need additional input
>>>>>
>>>>>
>>>>> Thanks,
>>>>> Josson
>>>>>
>>>>>
>>>>> On Thu, Sep 10, 2020 at 5:19 AM Nico Kruber <nkru...@apache.org>
>>>>> wrote:
>>>>>
>>>>>> What looks a bit strange to me is that with a running job, the
>>>>>> SystemProcessingTimeService should actually not be collected (since
>>>>>> it is
>>>>>> still in use)!
>>>>>>
>>>>>> My guess is that something is indeed happening during that time frame
>>>>>> (maybe
>>>>>> job restarts?) and I would propose to check your logs for anything
>>>>>> suspicious
>>>>>> in there.
>>>>>>
>>>>>>
>>>>>> When I did experiments with Beam pipelines on our platform [1], I
>>>>>> also
>>>>>> noticed, that the standard fat jars that Beam creates include Flink
>>>>>> runtime
>>>>>> classes it shouldn't (at least if you are submitting to a separate
>>>>>> Flink
>>>>>> cluster). This can cause all sorts of problems and I would recommend
>>>>>> removing
>>>>>> those from the fat jar as documented in [1].
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>> Nico
>>>>>>
>>>>>>
>>>>>>
>>>>>> [1] https://ververica.zendesk.com/hc/en-us/articles/360014323099
>>>>>>
>>>>>> On Thursday, 10 September 2020 13:44:32 CEST Piotr Nowojski wrote:
>>>>>> > Hi Josson,
>>>>>> >
>>>>>> > Thanks again for the detailed answer, and sorry that I can not help
>>>>>> you
>>>>>> > with some immediate answer. I presume that jvm args for 1.8 are the
>>>>>> same?
>>>>>> >
>>>>>> > Can you maybe post what exactly has crashed in your cases a) and b)?
>>>>>> > Re c), in the previously attached word document, it looks like
>>>>>> Flink was
>>>>>> > running without problems for a couple of hours/minutes, everything
>>>>>> was
>>>>>> > stable, no signs of growing memory consumption, impending problem,
>>>>>> until
>>>>>> > around 23:15, when the problem started, right? Has something else
>>>>>> happened
>>>>>> > at that time, something that could explain the spike? A checkpoint?
>>>>>> Job
>>>>>> > crash/restart? Load spike?
>>>>>> >
>>>>>> > A couple of other random guesses:
>>>>>> > - have you been monitoring other memory pools for Flink 1.4 and
>>>>>> 1.8? Like
>>>>>> > meta space? Growing meta space size can sometimes cause problems. It
>>>>>> > shouldn't be the case here, as you configured XX:MaxMetaspaceSize,
>>>>>> but it
>>>>>> > might be still worth checking...
>>>>>> > - another random idea, have you tried upgrading JDK? Maybe that
>>>>>> would solve
>>>>>> > the problem?
>>>>>> >
>>>>>> > Best regards,
>>>>>> > Piotrek
>>>>>> >
>>>>>> > śr., 9 wrz 2020 o 19:53 Josson Paul <jossonp...@gmail.com>
>>>>>> napisał(a):
>>>>>> > > Hi Piotr,
>>>>>> > >
>>>>>> > >  *JVM start up for Flink 1.4*
>>>>>> > >
>>>>>> > > *-------------------------------*
>>>>>> > >
>>>>>> > >
>>>>>> java-server-XX:HeapDumpPath=/opt/maglev/srv/diagnostics/pipelineruntime-ta
>>>>>> > > skmgr-assurance-1-77d44cf64-z8gd4.heapdump-
>>>>>> > > *Xmx6554m-Xms6554m*-*XX:MaxMetaspaceSize=512m*
>>>>>> > > -XX:+HeapDumpOnOutOfMemoryError-*XX:+UseG1GC*-XX:CICompilerCount=4
>>>>>> > >
>>>>>> *-XX:MaxGCPauseMillis=1000*-XX:+DisableExplicitGC-*XX:ParallelGCThreads=4*
>>>>>> > > -Dsun.net.inetaddr.ttl=60-XX:OnOutOfMemoryError=kill -9
>>>>>> > > %p*-Dio.netty.eventLoopThreads=3*
>>>>>> > >
>>>>>> -Dlog4j.configurationFile=/opt/maglev/sw/apps/pipelineruntime/resources/lo
>>>>>> > >
>>>>>> g4j2.xml-Dorg.apache.flink.shaded.netty4.io.netty.eventLoopThreads=3-Dnetw
>>>>>> > > orkaddress.cache.ttl=120-Dnum.cores=3-
>>>>>> > >
>>>>>> *XX:+UseStringDeduplication-Djava.util.concurrent.ForkJoinPool.common.par
>>>>>> > > allelism=3-XX:ConcGCThreads=4 *
>>>>>> > >
>>>>>> -Djava.library.path=/usr/local/lib-Djava.net.preferIPv4Stack=true-Dapp.di
>>>>>> > >
>>>>>> r=/opt/maglev/sw/apps/pipelineruntime-Dserver.name=pipelineruntime-Dlog.di
>>>>>> > >
>>>>>> r=/opt/maglev/var/log/pipelineruntime-cp/opt/maglev/sw/apps/javacontainer/
>>>>>> > >
>>>>>> resources:/opt/maglev/sw/apps/pipelineruntime/lib/*:/opt/maglev/sw/apps/pi
>>>>>> > >
>>>>>> pelineruntime/resources:/opt/maglev/sw/apps/javacontainer/lib/*com.cisco.m
>>>>>> > > aglev.MaglevServerstartmaglev>
>>>>>> > >    1.   taskmanager.memory.fraction = 0.7f (This was coming to
>>>>>> 4.5 GB. I
>>>>>> > >    didn't know at that time that we could set memory fraction to
>>>>>> zero
>>>>>> > >    because
>>>>>> > >    ours is a streaming job. It was  picking up the default )
>>>>>> > >    2.    Network buffer pool memory was 646MB on the Heap (I
>>>>>> think this
>>>>>> > >    was the default based on some calculations in the Flink 1.4)
>>>>>> > >    3.    G1GC region size was 4MB (Default)
>>>>>> > >
>>>>>> > > I tested this setup by reducing the JVM heap by *1GB.* It still
>>>>>> worked
>>>>>> > > perfectly with some lags here and there.
>>>>>> > >
>>>>>> > > *JVM start up for Flink 1.8*
>>>>>> > > *------------------------------------*
>>>>>> > > a) I started with the same configuration as above. Kubenetis POD
>>>>>> went out
>>>>>> > > of memory. At this point I realized that in Flink 1.8  network
>>>>>> buffer
>>>>>> > > pools
>>>>>> > > are moved to native memory. Based on calculations it was coming
>>>>>> to 200MB
>>>>>> > > in
>>>>>> > > native  memory. I increased the overall POD memory to accommodate
>>>>>> the
>>>>>> > > buffer pool change keeping the *heap the same*.
>>>>>> > >
>>>>>> > > b) Even after I modified the overall POD memory,  the POD still
>>>>>> crashed.
>>>>>> > > At this point I generated Flame graphs to identify the CPU/Malloc
>>>>>> calls
>>>>>> > > (Attached as part of the initial email). Realized that cpu usage
>>>>>> of G1GC
>>>>>> > > is
>>>>>> > > significantly different from Flink 1.4. Now I made 2 changes
>>>>>> > >
>>>>>> > >    1.  taskmanager.memory.fraction = 0.01f (This will give more
>>>>>> heap for
>>>>>> > >    user code)
>>>>>> > >    2. Increased cpu from 3 to 4 cores.
>>>>>> > >
>>>>>> > >         Above changes helped to hold the cluster a little longer.
>>>>>> But it
>>>>>> > >
>>>>>> > > still crashed after sometime.
>>>>>> > >
>>>>>> > > c)  Now I made the below changes.
>>>>>> > >
>>>>>> > >    1. I came across this ->
>>>>>> > >
>>>>>> http://mail.openjdk.java.net/pipermail/hotspot-gc-use/2017-February/002
>>>>>> > >    622.html . Now I changed the G1GC region space to *8MB
>>>>>> *instead of the
>>>>>> > >    default 4MB*.*
>>>>>> > >    2. -XX:MaxGCPauseMillis=2000 (I even tried higher in later
>>>>>> experiments)
>>>>>> > >    3. Played around with G1RSetSparseRegionEntries
>>>>>> > >
>>>>>> > >        This helped to avoid the POD going out of memory. But the
>>>>>> Old Gen
>>>>>> > >
>>>>>> > > heap issue was very evident now (Please see the attached word
>>>>>> document).
>>>>>> > >
>>>>>> > >  d)  Allocated additional heap memory of *700 MB *along with the
>>>>>> above
>>>>>> > >
>>>>>> > > changes. This also didn't help. It just prolonged the crash.  Now
>>>>>> I need
>>>>>> > > help from others to which direction I want to take this to .
>>>>>> > >
>>>>>> > > My worry is even if I upgrade to flink 1.11 this issue might still
>>>>>> > > persist.
>>>>>> > >
>>>>>> > > I have attached a screenshot from Heap dump to show you the
>>>>>> difference
>>>>>> > > between Flink 1.4 and 1.8 the way HeapKeyedStateBackend is
>>>>>> created. Not
>>>>>> > > sure whether this change has something to do with this memory
>>>>>> issue that I
>>>>>> > > am facing.
>>>>>> > > Name Flink-1.4.jpg for the 1.4 and Flink-1.8.jpg for 1.8
>>>>>> > >
>>>>>> > >
>>>>>> > > Thanks,
>>>>>> > > Josson
>>>>>> > >
>>>>>> > > On Wed, Sep 9, 2020 at 5:44 AM Piotr Nowojski <
>>>>>> pnowoj...@apache.org>
>>>>>> > >
>>>>>> > > wrote:
>>>>>> > >> Hi Josson,
>>>>>> > >>
>>>>>> > >> Thanks for getting back.
>>>>>> > >>
>>>>>> > >> What are the JVM settings and in particular GC settings that you
>>>>>> are
>>>>>> > >> using (G1GC?)?
>>>>>> > >> It could also be an issue that in 1.4 you were just slightly
>>>>>> below the
>>>>>> > >> threshold of GC issues, while in 1.8, something is using a bit
>>>>>> more
>>>>>> > >> memory,
>>>>>> > >> causing the GC issues to appear? Have you tried just increasing
>>>>>> the heap
>>>>>> > >> size?
>>>>>> > >> Have you tried to compare on the job start up, what is the usage
>>>>>> and size
>>>>>> > >> of JVM's memory pools with Flink 1.4 and 1.8? Maybe that can
>>>>>> point us in
>>>>>> > >> the right direction.
>>>>>> > >>
>>>>>> > >> > My understanding on back pressure is that it is not based on
>>>>>> Heap
>>>>>> > >>
>>>>>> > >> memory but based on how fast the Network buffers are filled. Is
>>>>>> this
>>>>>> > >> correct?.
>>>>>> > >>
>>>>>> > >> > Does Flink use TCP connection to communicate between tasks if
>>>>>> the tasks
>>>>>> > >>
>>>>>> > >> are in the same Task manager?.
>>>>>> > >>
>>>>>> > >> No, local input channels are being used then, but memory for
>>>>>> network
>>>>>> > >> buffers is assigned to tasks regardless of the fraction of local
>>>>>> input
>>>>>> > >> channels in the task. However with just single taskmanager and
>>>>>> > >> parallelism
>>>>>> > >> of 4, the amount of the memory used by the network stack should
>>>>>> be
>>>>>> > >> insignificant, at least as long as you have a reasonably sized
>>>>>> job graph
>>>>>> > >> (32KB * (2 * parallelism + 7) * number of tasks).
>>>>>> > >>
>>>>>> > >> > What I noticed in Flink 1.4 is that it doesn't read data from
>>>>>> Kafka if
>>>>>> > >>
>>>>>> > >> there is not sufficient heap memory to process data. Somehow
>>>>>> this is not
>>>>>> > >> happening in Flink 1.8 and it fills the heap soon enough not to
>>>>>> get
>>>>>> > >> GCed/Finalized. Any change around this between Flink 1.4 and
>>>>>> Flink 1.8.
>>>>>> > >>
>>>>>> > >> No, there were no changes in this part as far as I remember.
>>>>>> Tasks when
>>>>>> > >> producing records are serialising them and putting into the
>>>>>> network
>>>>>> > >> buffers. If there are no available network buffers, the task is
>>>>>> back
>>>>>> > >> pressuring and stops processing new records.
>>>>>> > >>
>>>>>> > >> Best regards,
>>>>>> > >> Piotrek
>>>>>> > >>
>>>>>> > >> wt., 8 wrz 2020 o 21:51 Josson Paul <jossonp...@gmail.com>
>>>>>> napisał(a):
>>>>>> > >>> Hi Piotr,
>>>>>> > >>>
>>>>>> > >>>    2) SystemProcessingTimeService holds the
>>>>>> HeapKeyedStateBackend and
>>>>>> > >>>
>>>>>> > >>> HeapKeyedStateBackend has lot of Objects and that is filling
>>>>>> the Heap
>>>>>> > >>>
>>>>>> > >>>    3) I am not using Flink Kafka Connector. But we are using
>>>>>> Apache Beam
>>>>>> > >>>
>>>>>> > >>> kafka connector.  There is a change in the Apache Beam version.
>>>>>> But the
>>>>>> > >>> kafka client we are using is the same as the one which was
>>>>>> working in
>>>>>> > >>> the
>>>>>> > >>> other cluster where  Flink was 1.4.
>>>>>> > >>>
>>>>>> > >>>   *There is no change in Hardware/Java/Kafka/Kafka
>>>>>> Client/Application
>>>>>> > >>>
>>>>>> > >>> between the cluster which is working and not working*
>>>>>> > >>>
>>>>>> > >>> I am aware of the memory changes and network buffer changes
>>>>>> between 1.4
>>>>>> > >>> and 1.8.
>>>>>> > >>>
>>>>>> > >>> Flink 1.4 had network buffers on Heap and 1.8 network buffers
>>>>>> are on the
>>>>>> > >>> native memory. I modified the Flink 1.8 code to put it back to
>>>>>> Heap
>>>>>> > >>> memory
>>>>>> > >>> but the issue didn't get resolved.
>>>>>> > >>>
>>>>>> > >>> Mine is a streaming job so we set 'taskmanager.memory.fraction'
>>>>>> to very
>>>>>> > >>> minimal and that heap is fully available for user data.
>>>>>> > >>>
>>>>>> > >>> Flink 1.4 was not using Credit based Flow control and Flink 1.8
>>>>>> uses
>>>>>> > >>> Credit based Flow control. *Our set up has only 1 task manager
>>>>>> and 4
>>>>>> > >>> parallelisms*.  According to this video
>>>>>> > >>>
>>>>>> https://www.youtube.com/watch?v=AbqatHF3tZI&ab_channel=FlinkForward (
>>>>>> > >>> *16:21*) if tasks are in same task manager,  Flink doesn't use
>>>>>> Credit
>>>>>> > >>> Based Flow control. Essentially no change between Flink 1.4 and
>>>>>> 1.8 in
>>>>>> > >>> *our
>>>>>> > >>> set up*. Still I tried to change the Credit Based Flow Control
>>>>>> to False
>>>>>> > >>> and test my setup. The problem persists.
>>>>>> > >>>
>>>>>> > >>> What I noticed in Flink 1.4 is that it doesn't read data from
>>>>>> Kafka if
>>>>>> > >>> there is not sufficient heap memory to process data. Somehow
>>>>>> this is not
>>>>>> > >>> happening in Flink 1.8 and it fills the heap soon enough not to
>>>>>> get
>>>>>> > >>> GCed/Finalized. Any change around this between Flink 1.4 and
>>>>>> Flink 1.8.
>>>>>> > >>>
>>>>>> > >>> My understanding on back pressure is that it is not based on
>>>>>> Heap memory
>>>>>> > >>> but based on how fast the Network buffers are filled. Is this
>>>>>> correct?.
>>>>>> > >>> Does Flink use TCP connection to communicate between tasks if
>>>>>> the tasks
>>>>>> > >>> are in the same Task manager?.
>>>>>> > >>>
>>>>>> > >>> Thanks,
>>>>>> > >>> josson
>>>>>> > >>>
>>>>>> > >>> On Thu, Sep 3, 2020 at 12:35 PM Piotr Nowojski <
>>>>>> pnowoj...@apache.org>
>>>>>> > >>>
>>>>>> > >>> wrote:
>>>>>> > >>>> Hi Josson,
>>>>>> > >>>>
>>>>>> > >>>> 2. Are you sure that all/vast majority of those objects are
>>>>>> pointing
>>>>>> > >>>> towards SystemProcessingTimeService? And is this really the
>>>>>> problem of
>>>>>> > >>>> those objects? Are they taking that much of the memory?
>>>>>> > >>>> 3. It still could be Kafka's problem, as it's likely that
>>>>>> between 1.4
>>>>>> > >>>> and 1.8.x we bumped Kafka dependencies.
>>>>>> > >>>>
>>>>>> > >>>> Frankly if that's not some other external dependency issue, I
>>>>>> would
>>>>>> > >>>> expect that the problem might lie somewhere completely else.
>>>>>> Flink's
>>>>>> > >>>> code
>>>>>> > >>>> relaying on the finalisation hasn't changed since 2015/2016.
>>>>>> On the
>>>>>> > >>>> other
>>>>>> > >>>> hand there were quite a bit of changes between 1.4 and 1.8.x,
>>>>>> some of
>>>>>> > >>>> them
>>>>>> > >>>> were affecting memory usage. Have you read release notes for
>>>>>> versions
>>>>>> > >>>> 1.5,
>>>>>> > >>>> 1.6, 1.7 and 1.8? In particular both 1.5 [1] and 1.8 [2] have
>>>>>> memory
>>>>>> > >>>> related notes that could be addressed via configuration
>>>>>> changes.
>>>>>> > >>>>
>>>>>> > >>>> Thanks,
>>>>>> > >>>> Piotrek
>>>>>> > >>>>
>>>>>> > >>>> [1]
>>>>>> > >>>>
>>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.5/release-not
>>>>>> > >>>> es/flink-1.5.html [2]
>>>>>> > >>>>
>>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.8/release-not
>>>>>> > >>>> es/flink-1.8.html>>>>
>>>>>> > >>>> czw., 3 wrz 2020 o 18:50 Josson Paul <jossonp...@gmail.com>
>>>>>> napisał(a):
>>>>>> > >>>>> 1) We are in the process of migrating to Flink 1.11. But it
>>>>>> is going
>>>>>> > >>>>> to take a while before we can make everything work with the
>>>>>> latest
>>>>>> > >>>>> version.
>>>>>> > >>>>> Meanwhile since this is happening in production I am trying
>>>>>> to solve
>>>>>> > >>>>> this.
>>>>>> > >>>>> 2) Finalizae class is pointing
>>>>>> > >>>>> to
>>>>>> > >>>>>
>>>>>> org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService
>>>>>> > >>>>> .
>>>>>> > >>>>> This class has a finalize method. I have attached spreadsheet
>>>>>> (
>>>>>> > >>>>> *Object-explorer.csv*) to give you a high level view
>>>>>> > >>>>> 3) The difference between working cluster and NON working
>>>>>> cluster is
>>>>>> > >>>>> only on Beam and Flink. Hardware, Input message rate,
>>>>>> Application
>>>>>> > >>>>> jars,
>>>>>> > >>>>> Kafka are all the same between those 2 clusters. Working
>>>>>> cluster was
>>>>>> > >>>>> with
>>>>>> > >>>>> Flink 1.4 and Beam 2.4.0
>>>>>> > >>>>>
>>>>>> > >>>>> Any insights into this will help me to debug further
>>>>>> > >>>>>
>>>>>> > >>>>> Thanks,
>>>>>> > >>>>> Josson
>>>>>> > >>>>>
>>>>>> > >>>>>
>>>>>> > >>>>> On Thu, Sep 3, 2020 at 3:34 AM Piotr Nowojski <
>>>>>> pnowoj...@apache.org>
>>>>>> > >>>>>
>>>>>> > >>>>> wrote:
>>>>>> > >>>>>> Hi,
>>>>>> > >>>>>>
>>>>>> > >>>>>> Have you tried using a more recent Flink version? 1.8.x is
>>>>>> no longer
>>>>>> > >>>>>> supported, and latest versions might not have this issue
>>>>>> anymore.
>>>>>> > >>>>>>
>>>>>> > >>>>>> Secondly, have you tried backtracking those references to the
>>>>>> > >>>>>> Finalizers? Assuming that Finalizer is indeed the class
>>>>>> causing
>>>>>> > >>>>>> problems.
>>>>>> > >>>>>>
>>>>>> > >>>>>> Also it may well be a non Flink issue [1].
>>>>>> > >>>>>>
>>>>>> > >>>>>> Best regards,
>>>>>> > >>>>>> Piotrek
>>>>>> > >>>>>>
>>>>>> > >>>>>> [1] https://issues.apache.org/jira/browse/KAFKA-8546
>>>>>> > >>>>>>
>>>>>> > >>>>>> czw., 3 wrz 2020 o 04:47 Josson Paul <jossonp...@gmail.com>
>>>>>> > >>>>>>
>>>>>> > >>>>>> napisał(a):
>>>>>> > >>>>>>> Hi All,
>>>>>> > >>>>>>>
>>>>>> > >>>>>>> *ISSUE*
>>>>>> > >>>>>>> ------
>>>>>> > >>>>>>> Flink application runs for sometime and suddenly the CPU
>>>>>> shoots up
>>>>>> > >>>>>>> and touches the peak, POD memory reaches to the peak, GC
>>>>>> count
>>>>>> > >>>>>>> increases,
>>>>>> > >>>>>>> Old-gen spaces reach close to 100%. Full GC doesn't clean
>>>>>> up heap
>>>>>> > >>>>>>> space. At
>>>>>> > >>>>>>> this point I stopped sending the data and cancelled the
>>>>>> Flink Jobs.
>>>>>> > >>>>>>> Still
>>>>>> > >>>>>>> the Old-Gen space doesn't come down. I took a heap dump and
>>>>>> can see
>>>>>> > >>>>>>> that
>>>>>> > >>>>>>> lot of Objects in the java.lang.Finalizer class. I have
>>>>>> attached the
>>>>>> > >>>>>>> details in a word document. I do have the heap dump but it
>>>>>> is close
>>>>>> > >>>>>>> to 2GB
>>>>>> > >>>>>>> of compressed size. Is it safe to upload somewhere and
>>>>>> share it
>>>>>> > >>>>>>> here?.
>>>>>> > >>>>>>>
>>>>>> > >>>>>>> This issue doesn't happen in Flink: 1.4.0 and Beam:
>>>>>> release-2.4.0
>>>>>> > >>>>>>>
>>>>>> > >>>>>>> *WORKING CLUSTER INFO* (Flink: 1.4.0 and Beam:
>>>>>> release-2.4.0)
>>>>>> > >>>>>>> ----------------------------------------------------
>>>>>> > >>>>>>>
>>>>>> > >>>>>>> Application reads from Kafka and does aggregations and
>>>>>> writes into
>>>>>> > >>>>>>> Kafka. Application has 5 minutes windows. Application uses
>>>>>> Beam
>>>>>> > >>>>>>> constructs
>>>>>> > >>>>>>> to build the pipeline. To read and write we use Beam
>>>>>> connectors.
>>>>>> > >>>>>>>
>>>>>> > >>>>>>> Flink version: 1.4.0
>>>>>> > >>>>>>> Beam version: release-2.4.0
>>>>>> > >>>>>>> Backend State: State backend is in the Heap and check
>>>>>> pointing
>>>>>> > >>>>>>> happening to the distributed File System.
>>>>>> > >>>>>>>
>>>>>> > >>>>>>> No of task Managers: 1
>>>>>> > >>>>>>> Heap: 6.4 GB
>>>>>> > >>>>>>> CPU: 4 Cores
>>>>>> > >>>>>>> Standalone cluster deployment on a Kubernetes pod
>>>>>> > >>>>>>>
>>>>>> > >>>>>>> *NOT WORKING CLUSTER INFO* (Flink version: 1.8.3 and Beam
>>>>>> version:
>>>>>> > >>>>>>> release-2.15.0)
>>>>>> > >>>>>>> ----------
>>>>>> > >>>>>>> Application details are same as above
>>>>>> > >>>>>>>
>>>>>> > >>>>>>> *No change in application and the rate at which data is
>>>>>> injected.
>>>>>> > >>>>>>> But change in Flink and Beam versions*
>>>>>> > >>>>>>>
>>>>>> > >>>>>>>
>>>>>> > >>>>>>> Flink version: 1.8.3
>>>>>> > >>>>>>> Beam version: release-2.15.0
>>>>>> > >>>>>>> Backend State: State backend is in the Heap and check
>>>>>> pointing
>>>>>> > >>>>>>> happening to the distributed File System.
>>>>>> > >>>>>>>
>>>>>> > >>>>>>> No of task Managers: 1
>>>>>> > >>>>>>> Heap: 6.5 GB
>>>>>> > >>>>>>> CPU: 4 Cores
>>>>>> > >>>>>>>
>>>>>> > >>>>>>> Deployment: Standalone cluster deployment on a Kubernetes
>>>>>> pod
>>>>>> > >>>>>>>
>>>>>> > >>>>>>> My Observations
>>>>>> > >>>>>>> -------------
>>>>>> > >>>>>>>
>>>>>> > >>>>>>> 1) CPU flame graph shows that in the working version, the
>>>>>> cpu time
>>>>>> > >>>>>>> on GC is lesser compared to non-working version (Please see
>>>>>> the
>>>>>> > >>>>>>> attached
>>>>>> > >>>>>>> Flame Graph. *CPU-flame-WORKING.svg* for working cluster and
>>>>>> > >>>>>>> *CPU-flame-NOT-working.svg*)
>>>>>> > >>>>>>>
>>>>>> > >>>>>>> 2) I have attached the flame graph for native memory MALLOC
>>>>>> calls
>>>>>> > >>>>>>> when the issue was happening. Please find the attached SVG
>>>>>> image (
>>>>>> > >>>>>>> *malloc-NOT-working.svg*). The POD memory peaks when this
>>>>>> issue
>>>>>> > >>>>>>> happens. For me, it looks like the GC process is requesting
>>>>>> a lot of
>>>>>> > >>>>>>> native
>>>>>> > >>>>>>> memory.
>>>>>> > >>>>>>>
>>>>>> > >>>>>>> 3) When the issue is happening the GC cpu usage is very
>>>>>> high. Please
>>>>>> > >>>>>>> see the flame graph (*CPU-graph-at-issuetime.svg*)
>>>>>> > >>>>>>>
>>>>>> > >>>>>>> Note: SVG file can be opened using any browser and it is
>>>>>> clickable
>>>>>> > >>>>>>> while opened.
>>>>>> > >>>>>>> --
>>>>>> > >>>>>>> Thanks
>>>>>> > >>>>>>> Josson
>>>>>> > >>>>>
>>>>>> > >>>>> --
>>>>>> > >>>>> Thanks
>>>>>> > >>>>> Josson
>>>>>> > >>>
>>>>>> > >>> --
>>>>>> > >>> Thanks
>>>>>> > >>> Josson
>>>>>> > >
>>>>>> > > --
>>>>>> > > Thanks
>>>>>> > > Josson
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>
>>>>> --
>>>>> Thanks
>>>>> Josson
>>>>>
>>>>
>>>
>>> --
>>> Thanks
>>> Josson
>>>
>>
>
> --
> Thanks
> Josson
>

Reply via email to