Thanks for your help, Till. I will create a self-contained test case shortly and send you the link.
Cheers,
Matt

On Wed, May 17, 2017 at 4:38 AM, Till Rohrmann <trohrm...@apache.org> wrote:
> Hi Matt,
>
> Alright, then we have to look into it again. I tried to run your example; however, it does not seem to be self-contained. Using Ignite 2.0.0 with -DIGNITE_QUIET=false -Xms512m, the Ignite object seems to be stuck in Ignite#start. In the logs I see the following warnings:
>
>     May 17, 2017 9:36:22 AM org.apache.ignite.logger.java.JavaLogger warning
>     WARNING: TcpDiscoveryMulticastIpFinder has no pre-configured addresses (it is recommended in production to specify at least one address in TcpDiscoveryMulticastIpFinder.getAddresses() configuration property)
>     May 17, 2017 9:36:24 AM org.apache.ignite.logger.java.JavaLogger warning
>     WARNING: IP finder returned empty addresses list. Please check IP finder configuration and make sure multicast works on your network. Will retry every 2 secs.
>
> However, I assume that this is not critical.
>
> Maybe you can tell me how to run your example so that I can debug it.
>
> Cheers,
> Till
>
>
> On Mon, May 15, 2017 at 10:05 PM, Matt <dromitl...@gmail.com> wrote:
>
>> Hi Till,
>>
>> I just tried with Flink 1.4 by compiling the current master branch on GitHub (as of this morning), and I still hit the same problem as before. If I'm not mistaken, your PR has already been merged, so your fixes should be part of the binary.
>>
>> I hope you have time to have a look at the test case in [1].
>>
>> Best,
>> Matt
>>
>> [1] https://gist.github.com/17d82ee7dd921a0d649574a361cc017d
>>
>> On Thu, Apr 27, 2017 at 10:09 AM, Matt <dromitl...@gmail.com> wrote:
>>
>>> Hi Till,
>>>
>>> Great! Do you know if it's planned to be included in v1.2.x, or should we wait for v1.3? I'll give it a try as soon as it's merged.
>>>
>>> You're right that this approach launches a mini cluster on each Ignite node. That is intentional, as described in my previous message on the list [1].
>>>
>>> The idea is to collocate Flink jobs on Ignite nodes, so that each dataflow only processes the elements stored in the local in-memory database. I get the impression this should be much faster than randomly picking a Flink node and sending all the data over the network.
>>>
>>> Any insight on this?
>>>
>>> Cheers,
>>> Matt
>>>
>>> [1] http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Flink-on-Ignite-Collocation-td12780.html
>>>
>>> On Thu, Apr 27, 2017 at 5:33 AM, Till Rohrmann <trohrm...@apache.org> wrote:
>>>
>>>> I'm copying my response here because my other email address is not accepted on the user mailing list.
>>>>
>>>> Hi Matt,
>>>>
>>>> I think Stefan's analysis is correct. I have a PR open [1] where I fix the issue with the class loader.
>>>>
>>>> As a side note, by doing what you're doing, you will spawn a new Flink mini cluster on each Ignite node. These mini clusters won't communicate with each other and will run independently. Is this what you intend to do?
>>>>
>>>> [1] https://github.com/apache/flink/pull/3781
>>>>
>>>> Cheers,
>>>> Till
>>>>
>>>> On Wed, Apr 26, 2017 at 11:12 PM, Matt <dromitl...@gmail.com> wrote:
>>>>
>>>>> Let's wait for Till then; I hope he can figure this out.
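Stefan's analysis (quoted below in this thread, and confirmed by Till above) is that the class loader Flink hands to the job is FlinkMiniCluster's own, which may not know the user classes. Parent-first delegation, which Matt expected to rescue the lookup, only ever walks up the parent chain, so it helps only if some ancestor actually has the class. The stdlib-only Java sketch below illustrates this, assuming that a `URLClassLoader` with no URLs is a fair stand-in for the empty "URL ClassLoader:" in the error message. `DelegationDemo` and its nested `Foo` are hypothetical stand-ins for `com.test.Test` and `com.test.Test$Foo`; no Flink or Ignite code is involved.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.URL;
import java.net.URLClassLoader;

public class DelegationDemo {

    // Hypothetical stand-in for a nested user class such as com.test.Test$Foo.
    public static class Foo {}

    /**
     * Loads Foo by its binary name through a fresh URLClassLoader that has no
     * URLs of its own, so the lookup can only succeed by delegating to parent.
     */
    public static boolean resolvable(ClassLoader parent) {
        try (URLClassLoader loader = new URLClassLoader(new URL[0], parent)) {
            Class.forName(Foo.class.getName(), false, loader);
            return true;
        } catch (ClassNotFoundException e) {
            return false;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        // Nested classes are named Outer$Inner, just like Test$Foo in the thread.
        System.out.println(Foo.class.getName());
        // A null parent means the bootstrap loader, which never sees application classes.
        System.out.println("bootstrap parent resolves Foo: " + resolvable(null));
        // The loader that defined DelegationDemo also defined Foo, so delegation succeeds.
        System.out.println("app parent resolves Foo: "
                + resolvable(DelegationDemo.class.getClassLoader()));
    }
}
```

Compiled in the default package, `main` prints `DelegationDemo$Foo`, then `false` for the bootstrap parent and `true` for the application parent. The empty child loader is identical in both cases; only the parent it delegates to decides whether the class is found.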
>>>>>
>>>>> On Wed, Apr 26, 2017 at 11:03 AM, Stefan Richter <s.rich...@data-artisans.com> wrote:
>>>>>
>>>>>> OK, now the question is also which classloaders Ignite creates and how they are used, but the relevant line in Flink is probably in FlinkMiniCluster.scala, line 538 (current master):
>>>>>>
>>>>>>     try {
>>>>>>       JobClient.submitJobAndWait(
>>>>>>         clientActorSystem,
>>>>>>         configuration,
>>>>>>         leaderRetrievalService,
>>>>>>         jobGraph,
>>>>>>         timeout,
>>>>>>         printUpdates,
>>>>>>         this.getClass.getClassLoader())
>>>>>>     } finally {
>>>>>>       if (!useSingleActorSystem) {
>>>>>>         // we have to shutdown the just created actor system
>>>>>>         shutdownJobClientActorSystem(clientActorSystem)
>>>>>>       }
>>>>>>     }
>>>>>>
>>>>>> This is what is executed as part of running a job through LocalEnvironment. As we can see, the classloader is set to the classloader of FlinkMiniCluster. Depending on the classloader structure inside Ignite, this classloader might not know your user code. What you could do is change this line in a custom Flink build, for example by passing Thread.currentThread().getContextClassLoader() on line 538 instead of this.getClass.getClassLoader(), and ensure that the context classloader in the runnable is a classloader that a) knows the user code and b) is a child of the classloader that knows the Ignite and Flink classes. Note that this is not a general solution and should not become a general fix.
>>>>>>
>>>>>> I have heard that Till is about to change some things about local execution, so I included him in CC. Maybe he can provide additional hints on how your use case might be better supported in the upcoming Flink 1.3.
>>>>>>
>>>>>> Best,
>>>>>> Stefan
>>>>>>
>>>>>> On 25.04.2017 at 22:50, Matt <dromitl...@gmail.com> wrote:
>>>>>>
>>>>>> I updated the code a little for clarity; line #56 mentioned in my previous message is now line #25.
>>>>>>
>>>>>> In summary, the error I'm getting is this:
>>>>>>
>>>>>> ---
>>>>>> Caused by: org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot load user class: com.test.Test
>>>>>> ClassLoader info: URL ClassLoader:
>>>>>> Class not resolvable through given classloader.
>>>>>> ---
>>>>>>
>>>>>> But if I'm not mistaken, after trying to load the class through the URLClassLoader, Flink should try loading it with its parent ClassLoader, which should be the same ClassLoader that executed the environment, and that one does have access to the class.
>>>>>>
>>>>>> Not sure what is wrong.
>>>>>>
>>>>>> On Tue, Apr 25, 2017 at 5:38 PM, Matt <dromitl...@gmail.com> wrote:
>>>>>>
>>>>>>> Hi Stefan,
>>>>>>>
>>>>>>> Check the code here: https://gist.github.com/17d82ee7dd921a0d649574a361cc017d (the output is at the bottom of the page).
>>>>>>>
>>>>>>> Here are the results of the additional tests you mentioned:
>>>>>>>
>>>>>>> 1. I was able to instantiate an inner class (Test$Foo) inside the Ignite closure; no problem with that.
>>>>>>> 2. I tried implementing SourceFunction and SinkFunction in Test itself, and I was able to instantiate the class inside the Ignite closure.
>>>>>>> 3. I'm not sure what you meant by this point. Is it something like what I tried in line #56?
>>>>>>>
>>>>>>> Additionally, I tried implementing SourceFunction and SinkFunction in Test$Foo with the same result: it says "Cannot load user class: com.test.Test$Foo".
>>>>>>>
>>>>>>> It looks like Flink is not using the correct ClassLoader. Any idea?
>>>>>>>
>>>>>>> Regards,
>>>>>>> Matt
>>>>>>>
>>>>>>> On Tue, Apr 25, 2017 at 7:00 AM, Stefan Richter <s.rich...@data-artisans.com> wrote:
>>>>>>>
>>>>>>>> Hi,
>>>>>>>>
>>>>>>>> I would expect the local environment to pick up the class path from the code that launched it.
>>>>>>>> So I think the question is what happens behind the scenes when you call ignite.compute().broadcast(runnable). Which classes are shipped, and how is the classpath built in the environment that runs the code? Your example is also not fully conclusive, because com.myproj.Test (which you can successfully instantiate) and com.myproj.Test$1$2 (which fails) are different classes, so maybe only the outer class is shipped with the broadcast call. My theory is that not all classes are shipped (e.g. inner classes), but only Test. You could try three things to analyze the problem a little more:
>>>>>>>>
>>>>>>>> 1) Create another inner class inside Test and check whether you can also instantiate this class via reflection.
>>>>>>>> 2) Let the Test class itself implement the map function (avoiding the use of other/inner classes) and see if this works.
>>>>>>>> 3) Check and set the thread’s context classloader inside the runnable to something that contains all required classes, and see if this gets picked up by Flink.
>>>>>>>>
>>>>>>>> Best,
>>>>>>>> Stefan
>>>>>>>>
>>>>>>>> On 25.04.2017 at 07:27, Matt <dromitl...@gmail.com> wrote:
>>>>>>>>
>>>>>>>> Hi all,
>>>>>>>>
>>>>>>>> I'm trying to run Flink using a local environment, but on an Ignite node to achieve collocation (as mentioned in my previous message on this list).
>>>>>>>>
>>>>>>>> Have a look at the code in [1]. It's pretty simple, but I'm getting a "cannot load user class" error, as shown in [2].
>>>>>>>>
>>>>>>>> If you check line #29 of the code, I'm able to create an instance of class Test, and it's the same context from which I'm creating the Flink job. Shouldn't it work, given that I'm using a local environment?
>>>>>>>>
>>>>>>>> It would be really nice to be able to inject a ClassLoader into the chunk of code that creates the job. Is this currently possible?
>>>>>>>>
>>>>>>>> Any fix or workaround is appreciated!
>>>>>>>>
>>>>>>>> Best,
>>>>>>>> Matt
>>>>>>>>
>>>>>>>> [1] https://gist.github.com/f248187b9638023b95ba8bd9d7f06215
>>>>>>>> [2] https://gist.github.com/796ee05425535ece1736df7b1e884cce
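Stefan's third suggestion (set the thread's context class loader inside the runnable) and his idea of having Flink read `Thread.currentThread().getContextClassLoader()` in a custom build are the closest things to the "inject a ClassLoader" hook Matt asks for above. Below is a minimal stdlib-only Java sketch of the save/set/restore pattern. It is not Flink's or Ignite's API: `ContextLoaderDemo`, `withContextClassLoader` and `submitJob` are hypothetical names. `submitJob` only stands in for job-submission code that reads the context loader, which an unpatched Flink does not do according to Stefan's analysis.

```java
import java.io.IOException;
import java.net.URL;
import java.net.URLClassLoader;
import java.util.function.Supplier;

public class ContextLoaderDemo {

    /**
     * Runs task with loader installed as the current thread's context class
     * loader and always restores the previous one, even if task throws.
     */
    public static <T> T withContextClassLoader(ClassLoader loader, Supplier<T> task) {
        Thread current = Thread.currentThread();
        ClassLoader previous = current.getContextClassLoader();
        current.setContextClassLoader(loader);
        try {
            return task.get();
        } finally {
            // The thread may be pooled and reused, so never leak the swapped loader.
            current.setContextClassLoader(previous);
        }
    }

    /**
     * Hypothetical stand-in for job-submission code: it returns the loader it
     * would use, reading the context loader rather than its own defining loader.
     */
    public static ClassLoader submitJob() {
        return Thread.currentThread().getContextClassLoader();
    }

    public static void main(String[] args) throws IOException {
        ClassLoader before = Thread.currentThread().getContextClassLoader();
        ClassLoader frameworkLoader = ContextLoaderDemo.class.getClassLoader();
        // Hypothetical user-code loader: a child of the loader that knows the
        // framework classes (Stefan's condition b). A real one would also list
        // the user-code jar URLs (condition a); this sketch passes none.
        try (URLClassLoader userLoader = new URLClassLoader(new URL[0], frameworkLoader)) {
            ClassLoader seen = withContextClassLoader(userLoader, ContextLoaderDemo::submitJob);
            System.out.println("job saw the user-code loader: " + (seen == userLoader));
        }
        System.out.println("previous loader restored: "
                + (Thread.currentThread().getContextClassLoader() == before));
    }
}
```

Inside the Ignite runnable, the code that builds and executes the job would take the place of `submitJob`. The `finally` block matters because compute closures are likely to run on pooled threads that go on to execute other tasks.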