Thanks for your help Till.

I will create a self contained test case in a moment and send you the link,
wait for it.

Cheers,
Matt

On Wed, May 17, 2017 at 4:38 AM, Till Rohrmann <trohrm...@apache.org> wrote:

> Hi Matt,
>
> alright, then we have to look into it again. I tried to run your example,
> however, it does not seem to be self-contained. Using Ignite 2.0.0 with
> -DIGNITE_QUIET=false -Xms512m the Ignite object seems to be stuck in
> Ignite#start. In the logs I see the following warning:
>
> May 17, 2017 9:36:22 AM org.apache.ignite.logger.java.JavaLogger warning
> WARNING: TcpDiscoveryMulticastIpFinder has no pre-configured addresses (it is 
> recommended in production to specify at least one address in 
> TcpDiscoveryMulticastIpFinder.getAddresses() configuration property)
> May 17, 2017 9:36:24 AM org.apache.ignite.logger.java.JavaLogger warning
> WARNING: IP finder returned empty addresses list. Please check IP finder 
> configuration and make sure multicast works on your network. Will retry every 
> 2 secs.
>
> However, I assume that this is not critical.
>
> Maybe you can tell me how I can run your example in order to debug it.
>
> Cheers,
> Till
> ​
>
> On Mon, May 15, 2017 at 10:05 PM, Matt <dromitl...@gmail.com> wrote:
>
>> Hi Till,
>>
>> I just tried with Flink 1.4 by compiling the current master branch on
>> GitHub (as of this morning) and I still find the same problem as before. If
>> I'm not wrong your PR was merged already, so your fixes should be part of
>> the binary.
>>
>> I hope you have time to have a look at the test case in [1].
>>
>> Best,
>> Matt
>>
>> [1] https://gist.github.com/17d82ee7dd921a0d649574a361cc017d
>>
>> On Thu, Apr 27, 2017 at 10:09 AM, Matt <dromitl...@gmail.com> wrote:
>>
>>> Hi Till,
>>>
>>> Great! Do you know if it's planned to be included in v1.2.x or should we
>>> wait for v1.3? I'll give it a try as soon as it's merged.
>>>
>>> You're right about this approach launching a mini cluster on each Ignite
>>> node. That is intentional, as described in my previous message on the list
>>> [1].
>>>
>>> The idea is to collocate Flink jobs on Ignite nodes, so each dataflow
>>> only processes the elements stored on the local in-memory database. I get
>>> the impression this should be much faster than randomly picking a Flink
>>> node and sending all the data over the network.
>>>
>>> Any insight on this?
>>>
>>> Cheers,
>>> Matt
>>>
>>> [1] http://apache-flink-user-mailing-list-archive.2336050.n4.nab
>>> ble.com/Flink-on-Ignite-Collocation-td12780.html
>>>
>>> On Thu, Apr 27, 2017 at 5:33 AM, Till Rohrmann <trohrm...@apache.org>
>>> wrote:
>>>
>>>> I just copied my response because my other email address is not
>>>> accepted on the user mailing list.
>>>>
>>>> Hi Matt,
>>>>
>>>> I think Stefan's analysis is correct. I have a PR open [1], where I fix
>>>> the issue with the class loader.
>>>>
>>>> As a side note, by doing what you're doing, you will spawn on each
>>>> Ignite node a new Flink mini cluster. These mini cluster won't communicate
>>>> with each other and run independently. Is this what you intend to do?
>>>>
>>>> [1] https://github.com/apache/flink/pull/3781
>>>>
>>>> Cheers,
>>>> Till
>>>>
>>>> On Wed, Apr 26, 2017 at 11:12 PM, Matt <dromitl...@gmail.com> wrote:
>>>>
>>>>> Let's wait for Till then, I hope he can figure this out.
>>>>>
>>>>> On Wed, Apr 26, 2017 at 11:03 AM, Stefan Richter <
>>>>> s.rich...@data-artisans.com> wrote:
>>>>>
>>>>>> Ok, now the question is also about what classloaders Ignite is
>>>>>> creating and how they are used, but the relevant code line in Flink is
>>>>>> probably in FlinkMiniCluster.scala, line 538 (current master):
>>>>>>
>>>>>>  try {
>>>>>>  JobClient.submitJobAndWait(
>>>>>>    clientActorSystem,
>>>>>>    configuration,
>>>>>>    leaderRetrievalService,
>>>>>>    jobGraph,
>>>>>>    timeout,
>>>>>>    printUpdates,
>>>>>>    this.getClass.getClassLoader())
>>>>>> } finally {
>>>>>>    if(!useSingleActorSystem) {
>>>>>>      // we have to shutdown the just created actor system
>>>>>>      shutdownJobClientActorSystem(clientActorSystem)
>>>>>>    }
>>>>>>  }
>>>>>>
>>>>>>
>>>>>> This is what is executed as part of executing a job through
>>>>>> LocalEnvironment. As we can see, the classloader is set to the 
>>>>>> classloader
>>>>>> of FlinkMiniCluster. Depending on the classloader structure inside 
>>>>>> Ignite,
>>>>>> this classloader might not know your user code. What you could do is
>>>>>> changing this line in a custom Flink build, changing line 538 for example
>>>>>> to Thread.currentThread().getContextClassloader() and ensuring that
>>>>>> the context classloader ins the runnable is a classloader that a) knows 
>>>>>> the
>>>>>> user code and b) is a child of the classloader that knows the Ignite and
>>>>>> Flink classes. Notice that this is not a general solution and should not
>>>>>> become a general fix.
>>>>>>
>>>>>> I have heard that Till is about to change some things about local
>>>>>> execution, so I included him in CC. Maybe he can provide additional hints
>>>>>> how your use case might be better supported in the upcoming Flink 1.3.
>>>>>>
>>>>>> Best,
>>>>>> Stefan
>>>>>>
>>>>>> Am 25.04.2017 um 22:50 schrieb Matt <dromitl...@gmail.com>:
>>>>>>
>>>>>> I updated the code a little bit for clarity, now the line #56
>>>>>> mentioned in my previous message is line #25.
>>>>>>
>>>>>> In summary the error I'm getting is this:
>>>>>>
>>>>>> ---
>>>>>> Caused by: org.apache.flink.streaming.run
>>>>>> time.tasks.StreamTaskException: Cannot load user class: com.test.Test
>>>>>> ClassLoader info: URL ClassLoader:
>>>>>> Class not resolvable through given classloader.
>>>>>> ---
>>>>>>
>>>>>> But if I'm not wrong, after trying to load the class through
>>>>>> URLClassLoader, Flink should try loading it with its parent ClassLoader,
>>>>>> which should be the same ClassLoader that executed the environment, and 
>>>>>> it
>>>>>> does have access to the class.
>>>>>>
>>>>>> Not sure what is wrong.
>>>>>>
>>>>>> On Tue, Apr 25, 2017 at 5:38 PM, Matt <dromitl...@gmail.com> wrote:
>>>>>>
>>>>>>> Hi Stefan,
>>>>>>>
>>>>>>> Check the code here: https://gist.github.com/
>>>>>>> 17d82ee7dd921a0d649574a361cc017d , the output is at the bottom of
>>>>>>> the page.
>>>>>>>
>>>>>>> Here are the results of the additional tests you mentioned:
>>>>>>>
>>>>>>> 1. I was able to instantiate an inner class (Test$Foo) inside the
>>>>>>> Ignite closure, no problem with that
>>>>>>> 2. I tried implementing SourceFunction and SinkFunction in Test
>>>>>>> itself, I was able to instantiate the class inside the Ignite closure
>>>>>>> 3. I'm not sure what you meant in this point, is it something like
>>>>>>> what I tried in line #56?
>>>>>>>
>>>>>>> Additionally, I tried implementing the SourceFunction and
>>>>>>> SinkFunction in Test$Foo with the same result: it says "Cannot load user
>>>>>>> class: com.test.Test$Foo"
>>>>>>>
>>>>>>> Looks like Flink is not using the correct ClassLoader. Any idea?
>>>>>>>
>>>>>>> Regards,
>>>>>>> Matt
>>>>>>>
>>>>>>> On Tue, Apr 25, 2017 at 7:00 AM, Stefan Richter <
>>>>>>> s.rich...@data-artisans.com> wrote:
>>>>>>>
>>>>>>>> Hi,
>>>>>>>>
>>>>>>>> I would expect that the local environment picks up the class path
>>>>>>>> from the code that launched it. So I think the question is what happens
>>>>>>>> behind the scenes when you call ignite.compute().broadcas
>>>>>>>> t(runnable); . Which classes are shipped and how is the classpath
>>>>>>>> build in the environment that runs the code. Your example is also not 
>>>>>>>> fully
>>>>>>>> conclusive, because com.myproj.Test (which you can successfully
>>>>>>>> instantiate) and com.myproj.Test$1$2 (which fails) are different 
>>>>>>>> classes,
>>>>>>>> so maybe only the outer class is shipped with the broadcast call. My 
>>>>>>>> theory
>>>>>>>> is that not all classes are shipped (e.g. inner classes), but only 
>>>>>>>> Test .
>>>>>>>> You could try three things to analyze to problem a little more:
>>>>>>>>
>>>>>>>> 1) Create another inner class inside Test and try if you are still
>>>>>>>> able to instantiate also this class via reflection.
>>>>>>>> 2) Let Test class itself implement the map function (avoiding the
>>>>>>>> usage of other/inner classes) and see if this works.
>>>>>>>> 3) Check and set the thread’s context classloader inside the
>>>>>>>> runnable to something that contains all required classes and see if 
>>>>>>>> this
>>>>>>>> gets picked up by Flink.
>>>>>>>>
>>>>>>>> Best,
>>>>>>>> Stefan
>>>>>>>>
>>>>>>>> Am 25.04.2017 um 07:27 schrieb Matt <dromitl...@gmail.com>:
>>>>>>>>
>>>>>>>> Hi all,
>>>>>>>>
>>>>>>>> I'm trying to run Flink using a local environment, but on an Ignite
>>>>>>>> node to achieve collocation (as mentioned in my previous message on 
>>>>>>>> this
>>>>>>>> list).
>>>>>>>>
>>>>>>>> Have a look at the code in [1]. It's pretty simple, but I'm getting
>>>>>>>> a "cannot load user class" error as shown in [2].
>>>>>>>>
>>>>>>>> If you check line #29 on the code, I'm able to create an instance
>>>>>>>> of class Test, and it's the same context from which I'm creating the 
>>>>>>>> Flink
>>>>>>>> job. Shouldn't it work provided I'm using a local environment?
>>>>>>>>
>>>>>>>> It would be really nice to be able to inject a ClassLoader into the
>>>>>>>> chunk of code that creates the job. Is this currently possible?
>>>>>>>>
>>>>>>>> Any fix or workaround is appreciated!
>>>>>>>>
>>>>>>>> Best,
>>>>>>>> Matt
>>>>>>>>
>>>>>>>> [1] https://gist.github.com/f248187b9638023b95ba8bd9d7f06215
>>>>>>>> [2] https://gist.github.com/796ee05425535ece1736df7b1e884cce
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>

Reply via email to