Quick question: since the Meter issue does _not_ apply to 1.1.3, which Flink metrics are you using?
– Ufuk

On 5 December 2016 at 16:44:47, Daniel Santos (dsan...@cryptolab.net) wrote:
> Hello,
>
> Thank you all for the kind reply.
>
> I've got the general idea. I am using Flink version 1.1.3.
>
> So it seems the fix for Meters won't make it into 1.1.4?
>
> Best Regards,
>
> Daniel Santos
>
>
> On 12/05/2016 01:28 PM, Chesnay Schepler wrote:
> > We don't have to include it in 1.1.4, since Meters do not exist in
> > 1.1; my bad for tagging it in JIRA for 1.1.4.
> >
> > On 05.12.2016 14:18, Ufuk Celebi wrote:
> >> Just to note that the bug mentioned by Chesnay does not invalidate
> >> Stefan's comments. ;-)
> >>
> >> Chesnay's issue is here:
> >> https://issues.apache.org/jira/browse/FLINK-5261
> >>
> >> I added an issue to improve the documentation about cancellation
> >> (https://issues.apache.org/jira/browse/FLINK-5260).
> >>
> >> Which version of Flink are you using? Chesnay's fix will make it into
> >> the upcoming 1.1.4 release.
> >>
> >>
> >> On 5 December 2016 at 14:04:49, Chesnay Schepler (ches...@apache.org) wrote:
> >>> Hello Daniel,
> >>> I'm afraid you stumbled upon a bug in Flink. Meters were not properly
> >>> cleaned up, causing the underlying dropwizard meter update threads to
> >>> not be shut down either.
> >>> I've opened a JIRA and will open a PR soon.
> >>> Thank you for reporting this issue.
> >>> Regards,
> >>> Chesnay
> >>> On 05.12.2016 12:05, Stefan Richter wrote:
> >>>> Hi Daniel,
> >>>>
> >>>> the behaviour you observe looks like some threads are not canceled.
> >>>> Thread cancelation in Flink (and Java in general) is always
> >>>> cooperative: the thread you want to cancel has to check for
> >>>> cancelation somehow and react to it. Sometimes this also requires
> >>>> some effort from the client that wants to cancel a thread. So if you
> >>>> implement, e.g., custom operators or functions with Aerospike, you
> >>>> must ensure that they a) react to cancelation and b) clean up their
> >>>> resources. If you do not consider this, your Aerospike client might
> >>>> stay in a blocking call forever; blocking IO calls in particular are
> >>>> prone to this. What you need to ensure is that cancelation also
> >>>> closes IO resources such as streams, to unblock the thread and allow
> >>>> it to terminate. This means that your code must (to a certain
> >>>> degree) actively participate in Flink's task lifecycle. In Flink 1.2
> >>>> we introduce a feature called CloseableRegistry, which makes
> >>>> participating in this lifecycle easier w.r.t. closing resources. For
> >>>> the time being, you should check that Flink's task cancelation also
> >>>> causes your code to close the Aerospike client and to check
> >>>> cancelation flags.
> >>>>
> >>>> Best,
> >>>> Stefan
> >>>>
> >>>>> On 05.12.2016 at 11:42, Daniel Santos wrote:
> >>>>>
> >>>>> Hello,
> >>>>>
> >>>>> I have done some thread checking and dumps, and I have disabled
> >>>>> checkpointing.
> >>>>>
> >>>>> Here are my findings.
> >>>>>
> >>>>> I did a thread dump a few hours after I booted up the whole cluster
> >>>>> (@2/12/2016; 5 TM; 3 GB heap each; 7 GB total each as the limit).
> >>>>>
> >>>>> The dump shows that most threads come from 3 sources.
> >>>>>
> >>>>> OutputFlusher --- 634 -- Sleeping State
> >>>>>
> >>>>> "OutputFlusher" - Thread t@4758
> >>>>> java.lang.Thread.State: TIMED_WAITING
> >>>>> at java.lang.Thread.sleep(Native Method)
> >>>>> at org.apache.flink.streaming.runtime.io.StreamRecordWriter$OutputFlusher.run(StreamRecordWriter.java:164)
> >>>>>
> >>>>> Locked ownable synchronizers:
> >>>>> - None
> >>>>>
> >>>>> Metrics --- 376 (the Flink metrics reporter is the only metrics being used) -- Parked State
> >>>>>
> >>>>> "metrics-meter-tick-thread-1" - Thread t@29024
> >>>>> java.lang.Thread.State: TIMED_WAITING
> >>>>> at sun.misc.Unsafe.park(Native Method)
> >>>>> - parking to wait for (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
> >>>>> at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
> >>>>> at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2078)
> >>>>> at java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:1093)
> >>>>> at java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:809)
> >>>>> at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1067)
> >>>>> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1127)
> >>>>> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> >>>>> at java.lang.Thread.run(Thread.java:745)
> >>>>>
> >>>>> Locked ownable synchronizers:
> >>>>> - None
> >>>>>
> >>>>> tend --- 220 (Aerospike client thread) -- Sleeping State
> >>>>>
> >>>>> "tend" - Thread t@29011
> >>>>> java.lang.Thread.State: TIMED_WAITING
> >>>>> at java.lang.Thread.sleep(Native Method)
> >>>>> at com.aerospike.client.util.Util.sleep(Util.java:38)
> >>>>> at com.aerospike.client.cluster.Cluster.run(Cluster.java:262)
> >>>>> at java.lang.Thread.run(Thread.java:745)
> >>>>>
> >>>>> Locked ownable synchronizers:
> >>>>> - None
> >>>>>
> >>>>> I have 2 streaming jobs and a batch job that runs once in a while.
> >>>>>
> >>>>> Streaming job A runs with a parallelism of 2 and uses Aerospike
> >>>>> only in a RichSink.
> >>>>>
> >>>>> Streaming job B runs with a parallelism of 24 and uses Aerospike in
> >>>>> a RichFilterFunction / RichMapFunction with open and close methods,
> >>>>> in order to open and close the client.
> >>>>>
> >>>>> The batch job uses the Aerospike client in a RichFilterFunction /
> >>>>> RichMapFunction with open and close methods, in order to open and
> >>>>> close the client.
> >>>>>
> >>>>> Next I cancelled all the streaming jobs (@5/12/2016) and checked
> >>>>> the threads and the JVM non-heap usage.
> >>>>>
> >>>>> JVM non-heap usage reaches 3 GB; threads go down, but some still
> >>>>> linger around, and they are the following.
> >>>>>
> >>>>> Metrics --- 790 (the Flink metrics reporter is the only metrics being used)
> >>>>>
> >>>>> "metrics-meter-tick-thread-1" - Thread t@29024
> >>>>> java.lang.Thread.State: TIMED_WAITING
> >>>>> at sun.misc.Unsafe.park(Native Method)
> >>>>> - parking to wait for (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
> >>>>> at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
> >>>>> at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2078)
> >>>>> at java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:1093)
> >>>>> at java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:809)
> >>>>> at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1067)
> >>>>> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1127)
> >>>>> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> >>>>> at java.lang.Thread.run(Thread.java:745)
> >>>>>
> >>>>> Locked ownable synchronizers:
> >>>>> - None
> >>>>>
> >>>>> tend --- 432 (Aerospike client thread)
> >>>>>
> >>>>> "tend" - Thread t@29011
> >>>>> java.lang.Thread.State: TIMED_WAITING
> >>>>> at java.lang.Thread.sleep(Native Method)
> >>>>> at com.aerospike.client.util.Util.sleep(Util.java:38)
> >>>>> at com.aerospike.client.cluster.Cluster.run(Cluster.java:262)
> >>>>> at java.lang.Thread.run(Thread.java:745)
> >>>>>
> >>>>> Locked ownable synchronizers:
> >>>>> - None
> >>>>>
> >>>>> The total number of threads is 1289 (total) / 1220 (tend + metrics).
> >>>>> So I have 1220 threads that I believe should be dead and not
> >>>>> running, since I have no jobs running at all.
> >>>>>
> >>>>> And the JVM non-heap usage doesn't decrease at all after removing
> >>>>> every job.
> >>>>>
> >>>>> Why the hell do the metrics grow without end?
> >>>>>
> >>>>> I am using the following libs for metrics:
> >>>>>
> >>>>> - metrics-graphite-3.1.0.jar
> >>>>> - metrics-core-3.1.0.jar
> >>>>> - flink-metrics-dropwizard-1.1.3.jar
> >>>>> - flink-metrics-graphite-1.1.3.jar
> >>>>>
> >>>>> And the config for the reporter is:
> >>>>>
> >>>>> metrics.reporters: graphite
> >>>>> metrics.reporter.graphite.class: org.apache.flink.metrics.graphite.GraphiteReporter
> >>>>> metrics.reporter.graphite.host: CARBONRELAYHOST
> >>>>> metrics.reporter.graphite.port: 2003
> >>>>>
> >>>>> Shouldn't the Aerospike client also be closed? Or am I missing
> >>>>> something, or doing something wrong?
> >>>>>
> >>>>> Sorry for the long post.
> >>>>>
> >>>>> Best Regards,
> >>>>>
> >>>>> Daniel Santos
> >>>>>
> >>>>>
> >>>>> On 11/29/2016 04:57 PM, Ufuk Celebi wrote:
> >>>>>> Hey Daniel!
> >>>>>>
> >>>>>> Thanks for reporting this. Unbounded growth of non-heap memory is
> >>>>>> not expected. What kind of threads are you seeing being spawned or
> >>>>>> lingering around?
> >>>>>> As a first step, could you try to disable checkpointing and see
> >>>>>> how it behaves afterwards?
> >>>>>>
> >>>>>> – Ufuk
> >>>>>>
> >>>>>> On 29 November 2016 at 17:32:32, Daniel Santos
> >>>>>> (dsan...@cryptolab.net) wrote:
> >>>>>>> Hello,
> >>>>>>>
> >>>>>>> Nope, I am using Hadoop HDFS as the state backend, Kafka as a
> >>>>>>> source, an HttpClient as a sink, and also Kafka as a sink.
> >>>>>>> So is it possible that the state backend is the culprit?
> >>>>>>>
> >>>>>>> The curious thing is that even when no jobs are running,
> >>>>>>> streaming or otherwise, the JVM non-heap stays the same.
> >>>>>>> Which I find odd.
> >>>>>>>
> >>>>>>> Another curious thing is that it is proportional to an increase
> >>>>>>> in the number of JVM threads.
> >>>>>>> Whenever there are more JVM threads running, there is also more
> >>>>>>> JVM non-heap being used, which makes sense.
> >>>>>>> But the threads stick around and never decrease, and likewise the
> >>>>>>> JVM non-heap memory.
> >>>>>>>
> >>>>>>> These observations are based on the Flink metrics being sent to
> >>>>>>> and recorded in our Graphite system.
> >>>>>>>
> >>>>>>> Best Regards,
> >>>>>>>
> >>>>>>> Daniel Santos
> >>>>>>>
> >>>>>>> On 11/29/2016 04:04 PM, Cliff Resnick wrote:
> >>>>>>>> Are you using the RocksDB backend in native mode? If so, then
> >>>>>>>> the off-heap memory may be there.
> >>>>>>>>
> >>>>>>>> On Tue, Nov 29, 2016 at 9:54 AM, <...> wrote:
> >>>>>>>>
> >>>>>>>> I have the same problem, but I run the Flink job on YARN.
> >>>>>>>> I submit the job to YARN from computer 22, and the job runs
> >>>>>>>> successfully; the JobManager is on 79 and the TaskManager on 69,
> >>>>>>>> three different computers.
> >>>>>>>> However, on computer 22 the process with pid 3463, which is the
> >>>>>>>> job that was submitted to YARN, uses 2.3 GB of memory, 15% of
> >>>>>>>> the total.
> >>>>>>>> The command is: ./flink run -m yarn-cluster -yn 1 -ys 1 -yjm 1024
> >>>>>>>> -ytm 1024 ....
> >>>>>>>> Why does computer 22 occupy so much memory? The job is running
> >>>>>>>> on computers 79 and 69.
> >>>>>>>> What would be the possible causes of such behavior?
> >>>>>>>> Best Regards,
> >>>>>>>> ----- Original Message -----
> >>>>>>>> From: Daniel Santos
> >>>>>>>> To: user@flink.apache.org
> >>>>>>>> Subject: JVM Non Heap Memory
> >>>>>>>> Date: 29 November 2016, 22:26
> >>>>>>>>
> >>>>>>>> Hello,
> >>>>>>>> Is it common to have high usage of non-heap memory in the JVM?
> >>>>>>>> I am running Flink in a stand-alone cluster and in Docker, with
> >>>>>>>> each Docker container being capped at 6 GB of memory.
> >>>>>>>> I have been struggling to keep memory usage in check.
> >>>>>>>> The non-heap usage increases without end. It starts at just
> >>>>>>>> 100 MB and after a day it reaches 1.3 GB.
> >>>>>>>> Then it eventually reaches 2 GB, and eventually the container is
> >>>>>>>> killed because it has reached the memory limit.
> >>>>>>>> My configuration for each Flink task manager is the following:
> >>>>>>>> ----------- flink-conf.yaml --------------
> >>>>>>>> taskmanager.heap.mb: 3072
> >>>>>>>> taskmanager.numberOfTaskSlots: 8
> >>>>>>>> taskmanager.memory.preallocate: false
> >>>>>>>> taskmanager.network.numberOfBuffers: 12500
> >>>>>>>> taskmanager.memory.off-heap: false
> >>>>>>>> ---------------------------------------------
> >>>>>>>> What would be the possible causes of such behavior?
> >>>>>>>> Best Regards,
> >>>>>>>> Daniel Santos
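
For reference, a minimal sketch of the open()/close() pattern Stefan describes above: tie the Aerospike client's lifetime to the rich function, so that task cancellation also shuts down the client and its background "tend" thread. This is an illustration under assumptions, not the actual job code from this thread; the class name, host, port, namespace, set, and bin names are placeholders.

----------- AerospikeSink.java (sketch) --------------
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

import com.aerospike.client.AerospikeClient;
import com.aerospike.client.Bin;
import com.aerospike.client.Key;

public class AerospikeSink extends RichSinkFunction<String> {

    // One client per parallel subtask; not serialized with the function.
    private transient AerospikeClient client;

    @Override
    public void open(Configuration parameters) throws Exception {
        // Placeholder host/port; the client is created on the task manager.
        client = new AerospikeClient("aerospike-host", 3000);
    }

    @Override
    public void invoke(String value) throws Exception {
        // Placeholder namespace/set/bin names, just to show a write.
        Key key = new Key("test", "demo", value);
        client.put(null, key, new Bin("value", value));
    }

    @Override
    public void close() throws Exception {
        // Flink calls close() on normal termination and on cancellation;
        // closing the client here stops its background "tend" thread so
        // it does not linger after the job is cancelled.
        if (client != null) {
            client.close();
        }
    }
}
------------------------------------------------------

The same pattern applies to the RichFilterFunction / RichMapFunction variants: create the client in open() and close it in close(), and make sure any blocking call in the user code can be unblocked when close() releases the underlying resource.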