OK, part of the question is also which classloaders Ignite creates and how 
they are used, but the relevant line of code in Flink is probably in 
FlinkMiniCluster.scala, line 538 (current master):

  try {
    JobClient.submitJobAndWait(
      clientActorSystem,
      configuration,
      leaderRetrievalService,
      jobGraph,
      timeout,
      printUpdates,
      this.getClass.getClassLoader())
  } finally {
    if (!useSingleActorSystem) {
      // we have to shut down the just created actor system
      shutdownJobClientActorSystem(clientActorSystem)
    }
  }

This is what runs when a job is executed through LocalEnvironment. As we can 
see, the classloader is set to the classloader of FlinkMiniCluster. Depending 
on the classloader structure inside Ignite, this classloader might not know 
your user code. What you could do is change this line in a custom Flink 
build, for example replacing the last argument on line 538 with 
Thread.currentThread().getContextClassLoader(), and ensure that the context 
classloader in the runnable is a classloader that a) knows the user code and 
b) is a child of the classloader that knows the Ignite and Flink classes. 
Note that this is not a general solution and should not become a general fix.
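
To make the second half of that suggestion concrete, here is a minimal sketch 
of how the runnable could set its context classloader, using only the JDK. 
The class name ContextClassLoaderSketch, the helper 
runWithContextClassLoader, and the empty URL array are placeholders I made 
up for illustration; in your setup the URLs would point at the jars 
containing your user code, and the task body would build and execute the 
Flink job. It only has an effect if Flink is patched to read the context 
classloader as described above.

```java
import java.net.URL;
import java.net.URLClassLoader;

public class ContextClassLoaderSketch {

    /**
     * Runs the task with the given loader as the thread's context
     * classloader and restores the previous loader afterwards.
     */
    public static void runWithContextClassLoader(ClassLoader loader, Runnable task) {
        Thread current = Thread.currentThread();
        ClassLoader previous = current.getContextClassLoader();
        current.setContextClassLoader(loader);
        try {
            task.run();
        } finally {
            // Always restore, so pooled threads do not keep our loader.
            current.setContextClassLoader(previous);
        }
    }

    public static void main(String[] args) throws Exception {
        // Assumption: the loader of this class stands in for the loader
        // that knows the Ignite and Flink classes.
        ClassLoader frameworkLoader = ContextClassLoaderSketch.class.getClassLoader();

        // Assumption: the URL array is empty here; it would list the
        // user-code jars. The parent argument makes this loader a child
        // of the framework loader, as required by point b).
        try (URLClassLoader userCodeLoader =
                 new URLClassLoader(new URL[0], frameworkLoader)) {
            runWithContextClassLoader(userCodeLoader, () -> {
                // Placeholder for creating and executing the job.
                ClassLoader active = Thread.currentThread().getContextClassLoader();
                System.out.println("context loader is user loader: "
                    + (active == userCodeLoader));
            });
        }
    }
}
```

Restoring the previous loader in the finally block matters because the 
runnable may execute on a pooled thread that is reused for other work 
afterwards.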

I have heard that Till is about to change some things about local execution, so 
I included him in CC. Maybe he can provide additional hints on how your use 
case might be better supported in the upcoming Flink 1.3.

Best,
Stefan

> On 25.04.2017 at 22:50, Matt <dromitl...@gmail.com> wrote:
> 
> I updated the code a little bit for clarity, now the line #56 mentioned in my 
> previous message is line #25.
> 
> In summary the error I'm getting is this:
> 
> ---
> Caused by: org.apache.flink.streaming.runtime.tasks.StreamTaskException: 
> Cannot load user class: com.test.Test
> ClassLoader info: URL ClassLoader:
> Class not resolvable through given classloader.
> ---
> 
> But if I'm not wrong, after trying to load the class through URLClassLoader, 
> Flink should try loading it with its parent ClassLoader, which should be the 
> same ClassLoader that executed the environment, and it does have access to 
> the class.
> 
> Not sure what is wrong.
> 
> On Tue, Apr 25, 2017 at 5:38 PM, Matt <dromitl...@gmail.com> wrote:
> Hi Stefan,
> 
> Check the code here: https://gist.github.com/17d82ee7dd921a0d649574a361cc017d 
> (the output is at the bottom of the page).
> 
> Here are the results of the additional tests you mentioned:
> 
> 1. I was able to instantiate an inner class (Test$Foo) inside the Ignite 
> closure, no problem with that
> 2. I tried implementing SourceFunction and SinkFunction in Test itself, I was 
> able to instantiate the class inside the Ignite closure
> 3. I'm not sure what you meant by this point. Is it something like what I 
> tried in line #56?
> 
> Additionally, I tried implementing the SourceFunction and SinkFunction in 
> Test$Foo with the same result: it says "Cannot load user class: 
> com.test.Test$Foo"
> 
> Looks like Flink is not using the correct ClassLoader. Any idea?
> 
> Regards,
> Matt
> 
> On Tue, Apr 25, 2017 at 7:00 AM, Stefan Richter <s.rich...@data-artisans.com> 
> wrote:
> Hi,
> 
> I would expect that the local environment picks up the classpath from the 
> code that launched it. So I think the question is what happens behind the 
> scenes when you call ignite.compute().broadcast(runnable): which classes 
> are shipped, and how is the classpath built in the environment that runs the 
> code? Your example is also not fully conclusive, because com.myproj.Test 
> (which you can successfully instantiate) and com.myproj.Test$1$2 (which 
> fails) are different classes, so maybe only the outer class is shipped with 
> the broadcast call. My theory is that not all classes are shipped (e.g. inner 
> classes are left out), but only Test. You could try three things to analyze 
> the problem a little more:
> 
> 1) Create another inner class inside Test and check whether you can still 
> instantiate this class via reflection as well.
> 2) Let Test class itself implement the map function (avoiding the usage of 
> other/inner classes) and see if this works.
> 3) Check and set the thread’s context classloader inside the runnable to 
> something that contains all required classes and see if this gets picked up 
> by Flink.
> 
> Best,
> Stefan
> 
>> On 25.04.2017 at 07:27, Matt <dromitl...@gmail.com> wrote:
>> 
>> Hi all,
>> 
>> I'm trying to run Flink using a local environment, but on an Ignite node to 
>> achieve collocation (as mentioned in my previous message on this list).
>> 
>> Have a look at the code in [1]. It's pretty simple, but I'm getting a 
>> "cannot load user class" error as shown in [2].
>> 
>> If you check line #29 in the code, I'm able to create an instance of class 
>> Test, and it's the same context from which I'm creating the Flink job. 
>> Shouldn't it work, given that I'm using a local environment?
>> 
>> It would be really nice to be able to inject a ClassLoader into the chunk of 
>> code that creates the job. Is this currently possible?
>> 
>> Any fix or workaround is appreciated!
>> 
>> Best,
>> Matt
>> 
>> [1] https://gist.github.com/f248187b9638023b95ba8bd9d7f06215
>> [2] https://gist.github.com/796ee05425535ece1736df7b1e884cce
> 
> 
