What should we do to confirm it? Do you have a GitHub repo to start from?
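One way to confirm it could be to take a heap histogram of a running task manager (for example with jmap -histo:live <pid>) across a few submit/cancel cycles and watch whether the char[] instances retained by java.lang.ClassLoader#parallelLockMap keep growing, or to run a small standalone reproducer outside Flink. The following is only a sketch under the assumption of JDK 8: the parallelLockMap field it inspects is a JDK implementation detail, and the class name ParallelLockMapCheck is made up for illustration.

import java.lang.reflect.Field;
import java.net.URL;
import java.net.URLClassLoader;
import java.util.Map;
import java.util.UUID;

public class ParallelLockMapCheck {

    public static void main(String[] args) throws Exception {
        // URLClassLoader is parallel capable, so it keeps a per-class-name lock map.
        URLClassLoader loader = new URLClassLoader(new URL[0]);

        // Simulate lookups of classes with ever-changing (random) names, the
        // pattern a job generating classes with random names would produce.
        for (int i = 0; i < 100_000; i++) {
            String name = "com.example.Generated" + UUID.randomUUID().toString().replace("-", "");
            try {
                loader.loadClass(name);
            } catch (ClassNotFoundException expected) {
                // Even a failed lookup leaves a lock object behind in the map.
            }
        }

        // JDK 8 implementation detail: java.lang.ClassLoader#parallelLockMap.
        // On JDK 9+ this reflective access would need --add-opens.
        Field field = ClassLoader.class.getDeclaredField("parallelLockMap");
        field.setAccessible(true);
        Map<?, ?> lockMap = (Map<?, ?>) field.get(loader);
        System.out.println("parallelLockMap entries: " + lockMap.size());
    }
}

If the printed entry count only ever grows while class names keep changing, that matches the behaviour described in JDK-8037342.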
On Tue, Nov 14, 2017 at 4:02 PM, Piotr Nowojski <pi...@data-artisans.com> wrote:

> Ebru, Javier, Flavio:
>
> I tried to reproduce the memory leak by submitting a job that was generating classes with random names. And indeed I have found one. Memory was accumulating in `char[]` instances that belonged to `java.lang.ClassLoader#parallelLockMap`. The OldGen memory pool was growing in size up to the point I got:
>
> java.lang.OutOfMemoryError: Java heap space
>
> This seems like an old known "feature" of the JDK:
> https://bugs.openjdk.java.net/browse/JDK-8037342
>
> Can any of you confirm that this is the issue that you are experiencing? If not, I would really need more help/information from you to track this down.
>
> Piotrek
>
> On 10 Nov 2017, at 15:12, ÇETİNKAYA EBRU ÇETİNKAYA EBRU <b20926...@cs.hacettepe.edu.tr> wrote:
> On 2017-11-10 13:14, Piotr Nowojski wrote:
>
> jobmanager1.log and taskmanager2.log are the same. Can you also submit the files containing std output?
> Piotrek
>
> On 10 Nov 2017, at 09:35, ÇETİNKAYA EBRU ÇETİNKAYA EBRU <b20926...@cs.hacettepe.edu.tr> wrote:
> On 2017-11-10 11:04, Piotr Nowojski wrote:
>
> Hi,
> Thanks for the logs; however, I do not see the previously mentioned exceptions in them. The log ends with java.lang.InterruptedException. Is it the correct log file? Also, could you attach the std output file of the failing TaskManager?
> Piotrek
>
> On 10 Nov 2017, at 08:42, ÇETİNKAYA EBRU ÇETİNKAYA EBRU <b20926...@cs.hacettepe.edu.tr> wrote:
> On 2017-11-09 20:08, Piotr Nowojski wrote:
>
> Hi,
> Could you attach full logs from those task managers? At first glance I don't see a connection between those exceptions and any memory issue that you might have had. It looks like a dependency issue in one (some? all?) of your jobs.
> Did you build your jars with the -Pbuild-jar profile as described here:
> https://ci.apache.org/projects/flink/flink-docs-release-1.3/quickstart/java_api_quickstart.html#build-project
> ?
> If that doesn't help, can you binary search which job is causing the problem? There might be some Flink incompatibility between different versions, and rebuilding a job's jar with a version matching the cluster version might help.
> Piotrek
>
> On 9 Nov 2017, at 17:36, ÇETİNKAYA EBRU ÇETİNKAYA EBRU <b20926...@cs.hacettepe.edu.tr> wrote:
> On 2017-11-08 18:30, Piotr Nowojski wrote:
> Btw, Ebru:
> I don't agree that the main suspect is NetworkBufferPool. On your screenshots its memory consumption was reasonable and stable: 596MB -> 602MB -> 597MB. PoolThreadCache memory usage of ~120MB is also reasonable.
> Do you experience any problems, like OutOfMemory errors/crashes/long GC pauses? Or is the JVM process just using more memory over time? You are aware that the JVM doesn't like to release memory back to the OS once it was used? So increasing memory usage until hitting some limit (for example the JVM max heap size) is expected behaviour.
> Piotrek
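A side note on Piotrek's last point above: because the JVM keeps hold of memory it has already claimed, the resident size shown by htop will normally climb towards the configured maximum and stay there, even when most of the heap is reclaimable. Comparing "used" against "committed" heap makes that distinction visible. A minimal, Flink-independent sketch using the standard java.lang.management API (the class name HeapUsagePrinter is made up for illustration):

import java.lang.management.ManagementFactory;
import java.lang.management.MemoryMXBean;
import java.lang.management.MemoryUsage;

public class HeapUsagePrinter {

    public static void main(String[] args) throws InterruptedException {
        MemoryMXBean memory = ManagementFactory.getMemoryMXBean();
        // Print heap figures every 5 seconds; stop with Ctrl-C.
        while (true) {
            MemoryUsage heap = memory.getHeapMemoryUsage();
            // "committed" is what the JVM has claimed from the OS; it usually
            // stays high even after "used" drops following a GC.
            System.out.printf("heap used=%d MB, committed=%d MB, max=%d MB%n",
                    heap.getUsed() >> 20, heap.getCommitted() >> 20, heap.getMax() >> 20);
            Thread.sleep(5000);
        }
    }
}

If "used" stays well below "max" while the resident size reported by the OS keeps growing, the growth is more likely outside the Java heap (metaspace, direct buffers, or native allocations).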
> On 8 Nov 2017, at 15:48, Piotr Nowojski <pi...@data-artisans.com> wrote:
> I don't know if this is relevant to this issue, but I was constantly getting failures trying to reproduce this leak using your job, because you were using a non-deterministic getKey function:
>
> @Override
> public Integer getKey(Integer event) {
>     Random randomGen = new Random((new Date()).getTime());
>     return randomGen.nextInt() % 8;
> }
>
> And quoting the Javadoc of KeySelector:
> "If invoked multiple times on the same object, the returned key must be the same."
> I'm trying to reproduce this issue with the following job:
> https://gist.github.com/pnowojski/b80f725c1af7668051c773438637e0d3
> where IntegerSource is just an infinite source and DiscardingSink is, well, just discarding incoming data. I'm cancelling the job every 5 seconds, and so far (after ~15 minutes) my memory consumption is stable, well below the maximum Java heap size.
> Piotrek
>
> On 8 Nov 2017, at 15:28, Javier Lopez <javier.lo...@zalando.de> wrote:
> Yes, I tested with just printing the stream. But it could take a lot of time to fail.
>
> On Wednesday, 8 November 2017, Piotr Nowojski <pi...@data-artisans.com> wrote:
> Thanks for the quick answer.
> So it will also fail after some time with the `fromElements` source instead of Kafka, right?
> Did you try it also without a Kafka producer?
> Piotrek
>
> On 8 Nov 2017, at 14:57, Javier Lopez <javier.lo...@zalando.de> wrote:
> Hi,
> You don't need data. With data it will die faster. I tested as well with a small data set, using the fromElements source, but it will take some time to die. It's better with some data.
>
> On 8 November 2017 at 14:54, Piotr Nowojski <pi...@data-artisans.com> wrote:
> Hi,
> Thanks for sharing this job.
> Do I need to feed some data to Kafka to reproduce this issue with your script?
> Does this OOM issue also happen when you are not using the Kafka source/sink?
> Piotrek
>
> On 8 Nov 2017, at 14:08, Javier Lopez <javier.lo...@zalando.de> wrote:
> Hi,
> This is the test Flink job we created to trigger this leak:
> https://gist.github.com/javieredo/c6052404dbe6cc602e99f4669a09f7d6
> And this is the Python script we are using to execute the job thousands of times to get the OOM problem:
> https://gist.github.com/javieredo/4825324d5d5f504e27ca6c004396a107
> The cluster we used for this has this configuration:
> Instance type: t2.large
> Number of workers: 2
> HeapMemory: 5500
> Number of task slots per node: 4
> TaskMangMemFraction: 0.5
> NumberOfNetworkBuffers: 2000
> We have tried several things: increasing the heap, reducing the heap, more memory fraction, changing the TM_MAX_OFFHEAP_SIZE="2G" value in taskmanager.sh; and nothing seems to work.
> Thanks for your help.
>
> On 8 November 2017 at 13:26, ÇETİNKAYA EBRU ÇETİNKAYA EBRU <b20926...@cs.hacettepe.edu.tr> wrote:
> On 2017-11-08 15:20, Piotr Nowojski wrote:
> Hi Ebru and Javier,
> Yes, if you could share this example job it would be helpful.
> Ebru: could you explain in a little more detail what your job(s) look like? Could you post some code? If you are just using maps and filters there shouldn't be any network transfers involved, aside from source and sink functions.
> Piotrek
>
> On 8 Nov 2017, at 12:54, ebru <b20926...@cs.hacettepe.edu.tr> wrote:
> Hi Javier,
> It would be helpful if you share your test job with us.
> Which configurations did you try?
> -Ebru
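Coming back to the non-deterministic getKey() that Piotrek quoted further up: the KeySelector contract means the key has to be a pure function of the element. A deterministic replacement for that test selector could look like the sketch below (illustration only, not the job from the gists; the class name ModuloKeySelector is made up):

import org.apache.flink.api.java.functions.KeySelector;

public class ModuloKeySelector implements KeySelector<Integer, Integer> {

    @Override
    public Integer getKey(Integer event) {
        // Deterministic: the same element always maps to the same key,
        // and floorMod never returns a negative bucket.
        return Math.floorMod(event, 8);
    }
}

Math.floorMod also avoids the negative keys that randomGen.nextInt() % 8 could produce.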
> On 8 Nov 2017, at 14:43, Javier Lopez <javier.lo...@zalando.de> wrote:
> Hi,
> We have been facing a similar problem. We have tried some different configurations, as proposed in another email thread by Flavio and Kien, but it didn't work. We have a workaround similar to the one that Flavio has: we restart the taskmanagers once they reach a memory threshold. We created a small test to remove all of our dependencies and leave only the Flink native libraries. This test reads data from a Kafka topic and writes it back to another topic in Kafka. We cancel the job and start another every 5 seconds. After ~30 minutes of doing this process, the cluster reaches the OS memory limit and dies.
> Currently, we have a test cluster with 8 workers and 8 task slots per node. We have one job that uses 56 slots, and we cannot execute that job 5 times in a row because the whole cluster dies. If you want, we can publish our test job.
> Regards,
>
> On 8 November 2017 at 11:20, Aljoscha Krettek <aljos...@apache.org> wrote:
> @Nico & @Piotr Could you please have a look at this? You both recently worked on the network stack and might be most familiar with this.
>
> On 8. Nov 2017, at 10:25, Flavio Pompermaier <pomperma...@okkam.it> wrote:
> We also have the same problem in production. At the moment the solution is to restart the entire Flink cluster after every job.
> We've tried to reproduce this problem with a test (see https://issues.apache.org/jira/browse/FLINK-7845 [1]), but we don't know whether the error produced by the test and the leak are correlated.
> Best,
> Flavio
>
> On Wed, Nov 8, 2017 at 9:51 AM, ÇETİNKAYA EBRU ÇETİNKAYA EBRU <b20926...@cs.hacettepe.edu.tr> wrote:
> On 2017-11-07 16:53, Ufuk Celebi wrote:
> Do you use any windowing? If yes, could you please share that code? If there is no stateful operation at all, it's strange where the list state instances are coming from.
>
> On Tue, Nov 7, 2017 at 2:35 PM, ebru <b20926...@cs.hacettepe.edu.tr> wrote:
> Hi Ufuk,
> We don't explicitly define any state descriptor. We only use map and filter operators. We thought that GC handles clearing Flink's internal state.
> So how can we manage the memory if it is always increasing?
> - Ebru
>
> On 7 Nov 2017, at 16:23, Ufuk Celebi <u...@apache.org> wrote:
> Hey Ebru, the memory usage might be increasing as long as a job is running. This is expected (also in the case of multiple running jobs). The screenshots are not helpful in that regard. :-(
> What kind of stateful operations are you using? Depending on your use case, you have to manually call `clear()` on the state instance in order to release the managed state.
> Best,
> Ufuk
>
> On Tue, Nov 7, 2017 at 12:43 PM, ebru <b20926...@cs.hacettepe.edu.tr> wrote:
> Begin forwarded message:
> From: ebru <b20926...@cs.hacettepe.edu.tr>
> Subject: Re: Flink memory leak
> Date: 7 November 2017 at 14:09:17 GMT+3
> To: Ufuk Celebi <u...@apache.org>
> Hi Ufuk,
> Here are three snapshots of htop output.
> 1. The first snapshot is the initial state.
> 2. The second snapshot is after one job was submitted.
> 3. The third snapshot is the output of the one job with 15000 EPS. And the memory usage is always increasing over time.
> <1.png><2.png><3.png>
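On Ufuk's note above about calling clear(): that applies to state a user function registers itself on a keyed stream; jobs that are purely maps and filters, as Ebru describes, have nothing to clear manually. For completeness, a hypothetical example of what explicitly released keyed state looks like (the CountingFlatMap class below is made up for illustration and is not one of the jobs in this thread):

import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

// Must be applied after keyBy(...), so that the state is scoped per key.
public class CountingFlatMap extends RichFlatMapFunction<Integer, Integer> {

    private transient ValueState<Integer> count;

    @Override
    public void open(Configuration parameters) {
        count = getRuntimeContext().getState(
                new ValueStateDescriptor<>("count", Integer.class));
    }

    @Override
    public void flatMap(Integer value, Collector<Integer> out) throws Exception {
        Integer current = count.value();
        int updated = (current == null ? 0 : current) + 1;
        if (updated >= 100) {
            out.collect(updated);
            // Explicitly release this key's state once it is no longer needed.
            count.clear();
        } else {
            count.update(updated);
        }
    }
}

The descriptor-based state here is scoped per key, so clear() releases only the current key's entry; window operators manage their own state and clean it up when windows are purged.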
> On 7 Nov 2017, at 13:34, Ufuk Celebi <u...@apache.org> wrote:
> Hey Ebru,
> let me pull in Aljoscha (CC'd) who might have an idea what's causing this.
> Since multiple jobs are running, it will be hard to understand which job the state descriptors from the heap snapshot belong to.
> - Is it possible to isolate the problem and reproduce the behaviour with only a single job?
> – Ufuk
>
> On Tue, Nov 7, 2017 at 10:27 AM, ÇETİNKAYA EBRU ÇETİNKAYA EBRU <b20926...@cs.hacettepe.edu.tr> wrote:
> Hi,
> We are using Flink 1.3.1 in production; we have one job manager and 3 task managers in standalone mode. Recently, we've noticed that we have memory-related problems. We use docker containers to serve the Flink cluster. We have 300 slots, and 20 jobs are running with a parallelism of 10. Also, the job count may change over time. Taskmanager memory usage always increases, and after job cancellation this memory usage doesn't decrease. We've tried to investigate the problem and got a task manager JVM heap snapshot. According to the JVM heap analysis, a possible memory leak was the Flink list state descriptor. But we are not sure that is the cause of our memory problem. How can we solve the problem?
> We have two types of Flink job. One has no stateful operator and contains only maps and filters; the other has a time window with a count trigger.
> * We've analysed the JVM heaps again under different conditions. First we analysed the snapshot when no Flink jobs were running on the cluster. (image 1)
> * Then, we analysed the JVM heap snapshot while the Flink job with no stateful operator was running. According to the results, the leak suspect was NetworkBufferPool. (image 2)
> * In the last analysis, both types of jobs were running, and the leak suspect was again NetworkBufferPool. (image 3)
> In our system, jobs are regularly cancelled and resubmitted, and we noticed that when a job is submitted some amount of memory is allocated, and after cancellation this allocated memory is never freed. So over time memory usage is always increasing and exceeds the limits.
>
> Links:
> ------
> [1] https://issues.apache.org/jira/browse/FLINK-7845
>
> Hi Piotr,
> There are two types of jobs.
> In the first, we use a Kafka source and Kafka sink; there isn't any window operator.
> In the second job, we use a Kafka source, a filesystem sink and an Elasticsearch sink, and a window operator for buffering.
>
> Hi Piotrek,
> Thanks for your reply.
> We've tested our Flink cluster again. We have 360 slots, and our cluster configuration is like this:
> jobmanager.rpc.address: %JOBMANAGER%
> jobmanager.rpc.port: 6123
> jobmanager.heap.mb: 1536
> taskmanager.heap.mb: 1536
> taskmanager.numberOfTaskSlots: 120
> taskmanager.memory.preallocate: false
> parallelism.default: 1
> jobmanager.web.port: 8081
> state.backend: filesystem
> state.backend.fs.checkpointdir: file:///storage/%CHECKPOINTDIR%
> state.checkpoints.dir: file:///storage/%CHECKPOINTDIR%
> taskmanager.network.numberOfBuffers: 5000
> We are using a docker-based Flink cluster.
> We submitted 36 jobs with a parallelism of 10, after which all slots became full. Memory usage was increasing over time, and one by one the task managers started to die.
> And the exception was like this:
>
> Taskmanager1 log:
> Uncaught error from thread [flink-akka.actor.default-dispatcher-17] shutting down JVM since 'akka.jvm-exit-on-fatal-error' is enabled for ActorSystem[flink]
> java.lang.NoClassDefFoundError: org/apache/kafka/common/metrics/stats/Rate$1
>     at org.apache.kafka.common.metrics.stats.Rate.convert(Rate.java:93)
>     at org.apache.kafka.common.metrics.stats.Rate.measure(Rate.java:62)
>     at org.apache.kafka.common.metrics.KafkaMetric.value(KafkaMetric.java:61)
>     at org.apache.kafka.common.metrics.KafkaMetric.value(KafkaMetric.java:52)
>     at org.apache.flink.streaming.connectors.kafka.internals.metrics.KafkaMetricWrapper.getValue(KafkaMetricWrapper.java:35)
>     at org.apache.flink.streaming.connectors.kafka.internals.metrics.KafkaMetricWrapper.getValue(KafkaMetricWrapper.java:26)
>     at org.apache.flink.runtime.metrics.dump.MetricDumpSerialization.serializeGauge(MetricDumpSerialization.java:213)
>     at org.apache.flink.runtime.metrics.dump.MetricDumpSerialization.access$200(MetricDumpSerialization.java:50)
>     at org.apache.flink.runtime.metrics.dump.MetricDumpSerialization$MetricDumpSerializer.serialize(MetricDumpSerialization.java:138)
>     at org.apache.flink.runtime.metrics.dump.MetricQueryService.onReceive(MetricQueryService.java:109)
>     at akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:167)
>     at akka.actor.Actor$class.aroundReceive(Actor.scala:467)
>     at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:97)
>     at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
>     at akka.actor.ActorCell.invoke(ActorCell.scala:487)
>     at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
>     at akka.dispatch.Mailbox.run(Mailbox.scala:220)
>     at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
>     at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>     at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>     at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>     at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> Caused by: java.lang.ClassNotFoundException: org.apache.kafka.common.metrics.stats.Rate$1
>     at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
>     at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
>     at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
>     ... 22 more
>
> Taskmanager2 log:
> Uncaught error from thread [flink-akka.actor.default-dispatcher-17] shutting down JVM since 'akka.jvm-exit-on-fatal-error' is enabled for ActorSystem[flink]
> java.lang.NoClassDefFoundError: org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher$1
>     at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher$OffsetGauge.getValue(AbstractFetcher.java:492)
>     at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher$OffsetGauge.getValue(AbstractFetcher.java:480)
>     at org.apache.flink.runtime.metrics.dump.MetricDumpSerialization.serializeGauge(MetricDumpSerialization.java:213)
>     at org.apache.flink.runtime.metrics.dump.MetricDumpSerialization.access$200(MetricDumpSerialization.java:50)
>     at org.apache.flink.runtime.metrics.dump.MetricDumpSerialization$MetricDumpSerializer.serialize(MetricDumpSerialization.java:138)
>     at org.apache.flink.runtime.metrics.dump.MetricQueryService.onReceive(MetricQueryService.java:109)
>     at akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:167)
>     at akka.actor.Actor$class.aroundReceive(Actor.scala:467)
>     at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:97)
>     at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
>     at akka.actor.ActorCell.invoke(ActorCell.scala:487)
>     at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
>     at akka.dispatch.Mailbox.run(Mailbox.scala:220)
>     at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
>     at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>     at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>     at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>     at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> Caused by: java.lang.ClassNotFoundException: org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher$1
>     at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
>     at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
>     at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
>     ... 18 more
> -Ebru
>
> Hi Piotrek,
> We attached the full log of taskmanager1.
> This may not be a dependency issue, because until all of the task slots are full we don't get any NoClassDefFoundError; when there is available memory, jobs can run without exceptions for days.
> Also, there is a Kafka "instance already exists" exception in the full log, but this is not relevant and doesn't affect jobs or task managers.
> -Ebru
> <taskmanager1.log.zip>
>
> Hi,
> Sorry, we attached the wrong log file. I've attached all the task managers' and the job manager's logs. All task managers and the job manager were killed.
> <logs.zip>
>
> <logs2-2.zip>

--
Flavio Pompermaier
Development Department
OKKAM S.r.l.
Tel. +(39) 0461 041809