Hey Aaron, I'm glad to hear that you resolved the issue.
I think a docs contribution for this would be very helpful and could update this page: https://github.com/apache/flink/blob/master/docs/monitoring/debugging_classloading.md. If you want to create a separate JIRA ticket for this, ping me with your JIRA username and I'll add you to the list of contributors (which gives you permissions to create tickets).

I'll think a bit more about the other points you mentioned and get back to you if I have another idea.

Best,

Ufuk

On Tue, Jan 29, 2019 at 10:48 PM Aaron Levin <aaronle...@stripe.com> wrote:
>
> Hi Ufuk,
>
> I'll answer your question, but first I'll give you an update on how we resolved the issue:
>
> * adding `org.apache.hadoop.io.compress.SnappyCodec` to `classloader.parent-first-patterns.additional` in `flink-conf.yaml` (though putting `org.apache.hadoop.util.NativeCodeLoader` there also worked)
> * putting a jar with `hadoop-common` + its transitive dependencies, then using jarjar[0] to `keep org.apache.hadoop.io.compress.SnappyCodec` (and its transitive dependencies). So we end up with a jar that has `SnappyCodec` and whatever it needs to call transitively. We put this jar on the task manager classpath.
>
> I believe `SnappyCodec` was being called via our code. This worked the first time, but deploying a second time caused `libhadoop.so` to be loaded in a second classloader. By putting a jar with `SnappyCodec` and its transitive dependencies on the task manager classpath, and specifying that `SnappyCodec` needs to be loaded from the parent classloader, we ensure that only one classloader loads `libhadoop.so`. I don't think this is the best way to achieve what we want, but it works for now.
>
> Next steps: if no one is on it, I can take a stab at updating the documentation to clarify how to debug and resolve native library loading. This was a nice learning experience and I think it'll be helpful to have this in the docs for those who aren't well-versed in how classloading on the JVM works!
>
> To answer your questions:
>
> 1. We install hadoop on our machines and tell flink task managers to access it via `env.java.opts: -Djava.library.path=/usr/local/hadoop/lib/native` in `flink-conf.yaml`.
> 2. We put flink's shaded hadoop-fs-s3 on both the task manager and job manager classpath (I believe this is only used by the job managers when they interact with S3 for checkpoints etc. I don't believe any user code is using this).
> 3. Our flink applications consist of a "fat jar" that has some `org.apache.hadoop` dependencies bundled with it. I believe this is why we end up loading `SnappyCodec` twice and triggering this issue.
> 4. For example code: we have a small wrapper around `org.apache.flink.api.common.io.FileInputFormat` which does the work of reading sequence files. It looks like this (after removing some code to make it clearer):
>
> ```
> abstract class FlinkSequenceFileInputFormat[T, K <: Writable, V <: Writable](
>   typeInformation: TypeInformation[T]
> ) extends FileInputFormat[T]
>     with ResultTypeQueryable[T] {
>
>   @transient private var bufferedNextRecord: T = _
>   @transient private var hadoopStream: HadoopFSDataInputStream = _
>   @transient private var sequenceFileReader: SequenceFile.Reader = _
>
>   unsplittable = true
>   enumerateNestedFiles = true
>
>   // *****************************************
>   // This is where we'd see exceptions.
>   // *****************************************
>   override def open(fileSplit: FileInputSplit): Unit = {
>     super.open(fileSplit)
>     val config = new Configuration()
>     // `stream` is the FSDataInputStream opened by FileInputFormat.open
>     hadoopStream = WrappedHadoopInputStream.wrap(stream)
>     sequenceFileReader = new SequenceFile.Reader(config, SequenceFile.Reader.stream(hadoopStream))
>     bufferNextRecord()
>   }
>   ...
> }
>
> // AND
>
> class WrappedHadoopInputStream(underlying: FlinkFSDataInputStream)
>   extends InputStream
>     with Seekable
>     with PositionedReadable {
>
>   def read(): Int = underlying.read()
>   def seek(pos: Long): Unit = underlying.seek(pos)
>   def getPos: Long = underlying.getPos
> }
> ...
> ```
>
> Thanks for all your help, I appreciate it! I wouldn't have been able to debug and resolve this if it wasn't for you filing the ticket. Thank you so much!
>
> [0] https://github.com/pantsbuild/jarjar
>
> Aaron Levin
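For anyone who lands on this thread later, the configuration side of the fix described above boils down to two entries in `flink-conf.yaml` on the task managers. A minimal sketch, assuming Hadoop's native libraries live under `/usr/local/hadoop/lib/native` as in this setup (adjust the path and the patterns to your environment):

```
# Load the Hadoop codec classes through the parent classloader so that
# libhadoop.so is only ever bound to a single classloader, even across
# job restarts and resubmissions.
classloader.parent-first-patterns.additional: org.apache.hadoop.io.compress.SnappyCodec

# Make the native Hadoop libraries visible to the task manager JVM.
env.java.opts: -Djava.library.path=/usr/local/hadoop/lib/native
```

This only helps if a jar containing `SnappyCodec` and its transitive dependencies is also on the task manager classpath (for example in Flink's `lib/` directory), since the parent classloader otherwise has nothing to load the class from.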
>
> On Mon, Jan 28, 2019 at 4:16 AM Ufuk Celebi <u...@apache.org> wrote:
>>
>> Hey Aaron,
>>
>> sorry for the late reply (again).
>>
>> (1) I think that your final result is in line with what I have reproduced in https://issues.apache.org/jira/browse/FLINK-11402.
>>
>> (2) I think renaming the file would not help as it will still be loaded multiple times when the job restarts (as it happens in FLINK-11402).
>>
>> (3) I'll try to check whether Flink's shading of Hadoop is related to this. I don't think so though. @Chesnay (cc'd): What do you think?
>>
>> (4) @Aaron: Can you tell me which Hadoop libraries you use and share some code so I can try to reproduce this exactly on my side? Judging from the earlier stack traces you have shared, I'm assuming you are trying to read Snappy-compressed sequence files.
>>
>> – Ufuk
>>
>> On Fri, Jan 25, 2019 at 4:37 PM Aaron Levin <aaronle...@stripe.com> wrote:
>> >
>> > I don't control the code calling `System.loadLibrary("hadoop")`, so that's not an option for me, unfortunately.
>> >
>> > On Thu, Jan 24, 2019 at 7:47 PM Guowei Ma <guowei....@gmail.com> wrote:
>> >>
>> >> This may be caused by the fact that a JVM process can only load a given .so once, so a tricky workaround is to rename it.
>> >>
>> >> Sent from my iPhone
>> >>
>> >> On Jan 25, 2019, at 7:12 AM, Aaron Levin <aaronle...@stripe.com> wrote:
>> >>
>> >> Hi Ufuk,
>> >>
>> >> Update: I've pinned down the issue. It's multiple classloaders loading `libhadoop.so`:
>> >>
>> >> ```
>> >> failed to load native hadoop with error: java.lang.UnsatisfiedLinkError: Native Library /usr/lib/libhadoop.so already loaded in another classloader
>> >> ```
>> >>
>> >> I'm not quite sure what the solution is. Ideally flink would destroy a classloader when a job is canceled, but perhaps there's a jvm limitation there? Putting the libraries into `/usr/lib` or `/lib` does not work (as suggested by Chesnay in the ticket) as I get the same error. I might see if I can put a jar with `org.apache.hadoop.common.io.compress` in `/flink/install/lib` and then remove it from my jar. It's not an ideal solution but I can't think of anything else.
>> >>
>> >> Best,
>> >>
>> >> Aaron Levin
>> >>
>> >> On Thu, Jan 24, 2019 at 12:18 PM Aaron Levin <aaronle...@stripe.com> wrote:
>> >>>
>> >>> Hi Ufuk,
>> >>>
>> >>> I'm starting to believe the bug is much deeper than the originally reported error because putting the libraries in `/usr/lib` or `/lib` does not work. This morning I dug into why putting `libhadoop.so` into `/usr/lib` didn't work, despite that being in the `java.library.path` at the call site of the error.
>> >>> I wrote a small program to test the loading of native libraries, and it was able to successfully load `libhadoop.so`. I'm very perplexed. Could this be related to the way flink shades hadoop stuff?
>> >>>
>> >>> Here is my program and its output:
>> >>>
>> >>> ```
>> >>> $ cat LibTest.scala
>> >>> package com.redacted.flink
>> >>>
>> >>> object LibTest {
>> >>>   def main(args: Array[String]): Unit = {
>> >>>     val library = args(0)
>> >>>
>> >>>     System.out.println(s"java.library.path=${System.getProperty("java.library.path")}")
>> >>>     System.out.println(s"Attempting to load $library")
>> >>>     System.out.flush()
>> >>>     System.loadLibrary(library)
>> >>>     System.out.println("Successfully loaded")
>> >>>     System.out.flush()
>> >>>   }
>> >>> }
>> >>> ```
>> >>>
>> >>> I then tried running that on one of the task managers with `hadoop` as an argument:
>> >>>
>> >>> ```
>> >>> $ java -jar lib_test_deploy.jar hadoop
>> >>> java.library.path=/usr/java/packages/lib/amd64:/usr/lib/x86_64-linux-gnu/jni:/lib/x86_64-linux-gnu:/usr/lib/x86_64-linux-gnu:/usr/lib/jni:/lib:/usr/lib
>> >>> Attempting to load hadoop
>> >>> Exception in thread "main" java.lang.UnsatisfiedLinkError: no hadoop in java.library.path
>> >>>         at java.lang.ClassLoader.loadLibrary(ClassLoader.java:1867)
>> >>>         at java.lang.Runtime.loadLibrary0(Runtime.java:870)
>> >>>         at java.lang.System.loadLibrary(System.java:1122)
>> >>>         at com.stripe.flink.LibTest$.main(LibTest.scala:11)
>> >>>         at com.stripe.flink.LibTest.main(LibTest.scala)
>> >>> ```
>> >>>
>> >>> I then copied the native libraries into `/usr/lib/` and ran it again:
>> >>>
>> >>> ```
>> >>> $ sudo cp /usr/local/hadoop/lib/native/libhadoop.so /usr/lib/
>> >>> $ java -jar lib_test_deploy.jar hadoop
>> >>> java.library.path=/usr/java/packages/lib/amd64:/usr/lib/x86_64-linux-gnu/jni:/lib/x86_64-linux-gnu:/usr/lib/x86_64-linux-gnu:/usr/lib/jni:/lib:/usr/lib
>> >>> Attempting to load hadoop
>> >>> Successfully loaded
>> >>> ```
>> >>>
>> >>> Any ideas?
>> >>>
>> >>> On Wed, Jan 23, 2019 at 7:13 PM Aaron Levin <aaronle...@stripe.com> wrote:
>> >>>>
>> >>>> Hi Ufuk,
>> >>>>
>> >>>> One more update: I tried copying all the hadoop native `.so` files (mainly `libhadoop.so`) into `/lib` and I am still experiencing the issue I reported. I also tried naively adding the `.so` files to the jar with the flink application and am still experiencing the issue I reported (however, I'm going to investigate this further as I might not have done it correctly).
>> >>>>
>> >>>> Best,
>> >>>>
>> >>>> Aaron Levin
>> >>>>
>> >>>> On Wed, Jan 23, 2019 at 3:18 PM Aaron Levin <aaronle...@stripe.com> wrote:
>> >>>>>
>> >>>>> Hi Ufuk,
>> >>>>>
>> >>>>> Two updates:
>> >>>>>
>> >>>>> 1. As suggested in the ticket, I naively copied every `.so` in `hadoop-3.0.0/lib/native/` into `/lib/` and this did not seem to help. My knowledge of how shared libs get picked up is hazy, so I'm not sure if blindly copying them like that should work. I did check what `System.getProperty("java.library.path")` returns at the call-site and it's: java.library.path=/usr/java/packages/lib/amd64:/usr/lib/x86_64-linux-gnu/jni:/lib/x86_64-linux-gnu:/usr/lib/x86_64-linux-gnu:/usr/lib/jni:/lib:/usr/lib
>> >>>>> 2. The exception I see comes from `hadoop.util.NativeCodeLoader.buildSupportsSnappy` (stack-trace below). This uses `System.loadLibrary("hadoop")`.
>> >>>>>
>> >>>>> [2019-01-23 19:52:33.081216] java.lang.UnsatisfiedLinkError: org.apache.hadoop.util.NativeCodeLoader.buildSupportsSnappy()Z
>> >>>>> [2019-01-23 19:52:33.081376]   at org.apache.hadoop.util.NativeCodeLoader.buildSupportsSnappy(Native Method)
>> >>>>> [2019-01-23 19:52:33.081406]   at org.apache.hadoop.io.compress.SnappyCodec.checkNativeCodeLoaded(SnappyCodec.java:63)
>> >>>>> [2019-01-23 19:52:33.081429]   at org.apache.hadoop.io.compress.SnappyCodec.getDecompressorType(SnappyCodec.java:195)
>> >>>>> [2019-01-23 19:52:33.081457]   at org.apache.hadoop.io.compress.CodecPool.getDecompressor(CodecPool.java:181)
>> >>>>> [2019-01-23 19:52:33.081494]   at org.apache.hadoop.io.SequenceFile$Reader.init(SequenceFile.java:2037)
>> >>>>> [2019-01-23 19:52:33.081517]   at org.apache.hadoop.io.SequenceFile$Reader.initialize(SequenceFile.java:1923)
>> >>>>> [2019-01-23 19:52:33.081549]   at org.apache.hadoop.io.SequenceFile$Reader.<init>(SequenceFile.java:1872)
>> >>>>> ... (redacted) ...
>> >>>>> [2019-01-23 19:52:33.081728]   at scala.collection.immutable.List.foreach(List.scala:392)
>> >>>>> ... (redacted) ...
>> >>>>> [2019-01-23 19:52:33.081832]   at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:94)
>> >>>>> [2019-01-23 19:52:33.081854]   at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:58)
>> >>>>> [2019-01-23 19:52:33.081882]   at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:99)
>> >>>>> [2019-01-23 19:52:33.081904]   at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
>> >>>>> [2019-01-23 19:52:33.081946]   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704)
>> >>>>> [2019-01-23 19:52:33.081967]   at java.lang.Thread.run(Thread.java:748)
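As an aside for anyone debugging a similar failure: the `UnsatisfiedLinkError` above is usually less about the library being missing and more about which classloader ended up loading the Hadoop classes. A small sketch (not from the thread; it assumes `hadoop-common` is on the classpath) that can be run standalone on a task manager, or dropped into the input format's `open()`, to see who owns the relevant classes:

```
import org.apache.hadoop.io.compress.SnappyCodec
import org.apache.hadoop.util.NativeCodeLoader

object NativeLoadingDebug {
  def main(args: Array[String]): Unit = {
    // If SnappyCodec reports a Flink user-code classloader here, every job
    // restart creates a fresh classloader that tries to bind libhadoop.so
    // again, which the JVM refuses with "already loaded in another classloader".
    println(s"SnappyCodec loaded by:      ${classOf[SnappyCodec].getClassLoader}")
    println(s"NativeCodeLoader loaded by: ${classOf[NativeCodeLoader].getClassLoader}")

    // Reports whether Hadoop managed to load libhadoop.so in this classloader.
    println(s"native hadoop loaded:       ${NativeCodeLoader.isNativeCodeLoaded}")
  }
}
```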
>> >>>>>>>
>> >>>>>>> On Mon, Jan 21, 2019 at 5:16 PM Ufuk Celebi <u...@apache.org> wrote:
>> >>>>>>>>
>> >>>>>>>> Hey Aaron,
>> >>>>>>>>
>> >>>>>>>> sorry for the late reply.
>> >>>>>>>>
>> >>>>>>>> (1) I think I was able to reproduce this issue using snappy-java. I've filed a ticket here: https://issues.apache.org/jira/browse/FLINK-11402. Can you check whether the ticket description is in line with what you are experiencing? Most importantly, do you see the same exception being reported after cancelling and re-starting the job?
>> >>>>>>>>
>> >>>>>>>> (2) I don't think it's caused by the environment options not being picked up. You can check the head of the log files of the JobManager or TaskManager to verify that your provided option is picked up as expected. You should see something similar to this:
>> >>>>>>>>
>> >>>>>>>> 2019-01-21 22:53:49,863 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint  - --------------------------------------------------------------------------------
>> >>>>>>>> 2019-01-21 22:53:49,864 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint  - Starting StandaloneSessionClusterEntrypoint (Version: 1.7.0, Rev:49da9f9, Date:28.11.2018 @ 17:59:06 UTC)
>> >>>>>>>> ...
>> >>>>>>>> 2019-01-21 22:53:49,865 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint  - JVM Options:
>> >>>>>>>> 2019-01-21 22:53:49,865 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint  -     -Xms1024m
>> >>>>>>>> 2019-01-21 22:53:49,865 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint  -     -Xmx1024m
>> >>>>>>>> You are looking for this line ----> 2019-01-21 22:53:49,865 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint  -     -Djava.library.path=/.../org/xerial/snappy/native/Mac/x86_64/ <----
>> >>>>>>>> 2019-01-21 22:53:49,865 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint  -     -Dlog.file=/.../flink-1.7.0/log/flink-standalonesession-0.local.log
>> >>>>>>>> ...
>> >>>>>>>> 2019-01-21 22:53:49,866 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint  - Program Arguments:
>> >>>>>>>> 2019-01-21 22:53:49,866 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint  -     --configDir
>> >>>>>>>> 2019-01-21 22:53:49,866 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint  -     /.../flink-1.7.0/conf
>> >>>>>>>> 2019-01-21 22:53:49,866 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint  -     --executionMode
>> >>>>>>>> 2019-01-21 22:53:49,866 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint  -     cluster
>> >>>>>>>> ...
>> >>>>>>>> 2019-01-21 22:53:49,866 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint  - --------------------------------------------------------------------------------
>> >>>>>>>>
>> >>>>>>>> Can you verify that you see the log messages as expected?
>> >>>>>>>>
>> >>>>>>>> (3) As noted in FLINK-11402, is it possible to package the snappy library as part of your user code instead of loading the library via java.library.path? In my example, that seems to work fine.
>> >>>>>>>>
>> >>>>>>>> – Ufuk
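A related trick for point (2), in case the head of the log files is not handy: ask the running JVM directly for the arguments it was started with. A small sketch (not from the thread) using only standard JDK APIs:

```
import java.lang.management.ManagementFactory
import scala.collection.JavaConverters._

object JvmArgsCheck {
  def main(args: Array[String]): Unit = {
    // Prints the arguments the JVM was actually started with; anything set via
    // env.java.opts, such as -Djava.library.path=..., should show up here.
    ManagementFactory.getRuntimeMXBean.getInputArguments.asScala.foreach(println)

    // The effective native library search path as the JVM sees it right now.
    println(s"java.library.path=${System.getProperty("java.library.path")}")
  }
}
```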
>> >>>>>>>>
>> >>>>>>>> On Thu, Jan 17, 2019 at 5:53 PM Aaron Levin <aaronle...@stripe.com> wrote:
>> >>>>>>>> >
>> >>>>>>>> > Hello!
>> >>>>>>>> >
>> >>>>>>>> > *tl;dr*: settings in `env.java.opts` seem to stop having impact when a job is canceled or fails and then is restarted (with or without savepoint/checkpoints). If I restart the task-managers, the `env.java.opts` seem to start having impact again and our job will run without failure. More below.
>> >>>>>>>> >
>> >>>>>>>> > We consume Snappy-compressed sequence files in our flink job. This requires access to the hadoop native libraries. In our `flink-conf.yaml` for both the task manager and the job manager, we put:
>> >>>>>>>> >
>> >>>>>>>> > ```
>> >>>>>>>> > env.java.opts: -Djava.library.path=/usr/local/hadoop/lib/native
>> >>>>>>>> > ```
>> >>>>>>>> >
>> >>>>>>>> > If I launch our job on freshly-restarted task managers, the job operates fine. If at some point I cancel the job, or if the job restarts for some other reason, the job will begin to crashloop because it tries to open a Snappy-compressed file but doesn't have access to the codec from the native hadoop libraries in `/usr/local/hadoop/lib/native`. If I then restart the task manager while the job is crashlooping, the job starts running without any codec failures.
>> >>>>>>>> >
>> >>>>>>>> > The only reason I can conjure that would cause the Snappy compression to fail is if the `env.java.opts` were not being passed through to the job on restart for some reason.
>> >>>>>>>> >
>> >>>>>>>> > Does anyone know what's going on? Am I missing some additional configuration? I really appreciate any help!
>> >>>>>>>> >
>> >>>>>>>> > About our setup:
>> >>>>>>>> >
>> >>>>>>>> > - Flink Version: 1.7.0
>> >>>>>>>> > - Deployment: Standalone in HA
>> >>>>>>>> > - Hadoop/S3 setup: we do *not* set `HADOOP_CLASSPATH`. We use Flink’s shaded jars to access our files in S3. We do not use the `bundled-with-hadoop` distribution of Flink.
>> >>>>>>>> >
>> >>>>>>>> > Best,
>> >>>>>>>> >
>> >>>>>>>> > Aaron Levin
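For completeness, the JVM behaviour underlying this whole thread is that a native library can be bound to at most one classloader per JVM. A rough sketch of how that surfaces (the helper jar and class name are hypothetical; it assumes `/tmp/native-loader.jar` contains a class `demo.NativeLoader` whose static initializer calls `System.loadLibrary("hadoop")`, and that `libhadoop.so` is on `java.library.path`):

```
import java.net.{URL, URLClassLoader}

object DoubleNativeLoad {
  def main(args: Array[String]): Unit = {
    // Hypothetical helper jar, see the note above.
    val helperJar = new URL("file:///tmp/native-loader.jar")

    // Two independent classloaders, similar to two submissions of a Flink job,
    // each of which gets its own user-code classloader.
    val first = new URLClassLoader(Array(helperJar), null)
    val second = new URLClassLoader(Array(helperJar), null)

    // Initializing the class in the first classloader binds libhadoop.so to it.
    Class.forName("demo.NativeLoader", true, first)

    // Initializing it again in the second classloader throws
    // java.lang.UnsatisfiedLinkError: Native Library ... libhadoop.so
    // already loaded in another classloader
    Class.forName("demo.NativeLoader", true, second)
  }
}
```

This is why restarting the task managers (and with them the JVM) clears the error, and why forcing the codec classes to resolve parent-first, as described at the top of the thread, avoids the second load altogether.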