Hi Till,

I just tried with Flink 1.4 by compiling the current master branch on
GitHub (as of this morning) and I still find the same problem as before. If
I'm not wrong your PR was merged already, so your fixes should be part of
the binary.

I hope you have time to have a look at the test case in [1].

Best,
Matt

[1] https://gist.github.com/17d82ee7dd921a0d649574a361cc017d

On Thu, Apr 27, 2017 at 10:09 AM, Matt <dromitl...@gmail.com> wrote:

> Hi Till,
>
> Great! Do you know if it's planned to be included in v1.2.x or should we
> wait for v1.3? I'll give it a try as soon as it's merged.
>
> You're right about this approach launching a mini cluster on each Ignite
> node. That is intentional, as described in my previous message on the list
> [1].
>
> The idea is to collocate Flink jobs on Ignite nodes, so each dataflow only
> processes the elements stored on the local in-memory database. I get the
> impression this should be much faster than randomly picking a Flink node
> and sending all the data over the network.
>
> Any insight on this?
>
> Cheers,
> Matt
>
> [1] http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Flink-on-Ignite-Collocation-td12780.html
>
> On Thu, Apr 27, 2017 at 5:33 AM, Till Rohrmann <trohrm...@apache.org>
> wrote:
>
>> I just copied my response because my other email address is not accepted
>> on the user mailing list.
>>
>> Hi Matt,
>>
>> I think Stefan's analysis is correct. I have a PR open [1], where I fix
>> the issue with the class loader.
>>
>> As a side note, by doing what you're doing, you will spawn a new Flink mini
>> cluster on each Ignite node. These mini clusters won't communicate with
>> each other and will run independently. Is this what you intend to do?
>>
>> [1] https://github.com/apache/flink/pull/3781
>>
>> Cheers,
>> Till
>>
>> On Wed, Apr 26, 2017 at 11:12 PM, Matt <dromitl...@gmail.com> wrote:
>>
>>> Let's wait for Till then, I hope he can figure this out.
>>>
>>> On Wed, Apr 26, 2017 at 11:03 AM, Stefan Richter <
>>> s.rich...@data-artisans.com> wrote:
>>>
>>>> Ok, now the question is also about what classloaders Ignite is creating
>>>> and how they are used, but the relevant code line in Flink is probably in
>>>> FlinkMiniCluster.scala, line 538 (current master):
>>>>
>>>> try {
>>>>   JobClient.submitJobAndWait(
>>>>     clientActorSystem,
>>>>     configuration,
>>>>     leaderRetrievalService,
>>>>     jobGraph,
>>>>     timeout,
>>>>     printUpdates,
>>>>     this.getClass.getClassLoader())
>>>> } finally {
>>>>   if (!useSingleActorSystem) {
>>>>     // we have to shut down the just created actor system
>>>>     shutdownJobClientActorSystem(clientActorSystem)
>>>>   }
>>>> }
>>>>
>>>>
>>>> This is what is executed as part of executing a job through
>>>> LocalEnvironment. As we can see, the classloader is set to the classloader
>>>> of FlinkMiniCluster. Depending on the classloader structure inside Ignite,
>>>> this classloader might not know your user code. What you could do is
>>>> change this line in a custom Flink build, for example changing line 538
>>>> to Thread.currentThread().getContextClassLoader(), and ensure that
>>>> the context classloader in the runnable is a classloader that a) knows the
>>>> user code and b) is a child of the classloader that knows the Ignite and
>>>> Flink classes. Note that this is not a general solution and should not
>>>> become a general fix.
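
A minimal sketch of the context classloader swap described above, in plain Java with no Flink or Ignite dependency. The class name ContextClassLoaderSketch and the helper callWith are hypothetical, and the empty URLClassLoader only stands in for a loader that knows the user code. Whether Flink actually picks up the context classloader depends on the custom build change mentioned above, so treat this as an illustration rather than a fix.

```java
import java.net.URL;
import java.net.URLClassLoader;
import java.util.function.Supplier;

public final class ContextClassLoaderSketch {

    /**
     * Runs the task with the given loader installed as the thread's context
     * class loader and restores the previous loader afterwards, even if the
     * task throws.
     */
    public static <T> T callWith(ClassLoader loader, Supplier<T> task) {
        Thread current = Thread.currentThread();
        ClassLoader previous = current.getContextClassLoader();
        current.setContextClassLoader(loader);
        try {
            return task.get();
        } finally {
            current.setContextClassLoader(previous);
        }
    }

    public static void main(String[] args) throws Exception {
        // Assumption: the parent stands in for the loader that knows the
        // Ignite and Flink classes; the child would also know the user code.
        ClassLoader parent = ContextClassLoaderSketch.class.getClassLoader();
        try (URLClassLoader userCodeLoader = new URLClassLoader(new URL[0], parent)) {
            ClassLoader seen = callWith(
                userCodeLoader,
                () -> Thread.currentThread().getContextClassLoader());
            System.out.println("task saw the user code loader: " + (seen == userCodeLoader));
        }
    }
}
```

Inside the Ignite runnable, the job submission would go where the Supplier body is. Restoring the previous loader in the finally block keeps the swap from leaking into whatever else runs on that thread.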
>>>>
>>>> I have heard that Till is about to change some things about local
>>>> execution, so I included him in CC. Maybe he can provide additional hints
>>>> on how your use case might be better supported in the upcoming Flink 1.3.
>>>>
>>>> Best,
>>>> Stefan
>>>>
>>>> Am 25.04.2017 um 22:50 schrieb Matt <dromitl...@gmail.com>:
>>>>
>>>> I updated the code a little bit for clarity, so the line #56 mentioned
>>>> in my previous message is now line #25.
>>>>
>>>> In summary the error I'm getting is this:
>>>>
>>>> ---
>>>> Caused by: org.apache.flink.streaming.runtime.tasks.StreamTaskException:
>>>> Cannot load user class: com.test.Test
>>>> ClassLoader info: URL ClassLoader:
>>>> Class not resolvable through given classloader.
>>>> ---
>>>>
>>>> But if I'm not wrong, after trying to load the class through
>>>> URLClassLoader, Flink should try loading it with its parent ClassLoader,
>>>> which should be the same ClassLoader that executed the environment, and it
>>>> does have access to the class.
>>>>
>>>> Not sure what is wrong.
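
To make the delegation question concrete, here is a small sketch in plain Java. It relies only on the standard behaviour of java.net.URLClassLoader, which asks its parent before looking at its own URLs. A loader with no URLs can therefore resolve only what its parent chain can. The class name ParentDelegationSketch and the helper canLoad are hypothetical, and the sketch does not reproduce how Flink builds its user code classloader. It only shows that the outcome hinges on which parent the loader was given.

```java
import java.net.URL;
import java.net.URLClassLoader;

public final class ParentDelegationSketch {

    /** Returns true if the loader, or one of its parents, can resolve the class name. */
    public static boolean canLoad(ClassLoader loader, String className) {
        try {
            // initialize=false: we only want to know whether the class resolves.
            Class.forName(className, false, loader);
            return true;
        } catch (ClassNotFoundException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        String name = ParentDelegationSketch.class.getName();
        ClassLoader appLoader = ParentDelegationSketch.class.getClassLoader();

        // No URLs of its own, but the parent knows this class.
        ClassLoader withParent = new URLClassLoader(new URL[0], appLoader);
        // No URLs and a null parent: only JDK classes are reachable.
        ClassLoader withoutParent = new URLClassLoader(new URL[0], null);

        System.out.println("with application parent: " + canLoad(withParent, name));
        System.out.println("with null parent:        " + canLoad(withoutParent, name));
    }
}
```

If the loader reported in the error ("URL ClassLoader:" with an empty URL list) has a parent other than the one that executed the environment, the second case is what you would see.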
>>>>
>>>> On Tue, Apr 25, 2017 at 5:38 PM, Matt <dromitl...@gmail.com> wrote:
>>>>
>>>>> Hi Stefan,
>>>>>
>>>>> Check the code here:
>>>>> https://gist.github.com/17d82ee7dd921a0d649574a361cc017d
>>>>> The output is at the bottom of the page.
>>>>>
>>>>> Here are the results of the additional tests you mentioned:
>>>>>
>>>>> 1. I was able to instantiate an inner class (Test$Foo) inside the
>>>>> Ignite closure, no problem with that
>>>>> 2. I tried implementing SourceFunction and SinkFunction in Test
>>>>> itself, I was able to instantiate the class inside the Ignite closure
>>>>> 3. I'm not sure what you meant in this point, is it something like
>>>>> what I tried in line #56?
>>>>>
>>>>> Additionally, I tried implementing the SourceFunction and SinkFunction
>>>>> in Test$Foo with the same result: it says "Cannot load user class:
>>>>> com.test.Test$Foo"
>>>>>
>>>>> Looks like Flink is not using the correct ClassLoader. Any idea?
>>>>>
>>>>> Regards,
>>>>> Matt
>>>>>
>>>>> On Tue, Apr 25, 2017 at 7:00 AM, Stefan Richter <
>>>>> s.rich...@data-artisans.com> wrote:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> I would expect that the local environment picks up the class path
>>>>>> from the code that launched it. So I think the question is what happens
>>>>>> behind the scenes when you call ignite.compute().broadcast(runnable);
>>>>>> which classes are shipped, and how is the classpath built in the
>>>>>> environment that runs the code? Your example is also not fully conclusive,
>>>>>> because com.myproj.Test (which you can successfully instantiate) and
>>>>>> com.myproj.Test$1$2 (which fails) are different classes, so maybe only
>>>>>> the outer class is shipped with the broadcast call. My theory is that not
>>>>>> all classes are shipped (e.g. inner classes), but only Test. You could try
>>>>>> three things to analyze the problem a little more:
>>>>>>
>>>>>> 1) Create another inner class inside Test and check whether you are still
>>>>>> able to instantiate this class via reflection as well.
>>>>>> 2) Let the Test class itself implement the map function (avoiding the
>>>>>> usage of other/inner classes) and see if this works.
>>>>>> 3) Check and set the thread’s context classloader inside the runnable
>>>>>> to something that contains all required classes and see if this gets
>>>>>> picked up by Flink.
>>>>>>
>>>>>> Best,
>>>>>> Stefan
>>>>>>
>>>>>> Am 25.04.2017 um 07:27 schrieb Matt <dromitl...@gmail.com>:
>>>>>>
>>>>>> Hi all,
>>>>>>
>>>>>> I'm trying to run Flink using a local environment, but on an Ignite
>>>>>> node to achieve collocation (as mentioned in my previous message on this
>>>>>> list).
>>>>>>
>>>>>> Have a look at the code in [1]. It's pretty simple, but I'm getting a
>>>>>> "cannot load user class" error as shown in [2].
>>>>>>
>>>>>> If you check line #29 in the code, I'm able to create an instance of
>>>>>> class Test, and it's the same context from which I'm creating the Flink
>>>>>> job. Shouldn't it work, provided I'm using a local environment?
>>>>>>
>>>>>> It would be really nice to be able to inject a ClassLoader into the
>>>>>> chunk of code that creates the job. Is this currently possible?
>>>>>>
>>>>>> Any fix or workaround is appreciated!
>>>>>>
>>>>>> Best,
>>>>>> Matt
>>>>>>
>>>>>> [1] https://gist.github.com/f248187b9638023b95ba8bd9d7f06215
>>>>>> [2] https://gist.github.com/796ee05425535ece1736df7b1e884cce
>>>>>>
>>>>>>
>>>>>>
>>>>>
>>>>
>>>>
>>>
>>
>
