Let's wait for Till then, I hope he can figure this out.

On Wed, Apr 26, 2017 at 11:03 AM, Stefan Richter <s.rich...@data-artisans.com> wrote:
> Ok, now the question is also about what classloaders Ignite is creating
> and how they are used, but the relevant code line in Flink is probably in
> FlinkMiniCluster.scala, line 538 (current master):
>
> try {
>   JobClient.submitJobAndWait(
>     clientActorSystem,
>     configuration,
>     leaderRetrievalService,
>     jobGraph,
>     timeout,
>     printUpdates,
>     this.getClass.getClassLoader())
> } finally {
>   if(!useSingleActorSystem) {
>     // we have to shutdown the just created actor system
>     shutdownJobClientActorSystem(clientActorSystem)
>   }
> }
>
> This is what is executed as part of executing a job through
> LocalEnvironment. As we can see, the classloader is set to the classloader
> of FlinkMiniCluster. Depending on the classloader structure inside Ignite,
> this classloader might not know your user code. What you could do is
> change this line in a custom Flink build, for example replacing the
> classloader argument on line 538 with
> Thread.currentThread().getContextClassLoader(), and ensure that the
> context classloader in the runnable is a classloader that a) knows the
> user code and b) is a child of the classloader that knows the Ignite and
> Flink classes. Notice that this is not a general solution and should not
> become a general fix.
>
> I have heard that Till is about to change some things about local
> execution, so I included him in CC. Maybe he can provide additional hints
> on how your use case might be better supported in the upcoming Flink 1.3.
>
> Best,
> Stefan
>
> On 25.04.2017 at 22:50, Matt <dromitl...@gmail.com> wrote:
>
> I updated the code a little bit for clarity, now the line #56 mentioned in
> my previous message is line #25.
>
> In summary the error I'm getting is this:
>
> ---
> Caused by: org.apache.flink.streaming.runtime.tasks.StreamTaskException:
> Cannot load user class: com.test.Test
> ClassLoader info: URL ClassLoader:
> Class not resolvable through given classloader.
> ---
>
> But if I'm not wrong, after trying to load the class through
> URLClassLoader, Flink should try loading it with its parent ClassLoader,
> which should be the same ClassLoader that executed the environment, and it
> does have access to the class.
>
> Not sure what is wrong.
>
> On Tue, Apr 25, 2017 at 5:38 PM, Matt <dromitl...@gmail.com> wrote:
>
>> Hi Stefan,
>>
>> Check the code here:
>> https://gist.github.com/17d82ee7dd921a0d649574a361cc017d
>> The output is at the bottom of the page.
>>
>> Here are the results of the additional tests you mentioned:
>>
>> 1. I was able to instantiate an inner class (Test$Foo) inside the Ignite
>> closure, no problem with that.
>> 2. I tried implementing SourceFunction and SinkFunction in Test itself, and
>> I was able to instantiate the class inside the Ignite closure.
>> 3. I'm not sure what you meant in this point, is it something like what I
>> tried in line #56?
>>
>> Additionally, I tried implementing the SourceFunction and SinkFunction in
>> Test$Foo with the same result: it says "Cannot load user class:
>> com.test.Test$Foo".
>>
>> Looks like Flink is not using the correct ClassLoader. Any idea?
>>
>> Regards,
>> Matt
>>
>> On Tue, Apr 25, 2017 at 7:00 AM, Stefan Richter
>> <s.rich...@data-artisans.com> wrote:
>>
>>> Hi,
>>>
>>> I would expect that the local environment picks up the class path from
>>> the code that launched it. So I think the question is what happens behind
>>> the scenes when you call ignite.compute().broadcast(runnable): which
>>> classes are shipped, and how is the classpath built in the environment
>>> that runs the code? Your example is also not fully conclusive, because
>>> com.myproj.Test (which you can successfully instantiate) and
>>> com.myproj.Test$1$2 (which fails) are different classes, so maybe only the
>>> outer class is shipped with the broadcast call. My theory is that not all
>>> classes are shipped (e.g. inner classes), but only Test.
>>> You could try three things to analyze the problem a little more:
>>>
>>> 1) Create another inner class inside Test and check whether you can
>>> still instantiate this class via reflection as well.
>>> 2) Let the Test class itself implement the map function (avoiding the
>>> usage of other/inner classes) and see if this works.
>>> 3) Check and set the thread’s context classloader inside the runnable to
>>> something that contains all required classes and see if this gets picked
>>> up by Flink.
>>>
>>> Best,
>>> Stefan
>>>
>>> On 25.04.2017 at 07:27, Matt <dromitl...@gmail.com> wrote:
>>>
>>> Hi all,
>>>
>>> I'm trying to run Flink using a local environment, but on an Ignite node
>>> to achieve collocation (as mentioned in my previous message on this list).
>>>
>>> Have a look at the code in [1]. It's pretty simple, but I'm getting a
>>> "cannot load user class" error as shown in [2].
>>>
>>> If you check line #29 in the code, I'm able to create an instance of
>>> class Test, and it's the same context from which I'm creating the Flink
>>> job. Shouldn't it work provided I'm using a local environment?
>>>
>>> It would be really nice to be able to inject a ClassLoader into the
>>> chunk of code that creates the job. Is this currently possible?
>>>
>>> Any fix or workaround is appreciated!
>>>
>>> Best,
>>> Matt
>>>
>>> [1] https://gist.github.com/f248187b9638023b95ba8bd9d7f06215
>>> [2] https://gist.github.com/796ee05425535ece1736df7b1e884cce
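
For reference, suggestion 3) in the thread (setting the thread's context classloader inside the runnable) could look roughly like the sketch below. It is a minimal, plain-JDK illustration and not code from Flink or Ignite: the class name ContextClassLoaderSketch and the helpers runWithContextClassLoader and resolve are hypothetical, and the empty URL array is only a placeholder for wherever the user-code classes actually live. It shows a child classloader whose parent is the loader that already knows the framework classes, installed as the context classloader for the duration of the task and restored afterwards.

```java
import java.net.URL;
import java.net.URLClassLoader;

// Illustrative sketch only; the class and method names are hypothetical
// and are not part of Flink or Ignite.
final class ContextClassLoaderSketch {

    // Runs the task with `loader` installed as the thread context
    // classloader and restores the previous one afterwards.
    static void runWithContextClassLoader(ClassLoader loader, Runnable task) {
        Thread current = Thread.currentThread();
        ClassLoader previous = current.getContextClassLoader();
        current.setContextClassLoader(loader);
        try {
            task.run();
        } finally {
            current.setContextClassLoader(previous);
        }
    }

    // Returns the class resolved through `loader`, or null if it cannot be found.
    static Class<?> resolve(String name, ClassLoader loader) {
        try {
            return Class.forName(name, false, loader);
        } catch (ClassNotFoundException e) {
            return null;
        }
    }

    public static void main(String[] args) throws Exception {
        // Stand-in for the classloader that knows the Ignite and Flink classes.
        ClassLoader frameworkLoader = ContextClassLoaderSketch.class.getClassLoader();

        // Assumption: the empty URL array is a placeholder for the
        // locations of the user-code classes or jars.
        try (URLClassLoader userLoader = new URLClassLoader(new URL[0], frameworkLoader)) {
            runWithContextClassLoader(userLoader, () -> {
                ClassLoader ctx = Thread.currentThread().getContextClassLoader();
                // Classes known to the parent stay visible through the child.
                Class<?> found = resolve(ContextClassLoaderSketch.class.getName(), ctx);
                System.out.println("context loader is child: " + (ctx == userLoader));
                System.out.println("resolved via parent: " + (found != null));
            });
        }
    }
}
```

As Stefan notes, this only helps if the code that submits the job actually reads the context classloader; with the FlinkMiniCluster line quoted above, that would require the custom-build change he describes.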