Quick question: since the Meter issue does _not_ apply to 1.1.3, which Flink
metrics are you using?

– Ufuk

On 5 December 2016 at 16:44:47, Daniel Santos (dsan...@cryptolab.net) wrote:
> Hello,
>  
> Thank you all for the kind replies.
>  
> I've got the general idea. I am using Flink version 1.1.3.
>  
> So it seems the fix for Meters won't make it into 1.1.4?
>  
> Best Regards,
>  
> Daniel Santos
>  
>  
> On 12/05/2016 01:28 PM, Chesnay Schepler wrote:
> > We don't have to include it in 1.1.4 since Meters do not exist in
> > 1.1; my bad for tagging it for 1.1.4 in JIRA.
> >
> > On 05.12.2016 14:18, Ufuk Celebi wrote:
> >> Just to note that the bug mentioned by Chesnay does not invalidate
> >> Stefan's comments. ;-)
> >>
> >> Chesnay's issue is here:
> >> https://issues.apache.org/jira/browse/FLINK-5261
> >>
> >> I added an issue to improve the documentation about cancellation
> >> (https://issues.apache.org/jira/browse/FLINK-5260).
> >>
> >> Which version of Flink are you using? Chesnay's fix will make it into
> >> the upcoming 1.1.4 release.
> >>
> >>
> >> On 5 December 2016 at 14:04:49, Chesnay Schepler (ches...@apache.org)
> >> wrote:
> >>> Hello Daniel,
> >>> I'm afraid you have stumbled upon a bug in Flink. Meters were not properly
> >>> cleaned up, causing the underlying Dropwizard meter update threads not
> >>> to be shut down either.
> >>> I've opened a JIRA issue and will open a PR soon.
> >>> Thank you for reporting this issue.
> >>> Regards,
> >>> Chesnay
> >>> On 05.12.2016 12:05, Stefan Richter wrote:
> >>>> Hi Daniel,
> >>>>
> >>>> the behaviour you observe looks like some threads are not canceled.
> >>>> Thread cancelation in Flink (and in Java in general) is always
> >>>> cooperative, meaning that the thread you want to cancel has to check
> >>>> for cancelation itself and react to it. Sometimes this also requires
> >>>> some effort from the client that wants to cancel a thread. So if you
> >>>> implement, e.g., custom operators or functions that use Aerospike,
> >>>> you must ensure that they a) react to cancelation and b) clean up
> >>>> their resources. If you do not take care of this, your Aerospike
> >>>> client might stay in a blocking call forever; blocking IO calls in
> >>>> particular are prone to this. What you need to ensure is that
> >>>> cancelation from the client side also closes IO resources such as
> >>>> streams, to unblock the thread and allow it to terminate. This means
> >>>> that your code must (to a certain degree) actively participate in
> >>>> Flink's task lifecycle. In Flink 1.2 we are introducing a feature
> >>>> called CloseableRegistry, which makes participating in this
> >>>> lifecycle easier with respect to closing resources. For the time
> >>>> being, you should make sure that Flink's task cancelation also causes
> >>>> your code to close the Aerospike client and to check cancelation flags.
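> >>>>
> >>>> To illustrate the idea, here is a minimal sketch (not your actual
> >>>> code; the class name, host, namespace and set are made up for the
> >>>> example) of a sink whose close() method shuts down the Aerospike
> >>>> client, so that cancelation can stop the client's "tend" thread and
> >>>> unblock the task:
> >>>>
> >>>> import com.aerospike.client.AerospikeClient;
> >>>> import com.aerospike.client.Bin;
> >>>> import com.aerospike.client.Key;
> >>>> import org.apache.flink.configuration.Configuration;
> >>>> import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
> >>>>
> >>>> public class AerospikeSink extends RichSinkFunction<String> {
> >>>>
> >>>>     private transient AerospikeClient client;
> >>>>
> >>>>     @Override
> >>>>     public void open(Configuration parameters) {
> >>>>         // Create the client once per parallel task instance.
> >>>>         client = new AerospikeClient("aerospike-host", 3000);
> >>>>     }
> >>>>
> >>>>     @Override
> >>>>     public void invoke(String value) throws Exception {
> >>>>         // A blocking call here can only terminate if close() releases
> >>>>         // the client's sockets.
> >>>>         client.put(null, new Key("test", "records", value), new Bin("value", value));
> >>>>     }
> >>>>
> >>>>     @Override
> >>>>     public void close() {
> >>>>         // Called on normal termination and on cancelation; closing the
> >>>>         // client stops its "tend" thread so the task thread can exit.
> >>>>         if (client != null) {
> >>>>             client.close();
> >>>>         }
> >>>>     }
> >>>> }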
> >>>>
> >>>> Best,
> >>>> Stefan
> >>>>
> >>>>> On 05.12.2016 at 11:42, Daniel Santos wrote:
> >>>>>
> >>>>> Hello,
> >>>>>
> >>>>> I have done some thread checking and dumps, and I have disabled
> >>>>> checkpointing.
> >>>>>
> >>>>> Here are my findings.
> >>>>>
> >>>>> I did a thread dump a few hours after I booted up the whole cluster
> >>>>> (on 2/12/2016; 5 TMs; 3 GB heap each; 7 GB total limit each).
> >>>>>
> >>>>> The dump shows that most threads come from 3 sources.
> >>>>>
> >>>>> OutputFlusher --- 634 -- Sleeping State
> >>>>>
> >>>>> "OutputFlusher" - Thread t@4758
> >>>>> java.lang.Thread.State: TIMED_WAITING
> >>>>> at java.lang.Thread.sleep(Native Method)
> >>>>> at
> >>>>> org.apache.flink.streaming.runtime.io.StreamRecordWriter$OutputFlusher.run(StreamRecordWriter.java:164)
> >>>>>   
> >>>>>
> >>>>>
> >>>>> Locked ownable synchronizers:
> >>>>> - None
> >>>>> Metrics --- 376 ( the Flink metrics reporter is the only metrics
> >>>>> being used ) -- Parked State
> >>>>>
> >>>>> "metrics-meter-tick-thread-1" - Thread t@29024
> >>>>> java.lang.Thread.State: TIMED_WAITING
> >>>>> at sun.misc.Unsafe.park(Native Method)
> >>>>> - parking to wait for (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
> >>>>> at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
> >>>>> at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2078)
> >>>>> at java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:1093)
> >>>>> at java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:809)
> >>>>> at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1067)
> >>>>> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1127)
> >>>>> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> >>>>> at java.lang.Thread.run(Thread.java:745)
> >>>>>
> >>>>> Locked ownable synchronizers:
> >>>>> - None
> >>>>>
> >>>>> tend -- 220 ( Aerospike Client Thread ) -- Sleeping State
> >>>>>
> >>>>> "tend" - Thread t@29011
> >>>>> java.lang.Thread.State: TIMED_WAITING
> >>>>> at java.lang.Thread.sleep(Native Method)
> >>>>> at com.aerospike.client.util.Util.sleep(Util.java:38)
> >>>>> at com.aerospike.client.cluster.Cluster.run(Cluster.java:262)
> >>>>> at java.lang.Thread.run(Thread.java:745)
> >>>>>
> >>>>> Locked ownable synchronizers:
> >>>>> - None
> >>>>>
> >>>>>
> >>>>> I have 2 streaming jobs and a batch job that runs once in a while.
> >>>>>
> >>>>> Streaming job A runs with a parallelism of 2 and uses Aerospike only
> >>>>> in a RichSink.
> >>>>>
> >>>>> Streaming job B runs with a parallelism of 24 and uses Aerospike in a
> >>>>> RichFilterFunction / RichMapFunction with open and close methods, in
> >>>>> order to open and close the client.
> >>>>>
> >>>>> The batch job uses the Aerospike client in a RichFilterFunction /
> >>>>> RichMapFunction with open and close methods, in order to open and
> >>>>> close the client.
> >>>>>
> >>>>> Next, I cancelled all the streaming jobs on 5/12/2016 and checked
> >>>>> the threads and the JVM non-heap usage.
> >>>>>
> >>>>> JVM non-heap usage reaches 3 GB; threads go down, but some still
> >>>>> linger around, and they are the following.
> >>>>>
> >>>>> Metrics --- 790 ( the Flink metrics reporter is the only metrics
> >>>>> being used )
> >>>>>
> >>>>> "metrics-meter-tick-thread-1" - Thread t@29024
> >>>>> java.lang.Thread.State: TIMED_WAITING
> >>>>> at sun.misc.Unsafe.park(Native Method)
> >>>>> - parking to wait for (a
> >>>>> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)  
> >>>>>
> >>>>> at
> >>>>> java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)  
> >>>>>
> >>>>> at
> >>>>> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2078)
> >>>>>   
> >>>>>
> >>>>> at
> >>>>> java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:1093)
> >>>>>
> >>>>> at
> >>>>> java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:809)
> >>>>>   
> >>>>>
> >>>>> at
> >>>>> java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1067)
> >>>>>   
> >>>>>
> >>>>> at
> >>>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1127)
> >>>>>   
> >>>>>
> >>>>> at
> >>>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> >>>>>   
> >>>>>
> >>>>> at java.lang.Thread.run(Thread.java:745)
> >>>>>
> >>>>> Locked ownable synchronizers:
> >>>>> - None
> >>>>>
> >>>>>
> >>>>> tend -- 432 ( Aerospike Client Thread )
> >>>>>
> >>>>>
> >>>>> "tend" - Thread t@29011
> >>>>> java.lang.Thread.State: TIMED_WAITING
> >>>>> at java.lang.Thread.sleep(Native Method)
> >>>>> at com.aerospike.client.util.Util.sleep(Util.java:38)
> >>>>> at com.aerospike.client.cluster.Cluster.run(Cluster.java:262)
> >>>>> at java.lang.Thread.run(Thread.java:745)
> >>>>>
> >>>>> Locked ownable synchronizers:
> >>>>> - None
> >>>>>
> >>>>>
> >>>>> The total number of threads is 1289 ( total ) / 1220 ( tend + metrics ).
> >>>>> So I have 1220 threads that I believe should be dead and not
> >>>>> running, since I have no jobs running at all.
> >>>>>
> >>>>> And the JVM non-heap usage doesn't decrease at all after removing
> >>>>> every job.
> >>>>>
> >>>>>
> >>>>> Why the hell do the metrics grow without end?
> >>>>>
> >>>>> I am using the following libs for metrics:
> >>>>>
> >>>>> - metrics-graphite-3.1.0.jar
> >>>>>
> >>>>> - metrics-core-3.1.0.jar
> >>>>>
> >>>>> - flink-metrics-dropwizard-1.1.3.jar
> >>>>>
> >>>>> - flink-metrics-graphite-1.1.3.jar
> >>>>>
> >>>>> And the config for the reporter is:
> >>>>>
> >>>>> metrics.reporters: graphite
> >>>>> metrics.reporter.graphite.class:
> >>>>> org.apache.flink.metrics.graphite.GraphiteReporter
> >>>>> metrics.reporter.graphite.host: CARBONRELAYHOST
> >>>>> metrics.reporter.graphite.port: 2003
> >>>>>
> >>>>>
> >>>>> Shouldn't the Aerospike client also be closed? Or am I missing
> >>>>> something, or doing something wrong?
> >>>>>
> >>>>>
> >>>>> Sorry for the long post.
> >>>>>
> >>>>> Best Regards,
> >>>>>
> >>>>> Daniel Santos
> >>>>>
> >>>>>
> >>>>> On 11/29/2016 04:57 PM, Ufuk Celebi wrote:
> >>>>>> Hey Daniel!
> >>>>>>
> >>>>>> Thanks for reporting this. Unbounded growth of non-heap memory is
> >>>>>> not expected.
> >>>>>> What kind of threads are you seeing being spawned/lingering around?
> >>>>>> As a first step, could you try to disable checkpointing and see
> >>>>>> how it behaves afterwards?
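> >>>>>>
> >>>>>> For reference (just a minimal sketch, not your job; the interval is
> >>>>>> an example value): checkpointing is opt-in, so disabling it simply
> >>>>>> means not calling enableCheckpointing when building the job.
> >>>>>>
> >>>>>> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> >>>>>>
> >>>>>> public class CheckpointingToggle {
> >>>>>>     public static void main(String[] args) {
> >>>>>>         StreamExecutionEnvironment env =
> >>>>>>                 StreamExecutionEnvironment.getExecutionEnvironment();
> >>>>>>         // Comment this call out to disable checkpointing for the job.
> >>>>>>         env.enableCheckpointing(10000); // example interval in milliseconds
> >>>>>>         // ... then build the topology and call env.execute(...) as usual
> >>>>>>     }
> >>>>>> }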
> >>>>>>
> >>>>>> – Ufuk
> >>>>>>
> >>>>>> On 29 November 2016 at 17:32:32, Daniel Santos
> >>>>>> (dsan...@cryptolab.net) wrote:
> >>>>>>> Hello,
> >>>>>>>
> >>>>>>> Nope, I am using Hadoop HDFS as the state backend, Kafka as a
> >>>>>>> source, and an HttpClient as a sink, plus Kafka as a sink as well.
> >>>>>>> So is it possible that the state backend is the culprit?
> >>>>>>>
> >>>>>>> The curious thing is that even when no jobs are running, streaming
> >>>>>>> or otherwise, the JVM non-heap stays the same, which I find odd.
> >>>>>>>
> >>>>>>> Another curious thing is that it is proportional to an increase in
> >>>>>>> the number of JVM threads. Whenever more JVM threads are running,
> >>>>>>> more JVM non-heap is also being used, which makes sense. But the
> >>>>>>> threads stick around and never decrease, and likewise the JVM
> >>>>>>> non-heap memory.
> >>>>>>>
> >>>>>>> These observations are based on the Flink metrics that are being
> >>>>>>> sent to and recorded in our Graphite system.
> >>>>>>>
> >>>>>>> Best Regards,
> >>>>>>>
> >>>>>>> Daniel Santos
> >>>>>>>
> >>>>>>> On 11/29/2016 04:04 PM, Cliff Resnick wrote:
> >>>>>>>> Are you using the RocksDB backend in native mode? If so then the
> >>>>>>>> off-heap memory may be there.
> >>>>>>>>
> >>>>>>>> On Tue, Nov 29, 2016 at 9:54 AM, > > > wrote:
> >>>>>>>>
> >>>>>>>> I have the same problem, but I submit the Flink job to YARN.
> >>>>>>>> I submit the job to YARN from computer 22, and the job runs
> >>>>>>>> successfully; the JobManager is on 79 and the TaskManager is on
> >>>>>>>> 69, three different computers.
> >>>>>>>> However, on computer 22 the process with pid=3463, which is the
> >>>>>>>> one that submitted the job to YARN, is using 2.3 GB of memory,
> >>>>>>>> 15% of the total.
> >>>>>>>> The command is: ./flink run -m yarn-cluster -yn 1 -ys 1 -yjm 1024
> >>>>>>>> -ytm 1024 ....
> >>>>>>>> Why does it occupy so much memory on computer 22? The job is
> >>>>>>>> running on computers 79 and 69.
> >>>>>>>> What would be the possible causes of such behavior?
> >>>>>>>> Best Regards,
> >>>>>>>> ----- Original Message -----
> >>>>>>>> From: Daniel Santos
> >>>>>>>> To: user@flink.apache.org
> >>>>>>>> Subject: JVM Non Heap Memory
> >>>>>>>> Date: 29 November 2016, 22:26
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> Hello,
> >>>>>>>> Is it common to have high usage of non-heap memory in the JVM?
> >>>>>>>> I am running Flink in a standalone cluster and in Docker, with
> >>>>>>>> each Docker container being capped at 6 GB of memory.
> >>>>>>>> I have been struggling to keep memory usage in check.
> >>>>>>>> The non-heap usage increases without end. It starts with just
> >>>>>>>> 100 MB of usage and after a day it reaches 1.3 GB.
> >>>>>>>> Then it eventually reaches 2 GB, and finally the Docker container
> >>>>>>>> is killed because it has reached the memory limit.
> >>>>>>>> My configuration for each Flink task manager is the following:
> >>>>>>>> ----------- flink-conf.yaml --------------
> >>>>>>>> taskmanager.heap.mb: 3072
> >>>>>>>> taskmanager.numberOfTaskSlots: 8
> >>>>>>>> taskmanager.memory.preallocate: false
> >>>>>>>> taskmanager.network.numberOfBuffers: 12500
> >>>>>>>> taskmanager.memory.off-heap: false
> >>>>>>>> ---------------------------------------------
> >>>>>>>> What would be the possible causes of such behavior?
> >>>>>>>> Best Regards,
> >>>>>>>> Daniel Santos
> >>>>>>>>
> >>>>>>>>
> >>>>>>>
> >>
> >
>  
>  
