I do not see anything abnormal in the logs before this error :( What are your JVM settings, and which Java version are you running? What happens if you limit the heap size so that swap is never used?
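For example, something along these lines in flink-conf.yaml — a rough sketch only, the numbers are illustrative and depend on how much physical memory your containers actually have; the point is to keep heap plus off-heap comfortably below it:

    taskmanager.heap.mb: 4096
    taskmanager.memory.preallocate: false
    # optional, to capture the exact failure point
    env.java.opts: -XX:+HeapDumpOnOutOfMemoryError

On the OS side you could additionally disable swap on the TaskManager hosts (or set vm.swappiness to 0), so that the JVM fails fast instead of silently swapping.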
Piotrek > On 10 Nov 2017, at 14:57, ÇETİNKAYA EBRU ÇETİNKAYA EBRU > <b20926...@cs.hacettepe.edu.tr> wrote: > > On 2017-11-10 13:14, Piotr Nowojski wrote: >> jobmanager1.log and taskmanager2.log are the same. Can you also submit >> files containing std output? >> Piotrek >>> On 10 Nov 2017, at 09:35, ÇETİNKAYA EBRU ÇETİNKAYA EBRU >>> <b20926...@cs.hacettepe.edu.tr> wrote: >>> On 2017-11-10 11:04, Piotr Nowojski wrote: >>>> Hi, >>>> Thanks for the logs; however, I do not see the previously mentioned exceptions >>>> in it. It ends with a java.lang.InterruptedException. >>>> Is it the correct log file? Also, could you attach the std output file >>>> of the failing TaskManager? >>>> Piotrek >>>>> On 10 Nov 2017, at 08:42, ÇETİNKAYA EBRU ÇETİNKAYA EBRU >>>>> <b20926...@cs.hacettepe.edu.tr> wrote: >>>>> On 2017-11-09 20:08, Piotr Nowojski wrote: >>>>>> Hi, >>>>>> Could you attach full logs from those task managers? At first glance I >>>>>> don’t see a connection between those exceptions and any memory issue >>>>>> that you might have had. It looks like a dependency issue in one (some? >>>>>> all?) of your jobs. >>>>>> Did you build your jars with the -Pbuild-jar profile as described here: >>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.3/quickstart/java_api_quickstart.html#build-project >>>>>> ? >>>>>> If that doesn’t help, can you binary-search which job is causing the >>>>>> problem? There might be some Flink incompatibility between different >>>>>> versions, and rebuilding a job’s jar with a version matching the >>>>>> cluster version might help. >>>>>> Piotrek >>>>>>> On 9 Nov 2017, at 17:36, ÇETİNKAYA EBRU ÇETİNKAYA EBRU >>>>>>> <b20926...@cs.hacettepe.edu.tr> wrote: >>>>>>> On 2017-11-08 18:30, Piotr Nowojski wrote: >>>>>>> Btw, Ebru: >>>>>>> I don’t agree that the main suspect is NetworkBufferPool. On your >>>>>>> screenshots its memory consumption was reasonable and stable: >>>>>>> 596MB >>>>>>> -> 602MB -> 597MB. >>>>>>> PoolThreadCache memory usage of ~120MB is also reasonable. >>>>>>> Do you experience any problems, like Out Of Memory >>>>>>> errors/crashes/long >>>>>>> GC pauses? Or is the JVM process just using more memory over time? You >>>>>>> are >>>>>>> aware that the JVM doesn’t like to release memory back to the OS once it >>>>>>> has been >>>>>>> used? So increasing memory usage until hitting some limit (for >>>>>>> example >>>>>>> the JVM max heap size) is expected behaviour. >>>>>>> Piotrek >>>>>>> On 8 Nov 2017, at 15:48, Piotr Nowojski <pi...@data-artisans.com> >>>>>>> wrote: >>>>>>> I don’t know if this is relevant to this issue, but I was >>>>>>> constantly getting failures trying to reproduce this leak using your >>>>>>> job, because you were using a non-deterministic getKey function: >>>>>>> @Override >>>>>>> public Integer getKey(Integer event) { >>>>>>> Random randomGen = new Random((new Date()).getTime()); >>>>>>> return randomGen.nextInt() % 8; >>>>>>> } >>>>>>> And quoting the Javadoc of KeySelector: >>>>>>> "If invoked multiple times on the same object, the returned key must >>>>>>> be the same." >>>>>>> I’m trying to reproduce this issue with the following job: >>>>>>> https://gist.github.com/pnowojski/b80f725c1af7668051c773438637e0d3 >>>>>>> Where IntegerSource is just an infinite source, and DisardingSink is, >>>>>>> well, just discarding incoming data. I’m cancelling the job every 5 >>>>>>> seconds, and so far (after ~15 minutes) my memory consumption is >>>>>>> stable, well below the maximum Java heap size. 
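For illustration, a deterministic variant of that selector could derive the key from the record itself instead of a fresh Random, so repeated calls on the same element always return the same key (a sketch only — the class name is made up):

    import org.apache.flink.api.java.functions.KeySelector;

    public class ModuloKeySelector implements KeySelector<Integer, Integer> {
        @Override
        public Integer getKey(Integer event) {
            // Same input always yields the same key; Math.abs keeps the result
            // in the 0-7 range that the original "% 8" was aiming for.
            return Math.abs(event % 8);
        }
    }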
>>>>>>> Piotrek >>>>>>> On 8 Nov 2017, at 15:28, Javier Lopez <javier.lo...@zalando.de> >>>>>>> wrote: >>>>>>> Yes, I tested with just printing the stream. But it could take a >>>>>>> lot of time to fail. >>>>>>> On Wednesday, 8 November 2017, Piotr Nowojski >>>>>>> <pi...@data-artisans.com> wrote: >>>>>>> Thanks for the quick answer. >>>>>>> So it will also fail after some time with the `fromElements` source >>>>>>> instead of Kafka, right? >>>>>>> Did you try it also without a Kafka producer? >>>>>>> Piotrek >>>>>>> On 8 Nov 2017, at 14:57, Javier Lopez <javier.lo...@zalando.de> >>>>>>> wrote: >>>>>>> Hi, >>>>>>> You don't need data. With data it will die faster. I tested as >>>>>>> well with a small data set, using the fromElements source, but it >>>>>>> will take some time to die. It's better with some data. >>>>>>> On 8 November 2017 at 14:54, Piotr Nowojski >>>>>>> <pi...@data-artisans.com> wrote: >>>>>>> Hi, >>>>>>> Thanks for sharing this job. >>>>>>> Do I need to feed some data to Kafka to reproduce this >>>>>> issue with your script? >>>>>>>> Does this OOM issue also happen when you are not using the >>>>>> Kafka source/sink? >>>>>>>> Piotrek >>>>>>>> On 8 Nov 2017, at 14:08, Javier Lopez <javier.lo...@zalando.de> >>>>>> wrote: >>>>>>>> Hi, >>>>>>>> This is the test Flink job we created to trigger this leak: >>>>>> https://gist.github.com/javieredo/c6052404dbe6cc602e99f4669a09f7d6 >>>>>>>> And this is the Python script we are using to execute the job >>>>>> thousands of times to get the OOM problem: >>>>>> https://gist.github.com/javieredo/4825324d5d5f504e27ca6c004396a107 >>>>>>>> The cluster we used for this has this configuration: >>>>>>>> Instance type: t2.large >>>>>>>> Number of workers: 2 >>>>>>>> HeapMemory: 5500 >>>>>>>> Number of task slots per node: 4 >>>>>>>> TaskMangMemFraction: 0.5 >>>>>>>> NumberOfNetworkBuffers: 2000 >>>>>>>> We have tried several things: increasing the heap, reducing the >>>>>> heap, a larger memory fraction, changing this value in >>>>>> taskmanager.sh: TM_MAX_OFFHEAP_SIZE="2G"; and nothing seems to >>>>>> work. >>>>>>>> Thanks for your help. >>>>>>>> On 8 November 2017 at 13:26, ÇETİNKAYA EBRU ÇETİNKAYA EBRU >>>>>> <b20926...@cs.hacettepe.edu.tr> wrote: >>>>>>> On 2017-11-08 15:20, Piotr Nowojski wrote: >>>>>>> Hi Ebru and Javier, >>>>>>> Yes, if you could share this example job it would be helpful. >>>>>>> Ebru: could you explain in a little more detail what >>>>>> your Job(s) >>>>>>> look like? Could you post some code? If you are just using >>>>>> maps and >>>>>>> filters there shouldn’t be any network transfers involved, >>>>>> aside >>>>>>> from Source and Sink functions. >>>>>>> Piotrek >>>>>>> On 8 Nov 2017, at 12:54, ebru >>>>>> <b20926...@cs.hacettepe.edu.tr> wrote: >>>>>>> Hi Javier, >>>>>>> It would be helpful if you share your test job with us. >>>>>>> Which configurations did you try? >>>>>>> -Ebru >>>>>>> On 8 Nov 2017, at 14:43, Javier Lopez >>>>>> <javier.lo...@zalando.de> >>>>>>> wrote: >>>>>>> Hi, >>>>>>> We have been facing a similar problem. We have tried some >>>>>> different >>>>>>> configurations, as proposed in the other email thread by Flavio >>>>>> and >>>>>>> Kien, but it didn't work. We have a workaround similar to >>>>>> the one >>>>>>> that Flavio has: we restart the taskmanagers once they reach >>>>>> a >>>>>>> memory threshold. We created a small test to remove all of >>>>>> our >>>>>>> dependencies and leave only the Flink native libraries. 
This >>>>>> test reads >>>>>>> data from a Kafka topic and writes it back to another topic >>>>>> in >>>>>>> Kafka. We cancel the job and start another every 5 seconds. >>>>>> After >>>>>>> ~30 minutes of doing this process, the cluster reaches the >>>>>> OS memory >>>>>>> limit and dies. >>>>>>> Currently, we have a test cluster with 8 workers and 8 task >>>>>> slots >>>>>>> per node. We have one job that uses 56 slots, and we cannot >>>>>> execute >>>>>>> that job 5 times in a row because the whole cluster dies. If >>>>>> you >>>>>>> want, we can publish our test job. >>>>>>> Regards, >>>>>>> On 8 November 2017 at 11:20, Aljoscha Krettek >>>>>> <aljos...@apache.org> >>>>>>> wrote: >>>>>>> @Nico & @Piotr Could you please have a look at this? You >>>>>> both >>>>>>> recently worked on the network stack and might be most >>>>>> familiar with >>>>>>> this. >>>>>>> On 8. Nov 2017, at 10:25, Flavio Pompermaier >>>>>> <pomperma...@okkam.it> >>>>>>> wrote: >>>>>>> We also have the same problem in production. At the moment >>>>>> the >>>>>>> solution is to restart the entire Flink cluster after every >>>>>> job. >>>>>>> We've tried to reproduce this problem with a test (see >>>>>>> https://issues.apache.org/jira/browse/FLINK-7845 [1]) but we >>>>>> don't >>>>>>> know whether the error produced by the test and the leak are >>>>>>> correlated. >>>>>>> Best, >>>>>>> Flavio >>>>>>> On Wed, Nov 8, 2017 at 9:51 AM, ÇETİNKAYA EBRU ÇETİNKAYA >>>>>> EBRU >>>>>>> <b20926...@cs.hacettepe.edu.tr> wrote: >>>>>>> On 2017-11-07 16:53, Ufuk Celebi wrote: >>>>>>> Do you use any windowing? If yes, could you please share >>>>>> that code? >>>>>>> If >>>>>>> there is no stateful operation at all, it's strange where >>>>>> the list >>>>>>> state instances are coming from. >>>>>>> On Tue, Nov 7, 2017 at 2:35 PM, ebru >>>>>> <b20926...@cs.hacettepe.edu.tr> >>>>>>> wrote: >>>>>>> Hi Ufuk, >>>>>>> We don’t explicitly define any state descriptor. We only >>>>>> use map >>>>>>> and filter >>>>>>> operators. We thought that GC handles clearing Flink’s >>>>>> internal >>>>>>> state. >>>>>>> So how can we manage the memory if it is always increasing? >>>>>>> - Ebru >>>>>>> On 7 Nov 2017, at 16:23, Ufuk Celebi <u...@apache.org> wrote: >>>>>>> Hey Ebru, the memory usage might be increasing as long as a >>>>>> job is >>>>>>> running. >>>>>>> This is expected (also in the case of multiple running >>>>>> jobs). The >>>>>>> screenshots are not helpful in that regard. :-( >>>>>>> What kind of stateful operations are you using? Depending on >>>>>> your >>>>>>> use case, >>>>>>> you have to manually call `clear()` on the state instance in >>>>>> order >>>>>>> to >>>>>>> release the managed state. >>>>>>> Best, >>>>>>> Ufuk >>>>>>> On Tue, Nov 7, 2017 at 12:43 PM, ebru >>>>>>> <b20926...@cs.hacettepe.edu.tr> wrote: >>>>>>> Begin forwarded message: >>>>>>> From: ebru <b20926...@cs.hacettepe.edu.tr> >>>>>>> Subject: Re: Flink memory leak >>>>>>> Date: 7 November 2017 at 14:09:17 GMT+3 >>>>>>> To: Ufuk Celebi <u...@apache.org> >>>>>>> Hi Ufuk, >>>>>>> There are three snapshots of htop output. >>>>>>> 1. snapshot is the initial state. >>>>>>> 2. snapshot is after submitting one job. >>>>>>> 3. snapshot is the output of one job with 15000 EPS. And >>>>>> the >>>>>>> memory >>>>>>> usage is always increasing over time. >>>>>>> <1.png><2.png><3.png> >>>>>>> On 7 Nov 2017, at 13:34, Ufuk Celebi <u...@apache.org> wrote: >>>>>>> Hey Ebru, >>>>>>> let me pull in Aljoscha (CC'd) who might have an idea what's >>>>>> causing >>>>>>> this. 
>>>>>>> Since multiple jobs are running, it will be hard to >>>>>> understand to >>>>>>> which job the state descriptors from the heap snapshot >>>>>> belong. >>>>>>> - Is it possible to isolate the problem and reproduce the >>>>>> behaviour >>>>>>> with only a single job? >>>>>>> – Ufuk >>>>>>> On Tue, Nov 7, 2017 at 10:27 AM, ÇETİNKAYA EBRU >>>>>> ÇETİNKAYA EBRU >>>>>>> <b20926...@cs.hacettepe.edu.tr> wrote: >>>>>>> Hi, >>>>>>> We are using Flink 1.3.1 in production; we have one job >>>>>> manager and >>>>>>> 3 task >>>>>>> managers in standalone mode. Recently, we've noticed that we >>>>>> have >>>>>>> memory-related problems. We use Docker containers to run the Flink >>>>>> cluster. We >>>>>>> have >>>>>>> 300 slots and 20 jobs are running with a parallelism of 10. >>>>>> Also, the >>>>>>> job >>>>>>> count >>>>>>> may change over time. Taskmanager memory usage always >>>>>> increases. >>>>>>> After >>>>>>> job cancellation this memory usage doesn't decrease. We've >>>>>> tried to >>>>>>> investigate the problem and we took a task manager JVM >>>>>> heap >>>>>>> snapshot. >>>>>>> According to the JVM heap analysis, a possible memory leak was >>>>>> Flink's >>>>>>> list >>>>>>> state descriptor. But we are not sure that is the cause of >>>>>> our >>>>>>> memory >>>>>>> problem. How can we solve the problem? >>>>>>> We have two types of Flink job. One has no stateful >>>>>> operator and >>>>>>> contains only maps and filters, and the other has a time window >>>>>> with a >>>>>>> count trigger. >>>>>>> * We've analysed the JVM heaps again under different >>>>>> conditions. First >>>>>>> we analysed the snapshot when no Flink jobs were running on the >>>>>> cluster (image >>>>>>> 1). >>>>>>> * Then, we analysed the JVM heap snapshot when the Flink job >>>>>> that has >>>>>>> no stateful operator was running. According to the >>>>>> results, the leak >>>>>>> suspect was NetworkBufferPool (image 2). >>>>>>> * In the last analysis, both types of jobs were running, >>>>>> and the leak >>>>>>> suspect was again NetworkBufferPool (image 3). >>>>>>> In our system, jobs are regularly cancelled and resubmitted, and >>>>>> we >>>>>>> noticed that when a job is submitted some amount of memory is >>>>>> allocated and >>>>>>> after cancellation this allocated memory is never freed. So over >>>>>> time >>>>>>> memory usage keeps increasing and exceeds the limits. >>>>>> Links: >>>>>> ------ >>>>>> [1] https://issues.apache.org/jira/browse/FLINK-7845 >>>>>> Hi Piotr, >>>>>> There are two types of jobs. >>>>>> In the first, we use a Kafka source and a Kafka sink; there isn't any >>>>>> window operator. >>>>>>> In the second job, we use a Kafka source, a filesystem sink and >>>>>> an Elasticsearch sink, and a window operator for buffering. >>>>>> Hi Piotrek, >>>>>> Thanks for your reply. >>>>>> We've tested our Flink cluster again. We have 360 slots, and our >>>>>> cluster configuration is like this: >>>>>> jobmanager.rpc.address: %JOBMANAGER% >>>>>> jobmanager.rpc.port: 6123 >>>>>> jobmanager.heap.mb: 1536 >>>>>> taskmanager.heap.mb: 1536 >>>>>> taskmanager.numberOfTaskSlots: 120 >>>>>> taskmanager.memory.preallocate: false >>>>>> parallelism.default: 1 >>>>>> jobmanager.web.port: 8081 >>>>>> state.backend: filesystem >>>>>> state.backend.fs.checkpointdir: file:///storage/%CHECKPOINTDIR% >>>>>> state.checkpoints.dir: file:///storage/%CHECKPOINTDIR% >>>>>> taskmanager.network.numberOfBuffers: 5000 >>>>>> We are using a Docker-based Flink cluster. >>>>>> We submitted 36 jobs with a parallelism of 10. After all slots >>>>>> became full, 
memory usage was increasing over time, and one by one >>>>>> the task managers started to die. The exception was like this: >>>>>> Taskmanager1 log: >>>>>> Uncaught error from thread [flink-akka.actor.default-dispatcher-17] >>>>>> shutting down JVM since 'akka.jvm-exit-on-fatal-error' is enabled for >>>>>> ActorSystem[flink] >>>>>> java.lang.NoClassDefFoundError: >>>>>> org/apache/kafka/common/metrics/stats/Rate$1 >>>>>> at >>>>>> org.apache.kafka.common.metrics.stats.Rate.convert(Rate.java:93) >>>>>> at >>>>>> org.apache.kafka.common.metrics.stats.Rate.measure(Rate.java:62) >>>>>> at >>>>>> org.apache.kafka.common.metrics.KafkaMetric.value(KafkaMetric.java:61) >>>>>> at >>>>>> org.apache.kafka.common.metrics.KafkaMetric.value(KafkaMetric.java:52) >>>>>> at >>>>>> org.apache.flink.streaming.connectors.kafka.internals.metrics.KafkaMetricWrapper.getValue(KafkaMetricWrapper.java:35) >>>>>> at >>>>>> org.apache.flink.streaming.connectors.kafka.internals.metrics.KafkaMetricWrapper.getValue(KafkaMetricWrapper.java:26) >>>>>> at >>>>>> org.apache.flink.runtime.metrics.dump.MetricDumpSerialization.serializeGauge(MetricDumpSerialization.java:213) >>>>>> at >>>>>> org.apache.flink.runtime.metrics.dump.MetricDumpSerialization.access$200(MetricDumpSerialization.java:50) >>>>>> at >>>>>> org.apache.flink.runtime.metrics.dump.MetricDumpSerialization$MetricDumpSerializer.serialize(MetricDumpSerialization.java:138) >>>>>> at >>>>>> org.apache.flink.runtime.metrics.dump.MetricQueryService.onReceive(MetricQueryService.java:109) >>>>>> at >>>>>> akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:167) >>>>>> at akka.actor.Actor$class.aroundReceive(Actor.scala:467) >>>>>> at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:97) >>>>>> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516) >>>>>> at akka.actor.ActorCell.invoke(ActorCell.scala:487) >>>>>> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238) >>>>>> at akka.dispatch.Mailbox.run(Mailbox.scala:220) >>>>>> at >>>>>> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397) >>>>>> at >>>>>> scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) >>>>>> at >>>>>> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) >>>>>> at >>>>>> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) >>>>>> at >>>>>> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) >>>>>> Caused by: java.lang.ClassNotFoundException: >>>>>> org.apache.kafka.common.metrics.stats.Rate$1 >>>>>> at java.net.URLClassLoader.findClass(URLClassLoader.java:381) >>>>>> at java.lang.ClassLoader.loadClass(ClassLoader.java:424) >>>>>> at java.lang.ClassLoader.loadClass(ClassLoader.java:357) >>>>>> ... 
22 more >>>>>> Taskmanager2 log: >>>>>> Uncaught error from thread [flink-akka.actor.default-dispatcher-17] >>>>>> shutting down JVM since 'akka.jvm-exit-on-fatal-error' is enabled for >>>>>> ActorSystem[flink] >>>>>> java.lang.NoClassDefFoundError: >>>>>> org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher$1 >>>>>> at >>>>>> org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher$OffsetGauge.getValue(AbstractFetcher.java:492) >>>>>> at >>>>>> org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher$OffsetGauge.getValue(AbstractFetcher.java:480) >>>>>> at >>>>>> org.apache.flink.runtime.metrics.dump.MetricDumpSerialization.serializeGauge(MetricDumpSerialization.java:213) >>>>>> at >>>>>> org.apache.flink.runtime.metrics.dump.MetricDumpSerialization.access$200(MetricDumpSerialization.java:50) >>>>>> at >>>>>> org.apache.flink.runtime.metrics.dump.MetricDumpSerialization$MetricDumpSerializer.serialize(MetricDumpSerialization.java:138) >>>>>> at >>>>>> org.apache.flink.runtime.metrics.dump.MetricQueryService.onReceive(MetricQueryService.java:109) >>>>>> at >>>>>> akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:167) >>>>>> at akka.actor.Actor$class.aroundReceive(Actor.scala:467) >>>>>> at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:97) >>>>>> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516) >>>>>> at akka.actor.ActorCell.invoke(ActorCell.scala:487) >>>>>> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238) >>>>>> at akka.dispatch.Mailbox.run(Mailbox.scala:220) >>>>>> at >>>>>> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397) >>>>>> at >>>>>> scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) >>>>>> at >>>>>> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) >>>>>> at >>>>>> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) >>>>>> at >>>>>> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) >>>>>> Caused by: java.lang.ClassNotFoundException: >>>>>> org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher$1 >>>>>> at java.net.URLClassLoader.findClass(URLClassLoader.java:381) >>>>>> at java.lang.ClassLoader.loadClass(ClassLoader.java:424) >>>>>> at java.lang.ClassLoader.loadClass(ClassLoader.java:357) >>>>>> ... 18 more >>>>>> -Ebru >>>>> Hi Piotrek, >>>>> We attached the full log of taskmanager1. >>>>> This may not be a dependency issue, because until all of the task slots are >>>>> full we didn't get any NoClassDefFoundError; when there is >>>>> available memory, jobs can run without exceptions for days. >>>>> Also, there is a Kafka 'instance already exists' exception in the full log, but >>>>> this is not relevant and doesn't affect jobs or task managers. >>>>> -Ebru<taskmanager1.log.zip> >>> Hi, >>> Sorry, we attached the wrong log file. I've attached all task managers' and the job >>> manager's logs. All task managers and the job manager were killed.<logs.zip> > > We had lost the std output files, so we've reproduced the problem. I attached the > task managers' and job manager's logs and also the std output files. And after some > time it starts using swap; a screenshot of the htop output is also > attached.<logs2-1.zip><logs2-2.zip><logs2-3.zip><error2.png>