Hi Daniel, the behaviour you observe looks like some threads are not canceled. Thread cancelation in Flink (and Java in general) is always cooperative, where cooperative means that the thread you want to cancel should somehow check cancelation and react to it. Sometimes this also requires some effort from the client that wants to cancel a thread. So if you implement e.g. custom operators or functions with aerospike, you must ensure that they a) react on cancelation and b) cleanup their resources. If you do not consider this, your aerospike client might stay in a blocking call forever, in particular blocking IO calls are prone to this. What you need to ensure is that cancelation from the clients includes closing IO resources such as streams to unblock the thread and allow for termination. This means that you need your code must (to a certain degree) actively participate in Flink's task lifecycle. In Flink 1.2 we introduce a feature called CloseableRegistry, which makes participating in this lifecycle easier w.r.t. closing resources. For the time being, you should check that Flink’s task cancelation also causes your code to close the aerospike client and check cancelation flags.
Best, Stefan > Am 05.12.2016 um 11:42 schrieb Daniel Santos <dsan...@cryptolab.net>: > > Hello, > > I have done some threads checking and dumps. And I have disabled the > checkpointing. > Here are my findings. > I did a thread dump a few hours after I booted up the whole cluster. > (@2/12/2016; 5 TM ; 3GB HEAP each ; 7GB total each as Limit ) > > The dump shows that most threads are of 3 sources. > > OutputFlusher --- 634 -- Sleeping State > > "OutputFlusher" - Thread t@4758 > java.lang.Thread.State: TIMED_WAITING > at java.lang.Thread.sleep(Native Method) > at > org.apache.flink.streaming.runtime.io.StreamRecordWriter$OutputFlusher.run(StreamRecordWriter.java:164) > > Locked ownable synchronizers: > - None > > Metrics --- 376 ( Flink Metrics Reporter it's the only metrics being used ) > -- Parked State > > "metrics-meter-tick-thread-1" - Thread t@29024 > java.lang.Thread.State: TIMED_WAITING > at sun.misc.Unsafe.park(Native Method) > - parking to wait for <bcfb9f9> (a > java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject) > at > java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215) > at > java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2078) > at > java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:1093) > at > java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:809) > at > java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1067) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1127) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) > at java.lang.Thread.run(Thread.java:745) > > Locked ownable synchronizers: > - None > > tend -- 220 ( Aerospike Client Thread ) -- Sleeping State > > "tend" - Thread t@29011 > java.lang.Thread.State: TIMED_WAITING > at java.lang.Thread.sleep(Native Method) > at com.aerospike.client.util.Util.sleep(Util.java:38) > at com.aerospike.client.cluster.Cluster.run(Cluster.java:262) > at java.lang.Thread.run(Thread.java:745) > > Locked ownable synchronizers: > - None > > I have 2 streaming jobs and a batch Job that runs once in a while. > > Streaming job A runs with a parallel of 2 and runs Aerospike only in RichSink > . > > Streaming job B runs with a parallel of 24 and runs Aerospike in > RichFilterFunction / RichMapFunction with open and close methods, in order to > open and close the client. > > Batch Job runs Aerospike Client in RichFilterFunction / RichMapFunction with > open and close methods in order to open and close the client. > > Next thing I cancelled all the streaming jobs @5/12/2016 and checked the > threads and the JVM non-heap usage. > > JVM non-heap usage reaches 3GB, threads go down, but some still linger around > and they are the following. > > Metrics --- 790 ( Flink Metrics Reporter it's the only metrics being used ) > > "metrics-meter-tick-thread-1" - Thread t@29024 > java.lang.Thread.State: TIMED_WAITING > at sun.misc.Unsafe.park(Native Method) > - parking to wait for <bcfb9f9> (a > java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject) > at > java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215) > at > java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2078) > at > java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:1093) > at > java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:809) > at > java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1067) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1127) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) > at java.lang.Thread.run(Thread.java:745) > > Locked ownable synchronizers: > - None > > > > tend -- 432 ( Aerospike Client Thread ) > > > "tend" - Thread t@29011 > java.lang.Thread.State: TIMED_WAITING > at java.lang.Thread.sleep(Native Method) > at com.aerospike.client.util.Util.sleep(Util.java:38) > at com.aerospike.client.cluster.Cluster.run(Cluster.java:262) > at java.lang.Thread.run(Thread.java:745) > > Locked ownable synchronizers: > - None > > > Total number threads are 1289 ( total ) / 1220 ( tend + metrics ) . So I have > 1220 threads that I believe that sould be dead and not running, since I have > no jobs running at all. > > And the JVM Non-HEAP usage doesn't decreases at all, after removing every job. > > Why the hell metrics grow to no end ? > > I am using the following libs for metrics : > > - metrics-graphite-3.1.0.jar > > - metrics-core-3.1.0.jar > > - flink-metrics-dropwizard-1.1.3.jar > > - flink-metrics-graphite-1.1.3.jar > > And the config for reporter is : > metrics.reporters: graphite > metrics.reporter.graphite.class: > org.apache.flink.metrics.graphite.GraphiteReporter > metrics.reporter.graphite.host: CARBONRELAYHOST > metrics.reporter.graphite.port: 2003 > > Shouldn't also the Aerospike Client be closed ? Or am I missing something, or > doing something wrong ? > > Sorry for the long post. > > Best Regards, > Daniel Santos > > On 11/29/2016 04:57 PM, Ufuk Celebi wrote: >> Hey Daniel! >> >> Thanks for reporting this. Unbounded growth of non-heap memory is not >> expected. What kind of Threads are you seeing being spawned/lingering >> around? >> >> As a first step, could you try to disable checkpointing and see how it >> behaves afterwards? >> >> – Ufuk >> >> On 29 November 2016 at 17:32:32, Daniel Santos (dsan...@cryptolab.net >> <mailto:dsan...@cryptolab.net>) wrote: >>> Hello, >>> >>> Nope I am using Hadoop HDFS, as state backend, Kafka, as source, and a >>> HttpClient as a Sink, also Kafka as Sink. >>> So it's possible that the state backend is the culprit? >>> >>> Curious thing is even when no jobs are running streaming or otherwise, >>> the JVM Non-HEAP stays the same. >>> Which I find it odd. >>> >>> Another curious thing is that it's proportional to an increase of JVM >>> thread's number. >>> Whenever there are more JVM threads running there is also more JVM >>> Non-HEAP being used, which makes sense. >>> But threads stick around never decreasing, too, likewise JVM Non-HEAP >>> memory. >>> >>> These observations described are based on what flink's metrics are being >>> sent and recorded to our graphite's system. >>> >>> Best Regards, >>> >>> Daniel Santos >>> >>> On 11/29/2016 04:04 PM, Cliff Resnick wrote: >>>> Are you using the RocksDB backend in native mode? If so then the >>>> off-heap memory may be there. >>>> >>>> On Tue, Nov 29, 2016 at 9:54 AM, > > > wrote: >>>> >>>> i have the same problem,but i put the flink job into yarn. >>>> but i put the job into yarn on the computer 22,and the job can >>>> success run,and the jobmanager is 79 and taskmanager is 69,they >>>> three different compu345ter, >>>> however,on computer 22,the pid=3463,which is the job that put into >>>> yarn,is have 2.3g memory,15% of total, >>>> the commend is : ./flink run -m yarn-cluster -yn 1 -ys 1 -yjm 1024 >>>> -ytm 1024 .... >>>> why in conputer 22,has occupy so much momory?the job is running >>>> computer 79 and computer 69. >>>> What would be the possible causes of such behavior ? >>>> Best Regards, >>>> ----- 原始邮件 ----- >>>> 发件人:Daniel Santos > > > >>>> 收件人:user@flink.apache.org <mailto:user@flink.apache.org> >>>> 主题:JVM Non Heap Memory >>>> 日期:2016年11月29日 22点26分 >>>> >>>> >>>> Hello, >>>> Is it common to have high usage of Non-Heap in JVM ? >>>> I am running flink in stand-alone cluster and in docker, with each >>>> docker bieng capped at 6G of memory. >>>> I have been struggling to keep memory usage in check. >>>> The non-heap increases to no end. It start with just 100MB of >>>> usage and >>>> after a day it reaches to 1,3GB. >>>> Then evetually reaches to 2GB and then eventually the docker is >>>> killed >>>> because it has reached the memory limit. >>>> My configuration for each flink task manager is the following : >>>> ----------- flink-conf.yaml -------------- >>>> taskmanager.heap.mb: 3072 >>>> taskmanager.numberOfTaskSlots: 8 >>>> taskmanager.memory.preallocate: false >>>> taskmanager.network.numberOfBuffers: 12500 >>>> taskmanager.memory.off-heap: false >>>> --------------------------------------------- >>>> What would be the possible causes of such behavior ? >>>> Best Regards, >>>> Daniel Santos >>>> >>>> >>> >>> >