Hi Till,

I just tried with Flink 1.4 by compiling the current master branch on GitHub (as of this morning) and I still find the same problem as before. If I'm not wrong, your PR was merged already, so your fixes should be part of the binary.
I hope you have time to have a look at the test case in [1].

Best,
Matt

[1] https://gist.github.com/17d82ee7dd921a0d649574a361cc017d

On Thu, Apr 27, 2017 at 10:09 AM, Matt <dromitl...@gmail.com> wrote:

> Hi Till,
>
> Great! Do you know if it's planned to be included in v1.2.x or should we
> wait for v1.3? I'll give it a try as soon as it's merged.
>
> You're right about this approach launching a mini cluster on each Ignite
> node. That is intentional, as described in my previous message on the list
> [1].
>
> The idea is to collocate Flink jobs on Ignite nodes, so each dataflow only
> processes the elements stored on the local in-memory database. I get the
> impression this should be much faster than randomly picking a Flink node
> and sending all the data over the network.
>
> Any insight on this?
>
> Cheers,
> Matt
>
> [1] http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Flink-on-Ignite-Collocation-td12780.html
>
> On Thu, Apr 27, 2017 at 5:33 AM, Till Rohrmann <trohrm...@apache.org>
> wrote:
>
>> I just copied my response because my other email address is not accepted
>> on the user mailing list.
>>
>> Hi Matt,
>>
>> I think Stefan's analysis is correct. I have a PR open [1], where I fix
>> the issue with the class loader.
>>
>> As a side note, by doing what you're doing, you will spawn a new Flink
>> mini cluster on each Ignite node. These mini clusters won't communicate
>> with each other and will run independently. Is this what you intend to do?
>>
>> [1] https://github.com/apache/flink/pull/3781
>>
>> Cheers,
>> Till
>>
>> On Wed, Apr 26, 2017 at 11:12 PM, Matt <dromitl...@gmail.com> wrote:
>>
>>> Let's wait for Till then, I hope he can figure this out.
>>>
>>> On Wed, Apr 26, 2017 at 11:03 AM, Stefan Richter <
>>> s.rich...@data-artisans.com> wrote:
>>>
>>>> Ok, now the question is also about what classloaders Ignite is creating
>>>> and how they are used, but the relevant code line in Flink is probably in
>>>> FlinkMiniCluster.scala, line 538 (current master):
>>>>
>>>> try {
>>>>   JobClient.submitJobAndWait(
>>>>     clientActorSystem,
>>>>     configuration,
>>>>     leaderRetrievalService,
>>>>     jobGraph,
>>>>     timeout,
>>>>     printUpdates,
>>>>     this.getClass.getClassLoader())
>>>> } finally {
>>>>   if(!useSingleActorSystem) {
>>>>     // we have to shutdown the just created actor system
>>>>     shutdownJobClientActorSystem(clientActorSystem)
>>>>   }
>>>> }
>>>>
>>>> This is what is executed as part of executing a job through
>>>> LocalEnvironment. As we can see, the classloader is set to the classloader
>>>> of FlinkMiniCluster. Depending on the classloader structure inside Ignite,
>>>> this classloader might not know your user code. What you could do is
>>>> change this line in a custom Flink build, for example changing line 538
>>>> to Thread.currentThread().getContextClassLoader(), and ensure that
>>>> the context classloader in the runnable is a classloader that a) knows the
>>>> user code and b) is a child of the classloader that knows the Ignite and
>>>> Flink classes. Notice that this is not a general solution and should not
>>>> become a general fix.
>>>>
>>>> I have heard that Till is about to change some things about local
>>>> execution, so I included him in CC. Maybe he can provide additional hints
>>>> on how your use case might be better supported in the upcoming Flink 1.3.
>>>>
>>>> Best,
>>>> Stefan
>>>>
>>>> On 25.04.2017 at 22:50, Matt <dromitl...@gmail.com> wrote:
>>>>
>>>> I updated the code a little bit for clarity, now the line #56 mentioned
>>>> in my previous message is line #25.
>>>>
>>>> In summary the error I'm getting is this:
>>>>
>>>> ---
>>>> Caused by: org.apache.flink.streaming.runtime.tasks.StreamTaskException:
>>>> Cannot load user class: com.test.Test
>>>> ClassLoader info: URL ClassLoader:
>>>> Class not resolvable through given classloader.
>>>> ---
>>>>
>>>> But if I'm not wrong, after trying to load the class through
>>>> URLClassLoader, Flink should try loading it with its parent ClassLoader,
>>>> which should be the same ClassLoader that executed the environment, and it
>>>> does have access to the class.
>>>>
>>>> Not sure what is wrong.
>>>>
>>>> On Tue, Apr 25, 2017 at 5:38 PM, Matt <dromitl...@gmail.com> wrote:
>>>>
>>>>> Hi Stefan,
>>>>>
>>>>> Check the code here:
>>>>> https://gist.github.com/17d82ee7dd921a0d649574a361cc017d
>>>>> The output is at the bottom of the page.
>>>>>
>>>>> Here are the results of the additional tests you mentioned:
>>>>>
>>>>> 1. I was able to instantiate an inner class (Test$Foo) inside the
>>>>> Ignite closure, no problem with that
>>>>> 2. I tried implementing SourceFunction and SinkFunction in Test
>>>>> itself, I was able to instantiate the class inside the Ignite closure
>>>>> 3. I'm not sure what you meant in this point, is it something like
>>>>> what I tried in line #56?
>>>>>
>>>>> Additionally, I tried implementing the SourceFunction and SinkFunction
>>>>> in Test$Foo with the same result: it says "Cannot load user class:
>>>>> com.test.Test$Foo"
>>>>>
>>>>> Looks like Flink is not using the correct ClassLoader. Any idea?
>>>>>
>>>>> Regards,
>>>>> Matt
>>>>>
>>>>> On Tue, Apr 25, 2017 at 7:00 AM, Stefan Richter <
>>>>> s.rich...@data-artisans.com> wrote:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> I would expect that the local environment picks up the class path
>>>>>> from the code that launched it. So I think the question is what happens
>>>>>> behind the scenes when you call ignite.compute().broadcast(runnable).
>>>>>> Which classes are shipped and how is the classpath built in the
>>>>>> environment that runs the code? Your example is also not fully
>>>>>> conclusive, because com.myproj.Test (which you can successfully
>>>>>> instantiate) and com.myproj.Test$1$2 (which fails) are different
>>>>>> classes, so maybe only the outer class is shipped with the broadcast
>>>>>> call. My theory is that not all classes are shipped (e.g. inner
>>>>>> classes), but only Test. You could try three things to analyze the
>>>>>> problem a little more:
>>>>>>
>>>>>> 1) Create another inner class inside Test and check whether you are
>>>>>> still able to instantiate this class via reflection as well.
>>>>>> 2) Let the Test class itself implement the map function (avoiding the
>>>>>> usage of other/inner classes) and see if this works.
>>>>>> 3) Check and set the thread's context classloader inside the runnable
>>>>>> to something that contains all required classes and see if this gets
>>>>>> picked up by Flink.
>>>>>>
>>>>>> Best,
>>>>>> Stefan
>>>>>>
>>>>>> On 25.04.2017 at 07:27, Matt <dromitl...@gmail.com> wrote:
>>>>>>
>>>>>> Hi all,
>>>>>>
>>>>>> I'm trying to run Flink using a local environment, but on an Ignite
>>>>>> node to achieve collocation (as mentioned in my previous message on this
>>>>>> list).
>>>>>>
>>>>>> Have a look at the code in [1]. It's pretty simple, but I'm getting a
>>>>>> "cannot load user class" error as shown in [2].
>>>>>>
>>>>>> If you check line #29 in the code, I'm able to create an instance of
>>>>>> class Test, and it's the same context from which I'm creating the Flink
>>>>>> job. Shouldn't it work provided I'm using a local environment?
>>>>>>
>>>>>> It would be really nice to be able to inject a ClassLoader into the
>>>>>> chunk of code that creates the job. Is this currently possible?
>>>>>>
>>>>>> Any fix or workaround is appreciated!
>>>>>>
>>>>>> Best,
>>>>>> Matt
>>>>>>
>>>>>> [1] https://gist.github.com/f248187b9638023b95ba8bd9d7f06215
>>>>>> [2] https://gist.github.com/796ee05425535ece1736df7b1e884cce
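
For reference, here is a minimal sketch of the two classloader points discussed in this thread: a URLClassLoader resolves a class only if it or its parent chain knows it, and a runnable can install a context classloader around a piece of work (Stefan's suggestion 3). It uses only the JDK, not Flink or Ignite. The class name ContextClassLoaderSketch and the helpers runWithContextClassLoader and canLoad are hypothetical names made up for illustration. Whether Flink actually picks up the context classloader depends on the Flink build, as noted in the thread, so this shows the mechanism only, not a confirmed fix.

```java
import java.net.URL;
import java.net.URLClassLoader;

final class ContextClassLoaderSketch {

    /**
     * Runs the task with the given loader installed as the current thread's
     * context classloader, then restores the previous loader.
     */
    static void runWithContextClassLoader(ClassLoader loader, Runnable task) {
        Thread current = Thread.currentThread();
        ClassLoader previous = current.getContextClassLoader();
        current.setContextClassLoader(loader);
        try {
            task.run();
        } finally {
            // Always restore, even if the task throws.
            current.setContextClassLoader(previous);
        }
    }

    /** Returns true if the named class can be resolved through the loader. */
    static boolean canLoad(String className, ClassLoader loader) {
        try {
            Class.forName(className, false, loader);
            return true;
        } catch (ClassNotFoundException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // Stand-in for "the classloader that knows the user code".
        ClassLoader userCodeLoader = ContextClassLoaderSketch.class.getClassLoader();
        String userClass = ContextClassLoaderSketch.class.getName();

        // No URLs of their own: these loaders can only delegate to the parent.
        ClassLoader child = new URLClassLoader(new URL[0], userCodeLoader);
        ClassLoader isolated = new URLClassLoader(new URL[0], null);

        System.out.println("child can load user class:    " + canLoad(userClass, child));
        System.out.println("isolated can load user class: " + canLoad(userClass, isolated));

        runWithContextClassLoader(child, () -> {
            ClassLoader ctx = Thread.currentThread().getContextClassLoader();
            System.out.println("context loader can load user class: " + canLoad(userClass, ctx));
        });
    }
}
```

In an Ignite closure, the equivalent would be to wrap the code that builds and executes the job in runWithContextClassLoader, passing a loader that knows the user classes and has the Ignite/Flink loader in its parent chain. The isolated loader above illustrates how a "Class not resolvable through given classloader" style failure can arise when the parent chain does not include the loader that knows the user class.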