Hey Aaron,

I'm glad to hear that you resolved the issue.

I think a docs contribution for this would be very helpful and could
update this page:
https://github.com/apache/flink/blob/master/docs/monitoring/debugging_classloading.md.

If you want to create a separate JIRA ticket for this, ping me with
your JIRA username and I'll add you to the list of contributors (which
gives you permissions to create tickets).

I'll think a bit more about the other points you mentioned and get
back to you if I have another idea.

Best,

Ufuk

On Tue, Jan 29, 2019 at 10:48 PM Aaron Levin <aaronle...@stripe.com> wrote:
>
> Hi Ufuk,
>
> I'll answer your question, but first I'll give you an update on how we 
> resolved the issue:
>
> * adding `org.apache.hadoop.io.compress.SnappyCodec` to 
> `classloader.parent-first-patterns.additional` in `flink-conf.yaml` (though, 
> putting `org.apache.hadoop.util.NativeCodeLoader` also worked)
> * putting a jar with `hadoop-common` + it's transitive dependencies, then 
> using jarjar[0] to `keep org.apache.hadoop.io.compress.SnappyCodec` (and its 
> transitive dependencies). So we end up with jar that has `SnappyCodec` and 
> whatever it needs to call transitively. We put this jar on the task manager 
> classpath.
>
> I believe `SnappyCodec` was being called via our code. This worked the first 
> time but deploying a second time caused `libhadoop.so` to be loaded in a 
> second class loader. By putting a jar with `SnappyCodec` and it's transitive 
> dependencies on the task manager classpath and specifying that `SnappyCodec` 
> needs to be loaded from the parent classloader, we ensure that only one 
> classloader loads `libhadoop.so`. I don't think this is the best way to 
> achieve what we want, but it works for now.
>
> Next steps: if no one is on it, I can take a stab at updating the 
> documentation to clarify how to debug and resolve Native library loading. 
> This was a nice learning experience and I think it'll be helpful to have this 
> in the docs for those who aren't well-versed in how classloading on the JVM 
> works!
>
> To answer your questions:
>
> 1. We install hadoop on our machines and tell flink task managers to access 
> it via `env.java.opts: -Djava.library.path=/usr/local/hadoop/lib/native` in 
> `flink-conf.yaml`
> 2. We put flink's shaded hadoop-fs-s3 on both the task manager and job 
> manager classpath (I believe this is only used by the Job Managers when they 
> interact with S3 for checkpoints etc. I don't believe any user code is using 
> this).
> 3. Our flink applications consist of a "fat jar" that has some 
> `org.apache.hadoop` dependencies bundled with it. I believe this is the 
> source of why we're loading `SnappyCodec` twice and triggering this issue.
> 4. For example code: we have a small wrapper around 
> `org.apache.flink.api.common.io.FileInputFormat` which does the work with 
> sequence files. It looks like (after removing some stuff to make it more 
> clear):
>
> ```
> abstract class FlinkSequenceFileInputFormat[T, K <: Writable, V <: Writable](
>     typeInformation: TypeInformation[T]
> ) extends FileInputFormat[T]
>     with ResultTypeQueryable[T] {
>   @transient private var bufferedNextRecord: T = _
>   @transient private var hadoopStream: HadoopFSDataInputStream = _
>   @transient private var sequenceFileReader: SequenceFile.Reader = _
>
>   unsplittable = true
>   enumerateNestedFiles = true
>
>   // *****************************************
>   // This is where we'd see exceptions.
>   // *****************************************
>   override def open(fileSplit: FileInputSplit): Unit = {
>     super.open(fileSplit)
>     val config = new Configuration()
>     hadoopStream = WrappedHadoopInputStream.wrap(stream)
>     sequenceFileReader = new SequenceFile.Reader(config, 
> SequenceFile.Reader.stream(hadoopStream))
>     bufferNextRecord()
>   }
> ...
> }
>
> // AND
>
> class WrappedHadoopInputStream(underlying: FlinkFSDataInputStream)
>     extends InputStream
>     with Seekable
>     with PositionedReadable {
>
>   def read(): Int = underlying.read()
>   def seek(pos: Long): Unit = underlying.seek(pos)
>   def getPos: Long = underlying.getPos
> }
> ...
> ```
>
> Thanks for all your help, I appreciate it! I wouldn't have been able to debug 
> and resolve this if it wasn't for you filing the ticket. Thank you so much!
>
> [0] https://github.com/pantsbuild/jarjar
>
> Aaron Levin
>
> On Mon, Jan 28, 2019 at 4:16 AM Ufuk Celebi <u...@apache.org> wrote:
>>
>> Hey Aaron,
>>
>> sorry for the late reply (again).
>>
>> (1) I think that your final result is in line with what I have
>> reproduced in https://issues.apache.org/jira/browse/FLINK-11402.
>>
>> (2) I think renaming the file would not help as it will still be
>> loaded multiple times when the jobs restarts (as it happens in
>> FLINK-11402).
>>
>> (3) I'll try to check whether Flink's shading of Hadoop is related to
>> this. I don't think so though. @Chesnay (cc'd): What do you think?
>>
>> (4) @Aaron: Can you tell me which Hadoop libraries you use and share
>> some code so I can try to reproduce this exactly on my side? Judging
>> from the earlier stack traces you have shared, I'm assuming you are
>> trying to read Snappy-compressed sequence files.
>>
>> – Ufuk
>>
>> On Fri, Jan 25, 2019 at 4:37 PM Aaron Levin <aaronle...@stripe.com> wrote:
>> >
>> > I don't control the code calling `System.loadLibrary("hadoop")` so that's 
>> > not an option for me, unfortunately.
>> >
>> > On Thu, Jan 24, 2019 at 7:47 PM Guowei Ma <guowei....@gmail.com> wrote:
>> >>
>> >> This may be caused by a  jvm process can only load a so once.So a triky 
>> >> way is to rename it。
>> >>
>> >> 发自我的 iPhone
>> >>
>> >> 在 2019年1月25日,上午7:12,Aaron Levin <aaronle...@stripe.com> 写道:
>> >>
>> >> Hi Ufuk,
>> >>
>> >> Update: I've pinned down the issue. It's multiple classloaders loading 
>> >> `libhadoop.so`:
>> >>
>> >> ```
>> >> failed to load native hadoop with error: java.lang.UnsatisfiedLinkError: 
>> >> Native Library /usr/lib/libhadoop.so already loaded in another classloader
>> >> ```
>> >>
>> >> I'm not quite sure what the solution is. Ideally flink would destroy a 
>> >> classloader when a job is canceled, but perhaps there's a jvm limitation 
>> >> there? Putting the libraries into `/usr/lib` or `/lib` does not work (as 
>> >> suggested by Chesnay in the ticket) as I get the same error. I might see 
>> >> if I can put a jar with `org.apache.hadoop.common.io.compress` in 
>> >> `/flink/install/lib` and then remove it from my jar. It's not an ideal 
>> >> solution but I can't think of anything else.
>> >>
>> >> Best,
>> >>
>> >> Aaron Levin
>> >>
>> >> On Thu, Jan 24, 2019 at 12:18 PM Aaron Levin <aaronle...@stripe.com> 
>> >> wrote:
>> >>>
>> >>> Hi Ufuk,
>> >>>
>> >>> I'm starting to believe the bug is much deeper than the originally 
>> >>> reported error because putting the libraries in `/usr/lib` or `/lib` 
>> >>> does not work. This morning I dug into why putting `libhadoop.so` into 
>> >>> `/usr/lib` didn't work, despite that being in the `java.library.path` at 
>> >>> the call site of the error. I wrote a small program to test the loading 
>> >>> of native libraries, and it was able to successfully load 
>> >>> `libhadoop.so`. I'm very perplexed. Could this be related to the way 
>> >>> flink shades hadoop stuff?
>> >>>
>> >>> Here is my program and its output:
>> >>>
>> >>> ```
>> >>> $ cat LibTest.scala
>> >>> package com.redacted.flink
>> >>>
>> >>> object LibTest {
>> >>>   def main(args: Array[String]): Unit = {
>> >>>     val library = args(0)
>> >>>     
>> >>> System.out.println(s"java.library.path=${System.getProperty("java.library.path")}")
>> >>>     System.out.println(s"Attempting to load $library")
>> >>>     System.out.flush()
>> >>>     System.loadLibrary(library)
>> >>>     System.out.println(s"Successfully loaded ")
>> >>>     System.out.flush()
>> >>> }
>> >>> ```
>> >>>
>> >>> I then tried running that on one of the task managers with `hadoop` as 
>> >>> an argument:
>> >>>
>> >>> ```
>> >>> $ java -jar lib_test_deploy.jar hadoop
>> >>> java.library.path=/usr/java/packages/lib/amd64:/usr/lib/x86_64-linux-gnu/jni:/lib/x86_64-linux-gnu:/usr/lib/x86_64-linux-gnu:/usr/lib/jni:/lib:/usr/lib
>> >>> Attempting to load hadoop
>> >>> Exception in thread "main" java.lang.UnsatisfiedLinkError: no hadoop in 
>> >>> java.library.path
>> >>> at java.lang.ClassLoader.loadLibrary(ClassLoader.java:1867)
>> >>> at java.lang.Runtime.loadLibrary0(Runtime.java:870)
>> >>> at java.lang.System.loadLibrary(System.java:1122)
>> >>> at com.stripe.flink.LibTest$.main(LibTest.scala:11)
>> >>> at com.stripe.flink.LibTest.main(LibTest.scala)
>> >>> ```
>> >>>
>> >>> I then copied the native libraries into `/usr/lib/` and ran it again:
>> >>>
>> >>> ```
>> >>> $ sudo cp /usr/local/hadoop/lib/native/libhadoop.so /usr/lib/
>> >>> $ java -jar lib_test_deploy.jar hadoop
>> >>> java.library.path=/usr/java/packages/lib/amd64:/usr/lib/x86_64-linux-gnu/jni:/lib/x86_64-linux-gnu:/usr/lib/x86_64-linux-gnu:/usr/lib/jni:/lib:/usr/lib
>> >>> Attempting to load hadoop
>> >>> Successfully loaded
>> >>> ```
>> >>>
>> >>> Any ideas?
>> >>>
>> >>> On Wed, Jan 23, 2019 at 7:13 PM Aaron Levin <aaronle...@stripe.com> 
>> >>> wrote:
>> >>>>
>> >>>> Hi Ufuk,
>> >>>>
>> >>>> One more update: I tried copying all the hadoop native `.so` files 
>> >>>> (mainly `libhadoop.so`) into `/lib` and am I still experiencing the 
>> >>>> issue I reported. I also tried naively adding the `.so` files to the 
>> >>>> jar with the flink application and am still experiencing the issue I 
>> >>>> reported (however, I'm going to investigate this further as I might not 
>> >>>> have done it correctly).
>> >>>>
>> >>>> Best,
>> >>>>
>> >>>> Aaron Levin
>> >>>>
>> >>>> On Wed, Jan 23, 2019 at 3:18 PM Aaron Levin <aaronle...@stripe.com> 
>> >>>> wrote:
>> >>>>>
>> >>>>> Hi Ufuk,
>> >>>>>
>> >>>>> Two updates:
>> >>>>>
>> >>>>> 1. As suggested in the ticket, I naively copied the every `.so` in 
>> >>>>> `hadoop-3.0.0/lib/native/` into `/lib/` and this did not seem to help. 
>> >>>>> My knowledge of how shared libs get picked up is hazy, so I'm not sure 
>> >>>>> if blindly copying them like that should work. I did check what 
>> >>>>> `System.getProperty("java.library.path")` returns at the call-site and 
>> >>>>> it's: 
>> >>>>> java.library.path=/usr/java/packages/lib/amd64:/usr/lib/x86_64-linux-gnu/jni:/lib/x86_64-linux-gnu:/usr/lib/x86_64-linux-gnu:/usr/lib/jni:/lib:/usr/lib
>> >>>>> 2. The exception I see comes from 
>> >>>>> `hadoop.util.NativeCodeLoader.buildSupportsSnappy` (stack-trace 
>> >>>>> below). This uses `System.loadLibrary("hadoop")`.
>> >>>>>
>> >>>>> [2019-01-23 19:52:33.081216] java.lang.UnsatisfiedLinkError: 
>> >>>>> org.apache.hadoop.util.NativeCodeLoader.buildSupportsSnappy()Z
>> >>>>> [2019-01-23 19:52:33.081376]  at 
>> >>>>> org.apache.hadoop.util.NativeCodeLoader.buildSupportsSnappy(Native 
>> >>>>> Method)
>> >>>>> [2019-01-23 19:52:33.081406]  at 
>> >>>>> org.apache.hadoop.io.compress.SnappyCodec.checkNativeCodeLoaded(SnappyCodec.java:63)
>> >>>>> [2019-01-23 19:52:33.081429]  at 
>> >>>>> org.apache.hadoop.io.compress.SnappyCodec.getDecompressorType(SnappyCodec.java:195)
>> >>>>> [2019-01-23 19:52:33.081457]  at 
>> >>>>> org.apache.hadoop.io.compress.CodecPool.getDecompressor(CodecPool.java:181)
>> >>>>> [2019-01-23 19:52:33.081494]  at 
>> >>>>> org.apache.hadoop.io.SequenceFile$Reader.init(SequenceFile.java:2037)
>> >>>>> [2019-01-23 19:52:33.081517]  at 
>> >>>>> org.apache.hadoop.io.SequenceFile$Reader.initialize(SequenceFile.java:1923)
>> >>>>> [2019-01-23 19:52:33.081549]  at 
>> >>>>> org.apache.hadoop.io.SequenceFile$Reader.<init>(SequenceFile.java:1872)
>> >>>>> ... (redacted) ...
>> >>>>> [2019-01-23 19:52:33.081728]  at 
>> >>>>> scala.collection.immutable.List.foreach(List.scala:392)
>> >>>>> ... (redacted) ...
>> >>>>> [2019-01-23 19:52:33.081832]  at 
>> >>>>> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:94)
>> >>>>> [2019-01-23 19:52:33.081854]  at 
>> >>>>> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:58)
>> >>>>> [2019-01-23 19:52:33.081882]  at 
>> >>>>> org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:99)
>> >>>>> [2019-01-23 19:52:33.081904]  at 
>> >>>>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
>> >>>>> [2019-01-23 19:52:33.081946]  at 
>> >>>>> org.apache.flink.runtime.taskmanager.Task.run(Task.java:704)
>> >>>>> [2019-01-23 19:52:33.081967]  at java.lang.Thread.run(Thread.java:748)
>> >>>>>
>> >>>>> On Tue, Jan 22, 2019 at 2:31 PM Aaron Levin <aaronle...@stripe.com> 
>> >>>>> wrote:
>> >>>>>>
>> >>>>>> Hey Ufuk,
>> >>>>>>
>> >>>>>> So, I looked into this a little bit:
>> >>>>>>
>> >>>>>> 1. clarification: my issues are with the hadoop-related snappy 
>> >>>>>> libraries and not libsnappy itself (this is my bad for not being 
>> >>>>>> clearer, sorry!). I already have `libsnappy` on my classpath, but I 
>> >>>>>> am looking into including the hadoop snappy libraries.
>> >>>>>> 2. exception: I don't see the class loading error. I'm going to try 
>> >>>>>> to put some more instrumentation and see if I can get a clearer 
>> >>>>>> stacktrace (right now I get an NPE on closing a sequence file in a 
>> >>>>>> finalizer - when I last logged the exception it was something deep in 
>> >>>>>> hadoop's snappy libs - I'll get clarification soon).
>> >>>>>> 3. I'm looking into including hadoop's snappy libs in my jar and 
>> >>>>>> we'll see if that resolves the problem.
>> >>>>>>
>> >>>>>> Thanks again for your help!
>> >>>>>>
>> >>>>>> Best,
>> >>>>>>
>> >>>>>> Aaron Levin
>> >>>>>>
>> >>>>>> On Tue, Jan 22, 2019 at 10:47 AM Aaron Levin <aaronle...@stripe.com> 
>> >>>>>> wrote:
>> >>>>>>>
>> >>>>>>> Hey,
>> >>>>>>>
>> >>>>>>> Thanks so much for the help! This is awesome. I'll start looking 
>> >>>>>>> into all of this right away and report back.
>> >>>>>>>
>> >>>>>>> Best,
>> >>>>>>>
>> >>>>>>> Aaron Levin
>> >>>>>>>
>> >>>>>>> On Mon, Jan 21, 2019 at 5:16 PM Ufuk Celebi <u...@apache.org> wrote:
>> >>>>>>>>
>> >>>>>>>> Hey Aaron,
>> >>>>>>>>
>> >>>>>>>> sorry for the late reply.
>> >>>>>>>>
>> >>>>>>>> (1) I think I was able to reproduce this issue using snappy-java. 
>> >>>>>>>> I've
>> >>>>>>>> filed a ticket here:
>> >>>>>>>> https://issues.apache.org/jira/browse/FLINK-11402. Can you check the
>> >>>>>>>> ticket description whether it's in line with what you are
>> >>>>>>>> experiencing? Most importantly, do you see the same Exception being
>> >>>>>>>> reported after cancelling and re-starting the job?
>> >>>>>>>>
>> >>>>>>>> (2) I don't think it's caused by the environment options not being
>> >>>>>>>> picked up. You can check the head of the log files of the JobManager
>> >>>>>>>> or TaskManager to verify that your provided option is picked up as
>> >>>>>>>> expected. You should see something similar to this:
>> >>>>>>>>
>> >>>>>>>> 2019-01-21 22:53:49,863 INFO
>> >>>>>>>> org.apache.flink.runtime.entrypoint.ClusterEntrypoint         -
>> >>>>>>>> --------------------------------------------------------------------------------
>> >>>>>>>> 2019-01-21 22:53:49,864 INFO
>> >>>>>>>> org.apache.flink.runtime.entrypoint.ClusterEntrypoint         -
>> >>>>>>>> Starting StandaloneSessionClusterEntrypoint (Version: 1.7.0,
>> >>>>>>>> Rev:49da9f9, Date:28.11.2018 @ 17:59:06 UTC)
>> >>>>>>>> ...
>> >>>>>>>> 2019-01-21 22:53:49,865 INFO
>> >>>>>>>> org.apache.flink.runtime.entrypoint.ClusterEntrypoint         -  JVM
>> >>>>>>>> Options:
>> >>>>>>>> 2019-01-21 22:53:49,865 INFO
>> >>>>>>>> org.apache.flink.runtime.entrypoint.ClusterEntrypoint         -
>> >>>>>>>> -Xms1024m
>> >>>>>>>> 2019-01-21 22:53:49,865 INFO
>> >>>>>>>> org.apache.flink.runtime.entrypoint.ClusterEntrypoint         -
>> >>>>>>>> -Xmx1024m
>> >>>>>>>> You are looking for this line ----> 2019-01-21 22:53:49,865 INFO
>> >>>>>>>> org.apache.flink.runtime.entrypoint.ClusterEntrypoint         -
>> >>>>>>>> -Djava.library.path=/.../org/xerial/snappy/native/Mac/x86_64/ <----
>> >>>>>>>> 2019-01-21 22:53:49,865 INFO
>> >>>>>>>> org.apache.flink.runtime.entrypoint.ClusterEntrypoint         -
>> >>>>>>>> -Dlog.file=/.../flink-1.7.0/log/flink-standalonesession-0.local.log
>> >>>>>>>> ...
>> >>>>>>>> 2019-01-21 22:53:49,866 INFO
>> >>>>>>>> org.apache.flink.runtime.entrypoint.ClusterEntrypoint         -
>> >>>>>>>> Program Arguments:
>> >>>>>>>> 2019-01-21 22:53:49,866 INFO
>> >>>>>>>> org.apache.flink.runtime.entrypoint.ClusterEntrypoint         -
>> >>>>>>>> --configDir
>> >>>>>>>> 2019-01-21 22:53:49,866 INFO
>> >>>>>>>> org.apache.flink.runtime.entrypoint.ClusterEntrypoint         -
>> >>>>>>>> /.../flink-1.7.0/conf
>> >>>>>>>> 2019-01-21 22:53:49,866 INFO
>> >>>>>>>> org.apache.flink.runtime.entrypoint.ClusterEntrypoint         -
>> >>>>>>>> --executionMode
>> >>>>>>>> 2019-01-21 22:53:49,866 INFO
>> >>>>>>>> org.apache.flink.runtime.entrypoint.ClusterEntrypoint         -
>> >>>>>>>> cluster
>> >>>>>>>> ...
>> >>>>>>>> 2019-01-21 22:53:49,866 INFO
>> >>>>>>>> org.apache.flink.runtime.entrypoint.ClusterEntrypoint         -
>> >>>>>>>> --------------------------------------------------------------------------------
>> >>>>>>>>
>> >>>>>>>> Can you verify that you see the log messages as expected?
>> >>>>>>>>
>> >>>>>>>> (3) As noted FLINK-11402, is it possible to package the snappy 
>> >>>>>>>> library
>> >>>>>>>> as part of your user code instead of loading the library via
>> >>>>>>>> java.library.path? In my example, that seems to work fine.
>> >>>>>>>>
>> >>>>>>>> – Ufuk
>> >>>>>>>>
>> >>>>>>>> On Thu, Jan 17, 2019 at 5:53 PM Aaron Levin <aaronle...@stripe.com> 
>> >>>>>>>> wrote:
>> >>>>>>>> >
>> >>>>>>>> > Hello!
>> >>>>>>>> >
>> >>>>>>>> > *tl;dr*: settings in `env.java.opts` seem to stop having impact 
>> >>>>>>>> > when a job is canceled or fails and then is restarted (with or 
>> >>>>>>>> > without savepoint/checkpoints). If I restart the task-managers, 
>> >>>>>>>> > the `env.java.opts` seem to start having impact again and our job 
>> >>>>>>>> > will run without failure. More below.
>> >>>>>>>> >
>> >>>>>>>> > We use consume Snappy-compressed sequence files in our flink job. 
>> >>>>>>>> > This requires access to the hadoop native libraries. In our 
>> >>>>>>>> > `flink-conf.yaml` for both the task manager and the job manager, 
>> >>>>>>>> > we put:
>> >>>>>>>> >
>> >>>>>>>> > ```
>> >>>>>>>> > env.java.opts: -Djava.library.path=/usr/local/hadoop/lib/native
>> >>>>>>>> > ```
>> >>>>>>>> >
>> >>>>>>>> > If I launch our job on freshly-restarted task managers, the job 
>> >>>>>>>> > operates fine. If at some point I cancel the job or if the job 
>> >>>>>>>> > restarts for some other reason, the job will begin to crashloop 
>> >>>>>>>> > because it tries to open a Snappy-compressed file but doesn't 
>> >>>>>>>> > have access to the codec from the native hadoop libraries in 
>> >>>>>>>> > `/usr/local/hadoop/lib/native`. If I then restart the task 
>> >>>>>>>> > manager while the job is crashlooping, the job is start running 
>> >>>>>>>> > without any codec failures.
>> >>>>>>>> >
>> >>>>>>>> > The only reason I can conjure that would cause the Snappy 
>> >>>>>>>> > compression to fail is if the `env.java.opts` were not being 
>> >>>>>>>> > passed through to the job on restart for some reason.
>> >>>>>>>> >
>> >>>>>>>> > Does anyone know what's going on? Am I missing some additional 
>> >>>>>>>> > configuration? I really appreciate any help!
>> >>>>>>>> >
>> >>>>>>>> > About our setup:
>> >>>>>>>> >
>> >>>>>>>> > - Flink Version: 1.7.0
>> >>>>>>>> > - Deployment: Standalone in HA
>> >>>>>>>> > - Hadoop/S3 setup: we do *not* set `HADOOP_CLASSPATH`. We use 
>> >>>>>>>> > Flink’s shaded jars to access our files in S3. We do not use the 
>> >>>>>>>> > `bundled-with-hadoop` distribution of Flink.
>> >>>>>>>> >
>> >>>>>>>> > Best,
>> >>>>>>>> >
>> >>>>>>>> > Aaron Levin

Reply via email to