Hi Matt, sorry for not coming back to you sooner. We're currently in the release phase and this consumes a lot of capacities.
I tried to go to the linked repo, but Github tells me that it does not exist. Have you removed it? Cheers, Till On Wed, May 17, 2017 at 10:56 PM, Matt <dromitl...@gmail.com> wrote: > Check the repo at [1]. > > The important step which I think is what you missed is running an Ignite > node on your computer so the Java code, which launches an Ignite client on > the JVM, connects to it and executes Flink on that node on a local > environment. > > Be aware "peerClassLoadingEnabled" should be enabled (as in ignite.xml), > because it must match the config on the client node. > > If you follow the Readme file it's everything there, if you have any > problem let me know! > > Cheers, > Matt > > [1] https://github.com/Dromit/FlinkTest > > On Wed, May 17, 2017 at 3:49 PM, Matt <dromitl...@gmail.com> wrote: > >> Thanks for your help Till. >> >> I will create a self contained test case in a moment and send you the >> link, wait for it. >> >> Cheers, >> Matt >> >> On Wed, May 17, 2017 at 4:38 AM, Till Rohrmann <trohrm...@apache.org> >> wrote: >> >>> Hi Matt, >>> >>> alright, then we have to look into it again. I tried to run your >>> example, however, it does not seem to be self-contained. Using Ignite 2.0.0 >>> with -DIGNITE_QUIET=false -Xms512m the Ignite object seems to be stuck in >>> Ignite#start. In the logs I see the following warning: >>> >>> May 17, 2017 9:36:22 AM org.apache.ignite.logger.java.JavaLogger warning >>> WARNING: TcpDiscoveryMulticastIpFinder has no pre-configured addresses (it >>> is recommended in production to specify at least one address in >>> TcpDiscoveryMulticastIpFinder.getAddresses() configuration property) >>> May 17, 2017 9:36:24 AM org.apache.ignite.logger.java.JavaLogger warning >>> WARNING: IP finder returned empty addresses list. Please check IP finder >>> configuration and make sure multicast works on your network. Will retry >>> every 2 secs. >>> >>> However, I assume that this is not critical. >>> >>> Maybe you can tell me how I can run your example in order to debug it. >>> >>> Cheers, >>> Till >>> >>> >>> On Mon, May 15, 2017 at 10:05 PM, Matt <dromitl...@gmail.com> wrote: >>> >>>> Hi Till, >>>> >>>> I just tried with Flink 1.4 by compiling the current master branch on >>>> GitHub (as of this morning) and I still find the same problem as before. If >>>> I'm not wrong your PR was merged already, so your fixes should be part of >>>> the binary. >>>> >>>> I hope you have time to have a look at the test case in [1]. >>>> >>>> Best, >>>> Matt >>>> >>>> [1] https://gist.github.com/17d82ee7dd921a0d649574a361cc017d >>>> >>>> On Thu, Apr 27, 2017 at 10:09 AM, Matt <dromitl...@gmail.com> wrote: >>>> >>>>> Hi Till, >>>>> >>>>> Great! Do you know if it's planned to be included in v1.2.x or should >>>>> we wait for v1.3? I'll give it a try as soon as it's merged. >>>>> >>>>> You're right about this approach launching a mini cluster on each >>>>> Ignite node. That is intentional, as described in my previous message on >>>>> the list [1]. >>>>> >>>>> The idea is to collocate Flink jobs on Ignite nodes, so each dataflow >>>>> only processes the elements stored on the local in-memory database. I get >>>>> the impression this should be much faster than randomly picking a Flink >>>>> node and sending all the data over the network. >>>>> >>>>> Any insight on this? >>>>> >>>>> Cheers, >>>>> Matt >>>>> >>>>> [1] http://apache-flink-user-mailing-list-archive.2336050.n4.nab >>>>> ble.com/Flink-on-Ignite-Collocation-td12780.html >>>>> >>>>> On Thu, Apr 27, 2017 at 5:33 AM, Till Rohrmann <trohrm...@apache.org> >>>>> wrote: >>>>> >>>>>> I just copied my response because my other email address is not >>>>>> accepted on the user mailing list. >>>>>> >>>>>> Hi Matt, >>>>>> >>>>>> I think Stefan's analysis is correct. I have a PR open [1], where I >>>>>> fix the issue with the class loader. >>>>>> >>>>>> As a side note, by doing what you're doing, you will spawn on each >>>>>> Ignite node a new Flink mini cluster. These mini cluster won't >>>>>> communicate >>>>>> with each other and run independently. Is this what you intend to do? >>>>>> >>>>>> [1] https://github.com/apache/flink/pull/3781 >>>>>> >>>>>> Cheers, >>>>>> Till >>>>>> >>>>>> On Wed, Apr 26, 2017 at 11:12 PM, Matt <dromitl...@gmail.com> wrote: >>>>>> >>>>>>> Let's wait for Till then, I hope he can figure this out. >>>>>>> >>>>>>> On Wed, Apr 26, 2017 at 11:03 AM, Stefan Richter < >>>>>>> s.rich...@data-artisans.com> wrote: >>>>>>> >>>>>>>> Ok, now the question is also about what classloaders Ignite is >>>>>>>> creating and how they are used, but the relevant code line in Flink is >>>>>>>> probably in FlinkMiniCluster.scala, line 538 (current master): >>>>>>>> >>>>>>>> try { >>>>>>>> JobClient.submitJobAndWait( >>>>>>>> clientActorSystem, >>>>>>>> configuration, >>>>>>>> leaderRetrievalService, >>>>>>>> jobGraph, >>>>>>>> timeout, >>>>>>>> printUpdates, >>>>>>>> this.getClass.getClassLoader()) >>>>>>>> } finally { >>>>>>>> if(!useSingleActorSystem) { >>>>>>>> // we have to shutdown the just created actor system >>>>>>>> shutdownJobClientActorSystem(clientActorSystem) >>>>>>>> } >>>>>>>> } >>>>>>>> >>>>>>>> >>>>>>>> This is what is executed as part of executing a job through >>>>>>>> LocalEnvironment. As we can see, the classloader is set to the >>>>>>>> classloader >>>>>>>> of FlinkMiniCluster. Depending on the classloader structure inside >>>>>>>> Ignite, >>>>>>>> this classloader might not know your user code. What you could do is >>>>>>>> changing this line in a custom Flink build, changing line 538 for >>>>>>>> example >>>>>>>> to Thread.currentThread().getContextClassloader() and ensuring >>>>>>>> that the context classloader ins the runnable is a classloader that a) >>>>>>>> knows the user code and b) is a child of the classloader that knows the >>>>>>>> Ignite and Flink classes. Notice that this is not a general solution >>>>>>>> and >>>>>>>> should not become a general fix. >>>>>>>> >>>>>>>> I have heard that Till is about to change some things about local >>>>>>>> execution, so I included him in CC. Maybe he can provide additional >>>>>>>> hints >>>>>>>> how your use case might be better supported in the upcoming Flink 1.3. >>>>>>>> >>>>>>>> Best, >>>>>>>> Stefan >>>>>>>> >>>>>>>> Am 25.04.2017 um 22:50 schrieb Matt <dromitl...@gmail.com>: >>>>>>>> >>>>>>>> I updated the code a little bit for clarity, now the line #56 >>>>>>>> mentioned in my previous message is line #25. >>>>>>>> >>>>>>>> In summary the error I'm getting is this: >>>>>>>> >>>>>>>> --- >>>>>>>> Caused by: org.apache.flink.streaming.run >>>>>>>> time.tasks.StreamTaskException: Cannot load user class: >>>>>>>> com.test.Test >>>>>>>> ClassLoader info: URL ClassLoader: >>>>>>>> Class not resolvable through given classloader. >>>>>>>> --- >>>>>>>> >>>>>>>> But if I'm not wrong, after trying to load the class through >>>>>>>> URLClassLoader, Flink should try loading it with its parent >>>>>>>> ClassLoader, >>>>>>>> which should be the same ClassLoader that executed the environment, >>>>>>>> and it >>>>>>>> does have access to the class. >>>>>>>> >>>>>>>> Not sure what is wrong. >>>>>>>> >>>>>>>> On Tue, Apr 25, 2017 at 5:38 PM, Matt <dromitl...@gmail.com> wrote: >>>>>>>> >>>>>>>>> Hi Stefan, >>>>>>>>> >>>>>>>>> Check the code here: https://gist.github.com/ >>>>>>>>> 17d82ee7dd921a0d649574a361cc017d , the output is at the bottom of >>>>>>>>> the page. >>>>>>>>> >>>>>>>>> Here are the results of the additional tests you mentioned: >>>>>>>>> >>>>>>>>> 1. I was able to instantiate an inner class (Test$Foo) inside the >>>>>>>>> Ignite closure, no problem with that >>>>>>>>> 2. I tried implementing SourceFunction and SinkFunction in Test >>>>>>>>> itself, I was able to instantiate the class inside the Ignite closure >>>>>>>>> 3. I'm not sure what you meant in this point, is it something like >>>>>>>>> what I tried in line #56? >>>>>>>>> >>>>>>>>> Additionally, I tried implementing the SourceFunction and >>>>>>>>> SinkFunction in Test$Foo with the same result: it says "Cannot load >>>>>>>>> user >>>>>>>>> class: com.test.Test$Foo" >>>>>>>>> >>>>>>>>> Looks like Flink is not using the correct ClassLoader. Any idea? >>>>>>>>> >>>>>>>>> Regards, >>>>>>>>> Matt >>>>>>>>> >>>>>>>>> On Tue, Apr 25, 2017 at 7:00 AM, Stefan Richter < >>>>>>>>> s.rich...@data-artisans.com> wrote: >>>>>>>>> >>>>>>>>>> Hi, >>>>>>>>>> >>>>>>>>>> I would expect that the local environment picks up the class path >>>>>>>>>> from the code that launched it. So I think the question is what >>>>>>>>>> happens >>>>>>>>>> behind the scenes when you call ignite.compute().broadcas >>>>>>>>>> t(runnable); . Which classes are shipped and how is the >>>>>>>>>> classpath build in the environment that runs the code. Your example >>>>>>>>>> is also >>>>>>>>>> not fully conclusive, because com.myproj.Test (which you can >>>>>>>>>> successfully >>>>>>>>>> instantiate) and com.myproj.Test$1$2 (which fails) are different >>>>>>>>>> classes, >>>>>>>>>> so maybe only the outer class is shipped with the broadcast call. My >>>>>>>>>> theory >>>>>>>>>> is that not all classes are shipped (e.g. inner classes), but only >>>>>>>>>> Test . >>>>>>>>>> You could try three things to analyze to problem a little more: >>>>>>>>>> >>>>>>>>>> 1) Create another inner class inside Test and try if you are >>>>>>>>>> still able to instantiate also this class via reflection. >>>>>>>>>> 2) Let Test class itself implement the map function (avoiding the >>>>>>>>>> usage of other/inner classes) and see if this works. >>>>>>>>>> 3) Check and set the thread’s context classloader inside the >>>>>>>>>> runnable to something that contains all required classes and see if >>>>>>>>>> this >>>>>>>>>> gets picked up by Flink. >>>>>>>>>> >>>>>>>>>> Best, >>>>>>>>>> Stefan >>>>>>>>>> >>>>>>>>>> Am 25.04.2017 um 07:27 schrieb Matt <dromitl...@gmail.com>: >>>>>>>>>> >>>>>>>>>> Hi all, >>>>>>>>>> >>>>>>>>>> I'm trying to run Flink using a local environment, but on an >>>>>>>>>> Ignite node to achieve collocation (as mentioned in my previous >>>>>>>>>> message on >>>>>>>>>> this list). >>>>>>>>>> >>>>>>>>>> Have a look at the code in [1]. It's pretty simple, but I'm >>>>>>>>>> getting a "cannot load user class" error as shown in [2]. >>>>>>>>>> >>>>>>>>>> If you check line #29 on the code, I'm able to create an instance >>>>>>>>>> of class Test, and it's the same context from which I'm creating the >>>>>>>>>> Flink >>>>>>>>>> job. Shouldn't it work provided I'm using a local environment? >>>>>>>>>> >>>>>>>>>> It would be really nice to be able to inject a ClassLoader into >>>>>>>>>> the chunk of code that creates the job. Is this currently possible? >>>>>>>>>> >>>>>>>>>> Any fix or workaround is appreciated! >>>>>>>>>> >>>>>>>>>> Best, >>>>>>>>>> Matt >>>>>>>>>> >>>>>>>>>> [1] https://gist.github.com/f248187b9638023b95ba8bd9d7f06215 >>>>>>>>>> [2] https://gist.github.com/796ee05425535ece1736df7b1e884cce >>>>>>>>>> >>>>>>>>>> >>>>>>>>>> >>>>>>>>> >>>>>>>> >>>>>>>> >>>>>>> >>>>>> >>>>> >>>> >>> >> >