Just to note that the bug mentioned by Chesnay does not invalidate Stefan's comments. ;-)
Chesnay's issue is here: https://issues.apache.org/jira/browse/FLINK-5261
I added an issue to improve the documentation about cancellation (https://issues.apache.org/jira/browse/FLINK-5260).
Which version of Flink are you using? Chesnay's fix will make it into the upcoming 1.1.4 release.

On 5 December 2016 at 14:04:49, Chesnay Schepler (ches...@apache.org) wrote:
> Hello Daniel,
>
> I'm afraid you stumbled upon a bug in Flink. Meters were not properly
> cleaned up, causing the underlying dropwizard meter update threads to
> not be shut down either.
>
> I've opened a JIRA and will open a PR soon.
>
> Thank you for reporting this issue.
>
> Regards,
> Chesnay
>
> On 05.12.2016 12:05, Stefan Richter wrote:
> > Hi Daniel,
> >
> > The behaviour you observe looks like some threads are not cancelled.
> > Thread cancellation in Flink (and Java in general) is always
> > cooperative: the thread you want to cancel must itself check for
> > cancellation and react to it. Sometimes this also requires some effort
> > from the side that wants to cancel the thread. So if you implement e.g.
> > custom operators or functions that use Aerospike, you must ensure that
> > they a) react to cancellation and b) clean up their resources. If you
> > do not consider this, your Aerospike client might stay in a blocking
> > call forever; blocking IO calls in particular are prone to this. What
> > you need to ensure is that cancellation also closes IO resources such
> > as streams, so that the blocked thread is unblocked and can terminate.
> > This means that your code must (to a certain degree) actively
> > participate in Flink's task lifecycle. In Flink 1.2 we introduce a
> > feature called CloseableRegistry, which makes participating in this
> > lifecycle easier with respect to closing resources. For the time being,
> > you should check that Flink's task cancellation also causes your code
> > to close the Aerospike client and check cancellation flags.
> >
> > Best,
> > Stefan
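To make the advice above concrete, a minimal sketch of the pattern Stefan describes, written against the Aerospike Java client and a plain RichSinkFunction. The class name, host, namespace and set are placeholders, not taken from this thread. The idea is simply that the function owns the client and releases it in close(), which Flink invokes when the task finishes and, as part of disposing the operator, also during cancellation, so the client's connections and its background "tend" thread do not outlive the job.

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

import com.aerospike.client.AerospikeClient;
import com.aerospike.client.Bin;
import com.aerospike.client.Key;

/**
 * Sketch of a sink that owns an AerospikeClient and releases it in close().
 * close() runs on normal termination and when the task is cancelled, so the
 * client's connections and its internal "tend" thread are not leaked.
 */
public class AerospikeSink extends RichSinkFunction<String> {

    private transient AerospikeClient client;

    @Override
    public void open(Configuration parameters) throws Exception {
        // Placeholder host/port; in a real job these would come from config.
        client = new AerospikeClient("aerospike-host", 3000);
    }

    @Override
    public void invoke(String value) throws Exception {
        // Placeholder write: one record per element, keyed by the value itself.
        Key key = new Key("test", "flink-demo", value);
        client.put(null, key, new Bin("payload", value));
    }

    @Override
    public void close() throws Exception {
        // Closing the client here releases pending IO and its background threads.
        if (client != null) {
            client.close();
        }
    }
}

The same open()/close() ownership applies to the RichFilterFunction and RichMapFunction variants mentioned further down. If the client can block in a call that does not react to interrupts, the close may additionally have to be triggered from the cancellation path itself, which is the part that the CloseableRegistry mentioned for Flink 1.2 is meant to make easier.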
> >
> > On 05.12.2016 at 11:42, Daniel Santos wrote:
> >>
> >> Hello,
> >>
> >> I have done some thread checking and dumps, and I have disabled
> >> checkpointing.
> >>
> >> Here are my findings.
> >>
> >> I took a thread dump a few hours after I booted up the whole cluster
> >> (@ 2/12/2016; 5 TMs; 3 GB heap each; 7 GB total each as limit).
> >>
> >> The dump shows that most threads come from three sources:
> >>
> >> OutputFlusher --- 634 -- Sleeping State
> >>
> >> "OutputFlusher" - Thread t@4758
> >>    java.lang.Thread.State: TIMED_WAITING
> >>         at java.lang.Thread.sleep(Native Method)
> >>         at org.apache.flink.streaming.runtime.io.StreamRecordWriter$OutputFlusher.run(StreamRecordWriter.java:164)
> >>
> >>    Locked ownable synchronizers:
> >>         - None
> >>
> >> Metrics --- 376 ( the Flink metrics reporter is the only metrics system being used ) -- Parked State
> >>
> >> "metrics-meter-tick-thread-1" - Thread t@29024
> >>    java.lang.Thread.State: TIMED_WAITING
> >>         at sun.misc.Unsafe.park(Native Method)
> >>         - parking to wait for (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
> >>         at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
> >>         at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2078)
> >>         at java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:1093)
> >>         at java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:809)
> >>         at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1067)
> >>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1127)
> >>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> >>         at java.lang.Thread.run(Thread.java:745)
> >>
> >>    Locked ownable synchronizers:
> >>         - None
> >>
> >> tend -- 220 ( Aerospike client thread ) -- Sleeping State
> >>
> >> "tend" - Thread t@29011
> >>    java.lang.Thread.State: TIMED_WAITING
> >>         at java.lang.Thread.sleep(Native Method)
> >>         at com.aerospike.client.util.Util.sleep(Util.java:38)
> >>         at com.aerospike.client.cluster.Cluster.run(Cluster.java:262)
> >>         at java.lang.Thread.run(Thread.java:745)
> >>
> >>    Locked ownable synchronizers:
> >>         - None
> >>
> >> I have two streaming jobs and a batch job that runs once in a while.
> >>
> >> Streaming job A runs with a parallelism of 2 and uses Aerospike only in
> >> a RichSink.
> >>
> >> Streaming job B runs with a parallelism of 24 and uses Aerospike in a
> >> RichFilterFunction / RichMapFunction with open and close methods, in
> >> order to open and close the client.
> >>
> >> The batch job uses the Aerospike client in a RichFilterFunction /
> >> RichMapFunction with open and close methods, in order to open and
> >> close the client.
> >>
> >> Next I cancelled all the streaming jobs @ 5/12/2016 and checked the
> >> threads and the JVM non-heap usage.
> >>
> >> JVM non-heap usage reaches 3 GB; the thread count goes down, but some
> >> threads still linger around. They are the following:
> >>
> >> Metrics --- 790 ( the Flink metrics reporter is the only metrics system being used )
> >>
> >> "metrics-meter-tick-thread-1" - Thread t@29024
> >>    java.lang.Thread.State: TIMED_WAITING
> >>         at sun.misc.Unsafe.park(Native Method)
> >>         - parking to wait for (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
> >>         at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
> >>         at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2078)
> >>         at java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:1093)
> >>         at java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:809)
> >>         at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1067)
> >>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1127)
> >>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> >>         at java.lang.Thread.run(Thread.java:745)
> >>
> >>    Locked ownable synchronizers:
> >>         - None
> >>
> >> tend -- 432 ( Aerospike client thread )
> >>
> >> "tend" - Thread t@29011
> >>    java.lang.Thread.State: TIMED_WAITING
> >>         at java.lang.Thread.sleep(Native Method)
> >>         at com.aerospike.client.util.Util.sleep(Util.java:38)
> >>         at com.aerospike.client.cluster.Cluster.run(Cluster.java:262)
> >>         at java.lang.Thread.run(Thread.java:745)
> >>
> >>    Locked ownable synchronizers:
> >>         - None
> >>
> >> The total number of threads is 1289, of which 1220 are tend + metrics
> >> threads. So I have 1220 threads that I believe should be dead and not
> >> running, since I have no jobs running at all.
> >>
> >> And the JVM non-heap usage doesn't decrease at all after removing
> >> every job.
> >>
> >> Why the hell do the metrics grow without end?
> >>
> >> I am using the following libs for metrics:
> >>
> >> - metrics-graphite-3.1.0.jar
> >> - metrics-core-3.1.0.jar
> >> - flink-metrics-dropwizard-1.1.3.jar
> >> - flink-metrics-graphite-1.1.3.jar
> >>
> >> And the config for the reporter is:
> >>
> >> metrics.reporters: graphite
> >> metrics.reporter.graphite.class: org.apache.flink.metrics.graphite.GraphiteReporter
> >> metrics.reporter.graphite.host: CARBONRELAYHOST
> >> metrics.reporter.graphite.port: 2003
> >>
> >> Shouldn't the Aerospike client also be closed? Or am I missing
> >> something, or doing something wrong?
> >>
> >> Sorry for the long post.
> >>
> >> Best Regards,
> >>
> >> Daniel Santos
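As a side note on the numbers above, the lingering threads can also be counted from inside the TaskManager JVM instead of from a full thread dump. The sketch below is only an illustration: the class name is made up, it relies solely on Thread.getAllStackTraces(), and it has to run inside the JVM being inspected (for example from a small diagnostic job) to be meaningful.

import java.util.Map;
import java.util.TreeMap;

/**
 * Illustrative helper: counts live threads by the name prefixes seen in the
 * dumps ("tend" from the Aerospike client, "metrics-meter-tick" from the
 * dropwizard-backed reporter, "OutputFlusher" from Flink's StreamRecordWriter).
 */
public class ThreadCountByPrefix {

    public static Map<String, Integer> count(String... prefixes) {
        Map<String, Integer> counts = new TreeMap<>();
        for (Thread t : Thread.getAllStackTraces().keySet()) {
            for (String prefix : prefixes) {
                if (t.getName().startsWith(prefix)) {
                    counts.merge(prefix, 1, Integer::sum);
                }
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        // Example usage; in practice this would be invoked from code running
        // in the TaskManager, e.g. logged periodically.
        System.out.println(count("tend", "metrics-meter-tick", "OutputFlusher"));
    }
}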
> >>
> >> On 11/29/2016 04:57 PM, Ufuk Celebi wrote:
> >>> Hey Daniel!
> >>>
> >>> Thanks for reporting this. Unbounded growth of non-heap memory is not
> >>> expected. What kind of threads do you see being spawned / lingering around?
> >>>
> >>> As a first step, could you try to disable checkpointing and see how it
> >>> behaves afterwards?
> >>>
> >>> – Ufuk
> >>>
> >>> On 29 November 2016 at 17:32:32, Daniel Santos (dsan...@cryptolab.net) wrote:
> >>>> Hello,
> >>>>
> >>>> No, I am using Hadoop HDFS as the state backend, Kafka as the source,
> >>>> and an HttpClient as a sink, plus Kafka as a sink as well.
> >>>> So is it possible that the state backend is the culprit?
> >>>>
> >>>> A curious thing is that even when no jobs are running, streaming or
> >>>> otherwise, the JVM non-heap usage stays the same, which I find odd.
> >>>>
> >>>> Another curious thing is that it is proportional to the increase in
> >>>> the number of JVM threads.
> >>>> Whenever there are more JVM threads running, there is also more JVM
> >>>> non-heap memory being used, which makes sense.
> >>>> But the threads stick around and never decrease, and likewise the JVM
> >>>> non-heap memory.
> >>>>
> >>>> These observations are based on the Flink metrics that are being sent
> >>>> to and recorded in our Graphite system.
> >>>>
> >>>> Best Regards,
> >>>>
> >>>> Daniel Santos
> >>>>
> >>>> On 11/29/2016 04:04 PM, Cliff Resnick wrote:
> >>>>> Are you using the RocksDB backend in native mode? If so then the
> >>>>> off-heap memory may be there.
> >>>>>
> >>>>> On Tue, Nov 29, 2016 at 9:54 AM, ... wrote:
> >>>>>
> >>>>> I have the same problem, but I run the Flink job on YARN.
> >>>>> I submit the job to YARN from computer 22, and the job runs
> >>>>> successfully; the JobManager is on 79 and the TaskManager on 69,
> >>>>> three different computers.
> >>>>> However, on computer 22 the process with pid=3463, which is the job
> >>>>> that was submitted to YARN, uses 2.3 GB of memory, 15% of the total.
> >>>>> The command is: ./flink run -m yarn-cluster -yn 1 -ys 1 -yjm 1024 -ytm 1024 ....
> >>>>> Why does computer 22 occupy so much memory, when the job is running
> >>>>> on computers 79 and 69?
> >>>>> What would be the possible causes of such behavior?
> >>>>> Best Regards,
> >>>>>
> >>>>> ----- Original Message -----
> >>>>> From: Daniel Santos
> >>>>> To: user@flink.apache.org
> >>>>> Subject: JVM Non Heap Memory
> >>>>> Date: 29 November 2016, 22:26
> >>>>>
> >>>>> Hello,
> >>>>>
> >>>>> Is it common to have high usage of non-heap memory in the JVM?
> >>>>> I am running Flink in a stand-alone cluster and in Docker, with each
> >>>>> container being capped at 6 GB of memory.
> >>>>> I have been struggling to keep memory usage in check.
> >>>>> The non-heap usage increases without end. It starts with just 100 MB
> >>>>> and after a day it reaches 1.3 GB.
> >>>>> Then it eventually reaches 2 GB, and the Docker container is killed
> >>>>> because it has reached the memory limit.
> >>>>> My configuration for each Flink TaskManager is the following:
> >>>>>
> >>>>> ----------- flink-conf.yaml --------------
> >>>>> taskmanager.heap.mb: 3072
> >>>>> taskmanager.numberOfTaskSlots: 8
> >>>>> taskmanager.memory.preallocate: false
> >>>>> taskmanager.network.numberOfBuffers: 12500
> >>>>> taskmanager.memory.off-heap: false
> >>>>> ---------------------------------------------
> >>>>>
> >>>>> What would be the possible causes of such behavior?
> >>>>>
> >>>>> Best Regards,
> >>>>>
> >>>>> Daniel Santos