Let's wait for Till, then. I hope he can figure this out.

On Wed, Apr 26, 2017 at 11:03 AM, Stefan Richter <
s.rich...@data-artisans.com> wrote:

> Ok, now the question is also about what classloaders Ignite is creating
> and how they are used, but the relevant code line in Flink is probably in
> FlinkMiniCluster.scala, line 538 (current master):
>
>   try {
>     JobClient.submitJobAndWait(
>       clientActorSystem,
>       configuration,
>       leaderRetrievalService,
>       jobGraph,
>       timeout,
>       printUpdates,
>       this.getClass.getClassLoader())
>   } finally {
>     if (!useSingleActorSystem) {
>       // we have to shut down the just created actor system
>       shutdownJobClientActorSystem(clientActorSystem)
>     }
>   }
>
>
> This is what is executed as part of executing a job through
> LocalEnvironment. As we can see, the classloader is set to the classloader
> of FlinkMiniCluster. Depending on the classloader structure inside Ignite,
> this classloader might not know your user code. What you could do is
> change this line in a custom Flink build, for example replacing the last
> argument on line 538 with Thread.currentThread().getContextClassLoader(),
> and ensure that the context classloader in the runnable is a classloader
> that a) knows the user code and b) is a child of the classloader that knows
> the Ignite and Flink classes. Note that this is not a general solution and
> should not become a general fix.
>
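A minimal Java sketch of the context-classloader part of this suggestion. ContextClassLoaderScope, runWith and childOf are hypothetical names, not Flink or Ignite API. It only shows how a runnable could install a child classloader as the thread's context classloader and restore the old one afterwards. Whether Flink then uses it still depends on the custom build change to line 538 described above.

```java
import java.net.URL;
import java.net.URLClassLoader;

/** Hypothetical helper for illustration; not part of Flink or Ignite. */
final class ContextClassLoaderScope {

    /**
     * Runs the task with the given loader as the thread's context classloader
     * and restores the previous context classloader afterwards.
     */
    static void runWith(ClassLoader loader, Runnable task) {
        Thread current = Thread.currentThread();
        ClassLoader previous = current.getContextClassLoader();
        current.setContextClassLoader(loader);
        try {
            task.run();
        } finally {
            current.setContextClassLoader(previous);
        }
    }

    /**
     * Builds a loader that a) can see user code at the given URLs and
     * b) is a child of the loader that knows the framework classes.
     */
    static URLClassLoader childOf(ClassLoader frameworkLoader, URL... userCodeUrls) {
        return new URLClassLoader(userCodeUrls, frameworkLoader);
    }

    public static void main(String[] args) {
        ClassLoader frameworkLoader = ContextClassLoaderScope.class.getClassLoader();
        // Assumption: no extra URLs here; a real setup would pass the
        // locations of the user-code classes or jars.
        URLClassLoader userLoader = childOf(frameworkLoader);
        runWith(userLoader, () -> {
            // This is where the job would be built and executed.
            ClassLoader active = Thread.currentThread().getContextClassLoader();
            System.out.println("context classloader installed: " + (active == userLoader));
        });
    }
}
```

Inside the Ignite closure, the body of the runnable would be wrapped in runWith(...) so that any code reading the context classloader during job submission sees the child loader.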
> I have heard that Till is about to change some things about local
> execution, so I included him in CC. Maybe he can provide additional hints
> on how your use case might be better supported in the upcoming Flink 1.3.
>
> Best,
> Stefan
>
> On 25.04.2017 at 22:50, Matt <dromitl...@gmail.com> wrote:
>
> I updated the code a little bit for clarity; the line #56 mentioned in
> my previous message is now line #25.
>
> In summary the error I'm getting is this:
>
> ---
> Caused by: org.apache.flink.streaming.runtime.tasks.StreamTaskException:
> Cannot load user class: com.test.Test
> ClassLoader info: URL ClassLoader:
> Class not resolvable through given classloader.
> ---
>
> But if I'm not wrong, after trying to load the class through
> URLClassLoader, Flink should try loading it with its parent ClassLoader,
> which should be the same ClassLoader that executed the environment, and it
> does have access to the class.
>
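The parent delegation described above can be checked in isolation. The following Java sketch uses a hypothetical DelegationDemo class and assumes nothing about Flink or Ignite internals. It shows that a URLClassLoader with no URLs of its own resolves a class only if its parent chain can, which would explain the error if the parent handed to Flink is not the loader that knows com.test.Test.

```java
import java.net.URL;
import java.net.URLClassLoader;

/** Hypothetical demo class; stands in for the user class. */
final class DelegationDemo {

    /** Returns true if the class name resolves through the given loader. */
    static boolean canLoad(ClassLoader loader, String className) {
        try {
            Class.forName(className, false, loader);
            return true;
        } catch (ClassNotFoundException | LinkageError e) {
            return false;
        }
    }

    public static void main(String[] args) throws Exception {
        String userClass = DelegationDemo.class.getName();
        ClassLoader knowsUserCode = DelegationDemo.class.getClassLoader();

        try (URLClassLoader goodParent = new URLClassLoader(new URL[0], knowsUserCode);
             // A null parent means only the bootstrap loader is consulted.
             URLClassLoader wrongParent = new URLClassLoader(new URL[0], null)) {
            System.out.println("parent knows user code:  " + canLoad(goodParent, userClass));
            System.out.println("parent lacks user code:  " + canLoad(wrongParent, userClass));
        }
    }
}
```

The second loader fails for the user class even though the calling code can instantiate it directly, because visibility is decided by the loader's parent chain and not by the caller.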
> Not sure what is wrong.
>
> On Tue, Apr 25, 2017 at 5:38 PM, Matt <dromitl...@gmail.com> wrote:
>
>> Hi Stefan,
>>
>> Check the code here:
>> https://gist.github.com/17d82ee7dd921a0d649574a361cc017d
>> The output is at the bottom of the page.
>>
>> Here are the results of the additional tests you mentioned:
>>
>> 1. I was able to instantiate an inner class (Test$Foo) inside the Ignite
>> closure, no problem with that
>> 2. I tried implementing SourceFunction and SinkFunction in Test itself, I
>> was able to instantiate the class inside the Ignite closure
>> 3. I'm not sure what you meant in this point, is it something like what I
>> tried in line #56?
>>
>> Additionally, I tried implementing the SourceFunction and SinkFunction in
>> Test$Foo with the same result: it says "Cannot load user class:
>> com.test.Test$Foo"
>>
>> Looks like Flink is not using the correct ClassLoader. Any idea?
>>
>> Regards,
>> Matt
>>
>> On Tue, Apr 25, 2017 at 7:00 AM, Stefan Richter <
>> s.rich...@data-artisans.com> wrote:
>>
>>> Hi,
>>>
>>> I would expect that the local environment picks up the class path from
>>> the code that launched it. So I think the question is what happens behind
>>> the scenes when you call ignite.compute().broadcast(runnable): which
>>> classes are shipped, and how is the classpath built in the environment that
>>> runs the code? Your example is also not fully conclusive, because
>>> com.myproj.Test (which you can successfully instantiate) and
>>> com.myproj.Test$1$2 (which fails) are different classes, so maybe only the
>>> outer class is shipped with the broadcast call. My theory is that not all
>>> classes are shipped (e.g. inner classes), but only Test. You could try
>>> three things to analyze the problem a little more:
>>>
>>> 1) Create another inner class inside Test and check whether you can
>>> still instantiate this class via reflection as well.
>>> 2) Let Test class itself implement the map function (avoiding the usage
>>> of other/inner classes) and see if this works.
>>> 3) Check and set the thread’s context classloader inside the runnable to
>>> something that contains all required classes and see if this gets picked up
>>> by Flink.
>>>
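For checks 1) and 3), it can help to print the classloader chains that are visible inside the runnable. This is a rough Java sketch with a hypothetical ClassLoaderChain helper and no Flink or Ignite calls. It lists a loader and all of its parents so the context classloader can be compared with the loader that defined the user class.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical diagnostic helper; not part of Flink or Ignite. */
final class ClassLoaderChain {

    /** Returns the loader followed by its parents, nearest first. */
    static List<ClassLoader> chain(ClassLoader start) {
        List<ClassLoader> loaders = new ArrayList<>();
        for (ClassLoader cl = start; cl != null; cl = cl.getParent()) {
            loaders.add(cl);
        }
        return loaders;
    }

    /** Formats the chain as one line per loader, prefixed with a label. */
    static String describe(String label, ClassLoader start) {
        StringBuilder sb = new StringBuilder(label).append(":\n");
        for (ClassLoader cl : chain(start)) {
            sb.append("  ").append(cl).append('\n');
        }
        // The bootstrap loader is represented by null and ends every chain.
        sb.append("  <bootstrap>\n");
        return sb.toString();
    }

    public static void main(String[] args) {
        // Inside the Ignite closure, the second call would use the user class
        // (for example Test.class) instead of this helper class.
        System.out.print(describe("context classloader",
                Thread.currentThread().getContextClassLoader()));
        System.out.print(describe("defining classloader",
                ClassLoaderChain.class.getClassLoader()));
    }
}
```

If the loader that defined the user class does not appear anywhere in the context classloader's chain, setting the context classloader alone will not make the class visible.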
>>> Best,
>>> Stefan
>>>
>>> On 25.04.2017 at 07:27, Matt <dromitl...@gmail.com> wrote:
>>>
>>> Hi all,
>>>
>>> I'm trying to run Flink using a local environment, but on an Ignite node
>>> to achieve collocation (as mentioned in my previous message on this list).
>>>
>>> Have a look at the code in [1]. It's pretty simple, but I'm getting a
>>> "cannot load user class" error as shown in [2].
>>>
>>> If you check line #29 in the code, I'm able to create an instance of
>>> class Test, and it's the same context from which I'm creating the Flink
>>> job. Shouldn't it work provided I'm using a local environment?
>>>
>>> It would be really nice to be able to inject a ClassLoader into the
>>> chunk of code that creates the job. Is this currently possible?
>>>
>>> Any fix or workaround is appreciated!
>>>
>>> Best,
>>> Matt
>>>
>>> [1] https://gist.github.com/f248187b9638023b95ba8bd9d7f06215
>>> [2] https://gist.github.com/796ee05425535ece1736df7b1e884cce
>>>
>>>
>>>
>>
>
>
