jobmanager1.log and taskmanager2.log are the same. Can you also submit files containing std output?
Piotrek

> On 10 Nov 2017, at 09:35, ÇETİNKAYA EBRU ÇETİNKAYA EBRU <b20926...@cs.hacettepe.edu.tr> wrote:
>
> On 2017-11-10 11:04, Piotr Nowojski wrote:
>> Hi,
>> Thanks for the logs, however I do not see the previously mentioned exceptions in it. It ends with java.lang.InterruptedException. Is it the correct log file? Also, could you attach the std output file of the failing TaskManager?
>> Piotrek
>>
>>> On 10 Nov 2017, at 08:42, ÇETİNKAYA EBRU ÇETİNKAYA EBRU <b20926...@cs.hacettepe.edu.tr> wrote:
>>> On 2017-11-09 20:08, Piotr Nowojski wrote:
>>>> Hi,
>>>> Could you attach full logs from those task managers? At first glance I don't see a connection between those exceptions and any memory issue that you might have had. It looks like a dependency issue in one (some? all?) of your jobs.
>>>> Did you build your jars with the -Pbuild-jar profile, as described here:
>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.3/quickstart/java_api_quickstart.html#build-project ?
>>>> If that doesn't help, can you binary search which job is causing the problem? There might be some Flink incompatibility between different versions, and rebuilding a job's jar with a version matching the cluster version might help.
>>>> Piotrek
>>>>
>>>>> On 9 Nov 2017, at 17:36, ÇETİNKAYA EBRU ÇETİNKAYA EBRU <b20926...@cs.hacettepe.edu.tr> wrote:
>>>>> On 2017-11-08 18:30, Piotr Nowojski wrote:
>>>>> Btw, Ebru:
>>>>> I don't agree that the main suspect is NetworkBufferPool. On your screenshots its memory consumption was reasonable and stable: 596MB -> 602MB -> 597MB.
>>>>> PoolThreadCache memory usage of ~120MB is also reasonable.
>>>>> Do you experience any problems, like Out Of Memory errors/crashes/long GC pauses? Or is the JVM process just using more memory over time? You are aware that the JVM doesn't like to release memory back to the OS once it was used? So increasing memory usage until hitting some limit (for example the JVM max heap size) is expected behaviour.
>>>>> Piotrek
>>>>>
>>>>> On 8 Nov 2017, at 15:48, Piotr Nowojski <pi...@data-artisans.com> wrote:
>>>>> I don't know if this is relevant to this issue, but I was constantly getting failures trying to reproduce this leak using your job, because you were using a non-deterministic getKey function:
>>>>>
>>>>> @Override
>>>>> public Integer getKey(Integer event) {
>>>>>     Random randomGen = new Random((new Date()).getTime());
>>>>>     return randomGen.nextInt() % 8;
>>>>> }
>>>>>
>>>>> And quoting the Javadoc of KeySelector:
>>>>> "If invoked multiple times on the same object, the returned key must be the same."
>>>>> I'm trying to reproduce this issue with the following job:
>>>>> https://gist.github.com/pnowojski/b80f725c1af7668051c773438637e0d3
>>>>> where IntegerSource is just an infinite source and DiscardingSink is, well, just discarding incoming data. I'm cancelling the job every 5 seconds and so far (after ~15 minutes) my memory consumption is stable, well below the maximum Java heap size.
>>>>> Piotrek
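For comparison, a key selector that respects the KeySelector contract derives the key from the record itself rather than from a random generator. A minimal sketch (the modulo-8 bucketing is kept only to mirror the quoted snippet; the class name is illustrative):

import org.apache.flink.api.java.functions.KeySelector;

// Deterministic alternative to the snippet quoted above: the key depends only
// on the record, so repeated invocations on the same element return the same key.
public class ModuloKeySelector implements KeySelector<Integer, Integer> {
    @Override
    public Integer getKey(Integer event) {
        // Math.abs guards against negative keys for negative events.
        return Math.abs(event % 8);
    }
}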
>>>>> On 8 Nov 2017, at 15:28, Javier Lopez <javier.lo...@zalando.de> wrote:
>>>>> Yes, I tested with just printing the stream. But it could take a lot of time to fail.
>>>>>
>>>>> On Wednesday, 8 November 2017, Piotr Nowojski <pi...@data-artisans.com> wrote:
>>>>> Thanks for the quick answer.
>>>>> So it will also fail after some time with a `fromElements` source instead of Kafka, right?
>>>>> Did you try it also without a Kafka producer?
>>>>> Piotrek
>>>>>
>>>>> On 8 Nov 2017, at 14:57, Javier Lopez <javier.lo...@zalando.de> wrote:
>>>>> Hi,
>>>>> You don't need data. With data it will die faster. I tested as well with a small data set, using the fromElements source, but it will take some time to die. It's better with some data.
>>>>>
>>>>> On 8 November 2017 at 14:54, Piotr Nowojski <pi...@data-artisans.com> wrote:
>>>>> Hi,
>>>>> Thanks for sharing this job.
>>>>>> Do I need to feed some data to Kafka to reproduce this issue with your script?
>>>>>> Does this OOM issue also happen when you are not using the Kafka source/sink?
>>>>>> Piotrek
>>>>>>
>>>>>> On 8 Nov 2017, at 14:08, Javier Lopez <javier.lo...@zalando.de> wrote:
>>>>>> Hi,
>>>>>> This is the test Flink job we created to trigger this leak:
>>>>>> https://gist.github.com/javieredo/c6052404dbe6cc602e99f4669a09f7d6
>>>>>> And this is the Python script we are using to execute the job thousands of times to get the OOM problem:
>>>>>> https://gist.github.com/javieredo/4825324d5d5f504e27ca6c004396a107
>>>>>> The cluster we used for this has this configuration:
>>>>>> Instance type: t2.large
>>>>>> Number of workers: 2
>>>>>> HeapMemory: 5500
>>>>>> Number of task slots per node: 4
>>>>>> TaskMangMemFraction: 0.5
>>>>>> NumberOfNetworkBuffers: 2000
>>>>>> We have tried several things: increasing the heap, reducing the heap, more memory fraction, changing this value in taskmanager.sh: TM_MAX_OFFHEAP_SIZE="2G"; and nothing seems to work.
>>>>>> Thanks for your help.
>>>>>>
>>>>>> On 8 November 2017 at 13:26, ÇETİNKAYA EBRU ÇETİNKAYA EBRU <b20926...@cs.hacettepe.edu.tr> wrote:
>>>>> On 2017-11-08 15:20, Piotr Nowojski wrote:
>>>>> Hi Ebru and Javier,
>>>>> Yes, if you could share this example job it would be helpful.
>>>>> Ebru: could you explain in a little more detail what your job(s) look like? Could you post some code? If you are just using maps and filters there shouldn't be any network transfers involved, aside from the Source and Sink functions.
>>>>> Piotrek
>>>>>
>>>>> On 8 Nov 2017, at 12:54, ebru <b20926...@cs.hacettepe.edu.tr> wrote:
>>>>> Hi Javier,
>>>>> It would be helpful if you share your test job with us.
>>>>> Which configurations did you try?
>>>>> -Ebru
>>>>>
>>>>> On 8 Nov 2017, at 14:43, Javier Lopez <javier.lo...@zalando.de> wrote:
>>>>> Hi,
>>>>> We have been facing a similar problem. We have tried some different configurations, as proposed in another email thread by Flavio and Kien, but it didn't work. We have a workaround similar to the one that Flavio has: we restart the taskmanagers once they reach a memory threshold. We created a small test to remove all of our dependencies and leave only Flink native libraries. This test reads data from a Kafka topic and writes it back to another topic in Kafka. We cancel the job and start another every 5 seconds. After ~30 minutes of doing this process, the cluster reaches the OS memory limit and dies.
>>>>> Currently, we have a test cluster with 8 workers and 8 task slots per node. We have one job that uses 56 slots, and we cannot execute that job 5 times in a row because the whole cluster dies. If you want, we can publish our test job.
>>>>> Regards,
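For reference, a Kafka round-trip test job of the shape described above would look roughly like the following sketch (the gists linked earlier are the authoritative versions; the topic names, broker address and group id here are illustrative, and the Kafka 0.10 connector with Flink 1.3 APIs is assumed):

import java.util.Properties;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

public class KafkaRoundTripJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092"); // illustrative broker address
        props.setProperty("group.id", "memory-leak-test");        // illustrative consumer group

        // Read records from one topic and write them back, unchanged, to another topic.
        env.addSource(new FlinkKafkaConsumer010<>("test-input", new SimpleStringSchema(), props))
           .addSink(new FlinkKafkaProducer010<>("test-output", new SimpleStringSchema(), props));

        env.execute("kafka-round-trip-test");
    }
}

An external loop (the Python script linked above) then submits and cancels a job of this shape every few seconds until the OOM appears.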
>>>>> On 8 November 2017 at 11:20, Aljoscha Krettek <aljos...@apache.org> wrote:
>>>>> @Nico & @Piotr Could you please have a look at this? You both recently worked on the network stack and might be most familiar with this.
>>>>>
>>>>> On 8. Nov 2017, at 10:25, Flavio Pompermaier <pomperma...@okkam.it> wrote:
>>>>> We also have the same problem in production. At the moment the solution is to restart the entire Flink cluster after every job.
>>>>> We've tried to reproduce this problem with a test (see https://issues.apache.org/jira/browse/FLINK-7845 [1]) but we don't know whether the error produced by the test and the leak are correlated.
>>>>> Best,
>>>>> Flavio
>>>>>
>>>>> On Wed, Nov 8, 2017 at 9:51 AM, ÇETİNKAYA EBRU ÇETİNKAYA EBRU <b20926...@cs.hacettepe.edu.tr> wrote:
>>>>> On 2017-11-07 16:53, Ufuk Celebi wrote:
>>>>> Do you use any windowing? If yes, could you please share that code? If there is no stateful operation at all, it's strange where the list state instances are coming from.
>>>>>
>>>>> On Tue, Nov 7, 2017 at 2:35 PM, ebru <b20926...@cs.hacettepe.edu.tr> wrote:
>>>>> Hi Ufuk,
>>>>> We don't explicitly define any state descriptor. We only use map and filter operators. We thought that GC handles clearing Flink's internal state. So how can we manage the memory if it is always increasing?
>>>>> - Ebru
>>>>>
>>>>> On 7 Nov 2017, at 16:23, Ufuk Celebi <u...@apache.org> wrote:
>>>>> Hey Ebru, the memory usage might be increasing as long as a job is running. This is expected (also in the case of multiple running jobs). The screenshots are not helpful in that regard. :-(
>>>>> What kind of stateful operations are you using? Depending on your use case, you have to manually call `clear()` on the state instance in order to release the managed state.
>>>>> Best,
>>>>> Ufuk
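To illustrate the kind of manual cleanup Ufuk mentions, a keyed function holding managed state can call clear() once the state for a key is no longer needed. A minimal sketch, with a made-up counting use case and threshold:

import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

// Hypothetical keyed function: counts events per key and releases its state
// explicitly once the count is emitted, instead of letting it accumulate forever.
public class CountAndClear extends RichFlatMapFunction<Integer, Long> {

    private transient ValueState<Long> count;

    @Override
    public void open(Configuration parameters) {
        count = getRuntimeContext().getState(
                new ValueStateDescriptor<>("count", Long.class));
    }

    @Override
    public void flatMap(Integer event, Collector<Long> out) throws Exception {
        Long current = count.value();
        long updated = (current == null ? 0L : current) + 1;
        if (updated >= 1000) {            // made-up threshold
            out.collect(updated);
            count.clear();                // release the managed state for this key
        } else {
            count.update(updated);
        }
    }
}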
>>>>> On Tue, Nov 7, 2017 at 12:43 PM, ebru <b20926...@cs.hacettepe.edu.tr> wrote:
>>>>> Begin forwarded message:
>>>>> From: ebru <b20926...@cs.hacettepe.edu.tr>
>>>>> Subject: Re: Flink memory leak
>>>>> Date: 7 November 2017 at 14:09:17 GMT+3
>>>>> To: Ufuk Celebi <u...@apache.org>
>>>>> Hi Ufuk,
>>>>> There are three snapshots of htop output.
>>>>> 1. snapshot is the initial state.
>>>>> 2. snapshot is after one job was submitted.
>>>>> 3. snapshot is the output of the one job with 15000 EPS. And the memory usage is always increasing over time.
>>>>> <1.png><2.png><3.png>
>>>>>
>>>>> On 7 Nov 2017, at 13:34, Ufuk Celebi <u...@apache.org> wrote:
>>>>> Hey Ebru,
>>>>> let me pull in Aljoscha (CC'd) who might have an idea what's causing this.
>>>>> Since multiple jobs are running, it will be hard to understand which job the state descriptors from the heap snapshot belong to.
>>>>> - Is it possible to isolate the problem and reproduce the behaviour with only a single job?
>>>>> – Ufuk
>>>>>
>>>>> On Tue, Nov 7, 2017 at 10:27 AM, ÇETİNKAYA EBRU ÇETİNKAYA EBRU <b20926...@cs.hacettepe.edu.tr> wrote:
>>>>> Hi,
>>>>> We are using Flink 1.3.1 in production; we have one job manager and 3 task managers in standalone mode. Recently, we've noticed that we have memory related problems. We use docker containers to serve the Flink cluster. We have 300 slots and 20 jobs are running with a parallelism of 10. Also the job count may change over time. Taskmanager memory usage always increases, and after job cancellation this memory usage doesn't decrease. We've tried to investigate the problem and we've taken a task manager JVM heap snapshot. According to the JVM heap analysis, the possible memory leak was the Flink list state descriptor. But we are not sure that is the cause of our memory problem. How can we solve the problem?
>>>>> We have two types of Flink jobs. One has no stateful operator and contains only maps and filters, and the other has a time window with a count trigger.
>>>>> * We've analysed the JVM heaps again in different conditions. First we analysed the snapshot when no Flink jobs were running on the cluster. (image 1)
>>>>> * Then, we analysed the JVM heap snapshot while the Flink job that has no stateful operator was running. According to the results, the leak suspect was NetworkBufferPool. (image 2)
>>>>> * In the last analysis, both types of jobs were running and the leak suspect was again NetworkBufferPool. (image 3)
>>>>> In our system jobs are regularly cancelled and resubmitted, so we noticed that when a job is submitted some amount of memory is allocated, and after cancellation this allocated memory is never freed. So over time memory usage is always increasing and exceeds the limits.
>>>>
>>>> Links:
>>>> ------
>>>> [1] https://issues.apache.org/jira/browse/FLINK-7845
>>>>
>>>> Hi Piotr,
>>>> There are two types of jobs.
>>>> In the first, we use a Kafka source and a Kafka sink; there isn't any window operator.
>>>> In the second job, we use a Kafka source, a filesystem sink and an Elasticsearch sink, and a window operator for buffering.
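For context, the second job type ("time window with count trigger") is shaped roughly like the sketch below; the source, key selector, window size, count and aggregation are placeholders rather than the actual job:

import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.triggers.CountTrigger;

public class WindowWithCountTriggerJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Placeholder source; the real job reads from Kafka.
        DataStream<Long> events = env.generateSequence(0, Long.MAX_VALUE);

        events
            .keyBy(new KeySelector<Long, Long>() {
                @Override
                public Long getKey(Long event) {
                    return event % 8;                 // placeholder, deterministic key
                }
            })
            // Buffer per key in a 10-second time window (processing time by default),
            // but fire as soon as 1000 elements arrive: CountTrigger replaces the
            // window's default trigger.
            .timeWindow(Time.seconds(10))
            .trigger(CountTrigger.of(1000))
            .reduce((a, b) -> a + b)                  // placeholder aggregation instead of the real sinks
            .print();

        env.execute("window-with-count-trigger");
    }
}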
>>>> Hi Piotrek,
>>>> Thanks for your reply.
>>>> We've tested our Flink cluster again. We have 360 slots, and our cluster configuration is like this:
>>>> jobmanager.rpc.address: %JOBMANAGER%
>>>> jobmanager.rpc.port: 6123
>>>> jobmanager.heap.mb: 1536
>>>> taskmanager.heap.mb: 1536
>>>> taskmanager.numberOfTaskSlots: 120
>>>> taskmanager.memory.preallocate: false
>>>> parallelism.default: 1
>>>> jobmanager.web.port: 8081
>>>> state.backend: filesystem
>>>> state.backend.fs.checkpointdir: file:///storage/%CHECKPOINTDIR%
>>>> state.checkpoints.dir: file:///storage/%CHECKPOINTDIR%
>>>> taskmanager.network.numberOfBuffers: 5000
>>>> We are using a docker-based Flink cluster.
>>>> We submitted 36 jobs with a parallelism of 10, after which all slots became full. Memory usage was increasing over time and one by one the task managers started to die. And the exception was like this:
>>>> Taskmanager1 log:
>>>> Uncaught error from thread [flink-akka.actor.default-dispatcher-17] shutting down JVM since 'akka.jvm-exit-on-fatal-error' is enabled for ActorSystem[flink]
>>>> java.lang.NoClassDefFoundError: org/apache/kafka/common/metrics/stats/Rate$1
>>>>     at org.apache.kafka.common.metrics.stats.Rate.convert(Rate.java:93)
>>>>     at org.apache.kafka.common.metrics.stats.Rate.measure(Rate.java:62)
>>>>     at org.apache.kafka.common.metrics.KafkaMetric.value(KafkaMetric.java:61)
>>>>     at org.apache.kafka.common.metrics.KafkaMetric.value(KafkaMetric.java:52)
>>>>     at org.apache.flink.streaming.connectors.kafka.internals.metrics.KafkaMetricWrapper.getValue(KafkaMetricWrapper.java:35)
>>>>     at org.apache.flink.streaming.connectors.kafka.internals.metrics.KafkaMetricWrapper.getValue(KafkaMetricWrapper.java:26)
>>>>     at org.apache.flink.runtime.metrics.dump.MetricDumpSerialization.serializeGauge(MetricDumpSerialization.java:213)
>>>>     at org.apache.flink.runtime.metrics.dump.MetricDumpSerialization.access$200(MetricDumpSerialization.java:50)
>>>>     at org.apache.flink.runtime.metrics.dump.MetricDumpSerialization$MetricDumpSerializer.serialize(MetricDumpSerialization.java:138)
>>>>     at org.apache.flink.runtime.metrics.dump.MetricQueryService.onReceive(MetricQueryService.java:109)
>>>>     at akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:167)
>>>>     at akka.actor.Actor$class.aroundReceive(Actor.scala:467)
>>>>     at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:97)
>>>>     at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
>>>>     at akka.actor.ActorCell.invoke(ActorCell.scala:487)
>>>>     at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
>>>>     at akka.dispatch.Mailbox.run(Mailbox.scala:220)
>>>>     at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
>>>>     at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>>>>     at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>>>>     at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>>>>     at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>>>> Caused by: java.lang.ClassNotFoundException: org.apache.kafka.common.metrics.stats.Rate$1
>>>>     at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
>>>>     at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
>>>>     at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
>>>>     ... 22 more
>>>> Taskmanager2 log:
>>>> Uncaught error from thread [flink-akka.actor.default-dispatcher-17] shutting down JVM since 'akka.jvm-exit-on-fatal-error' is enabled for ActorSystem[flink]
>>>> java.lang.NoClassDefFoundError: org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher$1
>>>>     at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher$OffsetGauge.getValue(AbstractFetcher.java:492)
>>>>     at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher$OffsetGauge.getValue(AbstractFetcher.java:480)
>>>>     at org.apache.flink.runtime.metrics.dump.MetricDumpSerialization.serializeGauge(MetricDumpSerialization.java:213)
>>>>     at org.apache.flink.runtime.metrics.dump.MetricDumpSerialization.access$200(MetricDumpSerialization.java:50)
>>>>     at org.apache.flink.runtime.metrics.dump.MetricDumpSerialization$MetricDumpSerializer.serialize(MetricDumpSerialization.java:138)
>>>>     at org.apache.flink.runtime.metrics.dump.MetricQueryService.onReceive(MetricQueryService.java:109)
>>>>     at akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:167)
>>>>     at akka.actor.Actor$class.aroundReceive(Actor.scala:467)
>>>>     at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:97)
>>>>     at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
>>>>     at akka.actor.ActorCell.invoke(ActorCell.scala:487)
>>>>     at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
>>>>     at akka.dispatch.Mailbox.run(Mailbox.scala:220)
>>>>     at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
>>>>     at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>>>>     at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>>>>     at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>>>>     at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>>>> Caused by: java.lang.ClassNotFoundException: org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher$1
>>>>     at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
>>>>     at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
>>>>     at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
>>>>     ... 18 more
>>>> -Ebru
>>>
>>> Hi Piotrek,
>>> We attached the full log of taskmanager1.
>>> This may not be a dependency issue, because until all of the task slots are full we don't get any NoClassDefFound exception; when there is available memory, jobs can run without exceptions for days.
>>> Also, there is a Kafka Instance Already Exist exception in the full log, but this is not relevant and doesn't affect the jobs or task managers.
>>> -Ebru<taskmanager1.log.zip>
>
> Hi,
>
> Sorry, we attached the wrong log file. I've attached all task managers' and the job manager's logs. All task managers and the job manager were killed.<logs.zip>