Sounds great Matt :-)

On Tue, May 30, 2017 at 12:17 AM, Matt <dromitl...@gmail.com> wrote:
> Thanks for looking into it Till!
>
> I'll try changing that line locally and then open a JIRA issue. Once it is officially fixed, I'll probably create an Ignite-Flink connector to replace the older and less efficient one [1]. Users will then be able to run Flink jobs on Ignite nodes, right where the data is stored.
>
> [1] https://apacheignite-mix.readme.io/docs/flink-streamer
>
> Best,
> Matt
>
> On Mon, May 29, 2017 at 9:37 AM, Till Rohrmann <trohrm...@apache.org> wrote:
>
>> Hi Matt,
>>
>> I looked into it, and it seems that the Task does not respect the context class loader. The problem is that the local mode was not designed to be executed within something like Ignite or an application server. Rather, it assumes that you have a user code jar which is sent to the TaskManager. This jar is then added to a URLClassLoader, which is used for user code class loading. In local execution mode, Flink assumes that all user code jars are on the system class loader (which usually holds true when running examples from the IDE). That is why we don't check the thread context class loader (TCCL). To fix your problem, you can replace BlobLibraryCacheManager.java:298 with
>>
>>     this.classLoader = new FlinkUserCodeClassLoader(libraryURLs, Thread.currentThread().getContextClassLoader());
>>
>> Alternatively, you can build your job, copy the user code jar to IGNITE_HOME/libs, and then restart Ignite.
>>
>> If you want the TCCL problem properly fixed, I suggest opening a JIRA issue here [1].
>>
>> [1] https://issues.apache.org/jira/browse/FLINK/?selectedTab=com.atlassian.jira.jira-projects-plugin:summary-panel
>>
>> Cheers,
>> Till
>>
>> On Mon, May 29, 2017 at 12:02 PM, Matt <dromitl...@gmail.com> wrote:
>>
>>> Hi Till,
>>>
>>> Have you found anything or are you still busy with the release?
>>> I have no idea what may be wrong, but let me know if I can help you in any way to find out what is going on.
>>>
>>> Best,
>>> Matt
>>>
>>> On Wed, May 24, 2017 at 5:37 AM, Till Rohrmann <trohrm...@apache.org> wrote:
>>>
>>>> Hi Matt,
>>>>
>>>> sorry for not getting back to you sooner. We're currently in the release phase, and this consumes a lot of capacity.
>>>>
>>>> I tried to go to the linked repo, but GitHub tells me that it does not exist. Have you removed it?
>>>>
>>>> Cheers,
>>>> Till
>>>>
>>>> On Wed, May 17, 2017 at 10:56 PM, Matt <dromitl...@gmail.com> wrote:
>>>>
>>>>> Check the repo at [1].
>>>>>
>>>>> The important step, which I think is what you missed, is running an Ignite node on your computer, so that the Java code, which launches an Ignite client in the JVM, connects to it and executes Flink on that node in a local environment.
>>>>>
>>>>> Be aware that "peerClassLoadingEnabled" must be enabled (as in ignite.xml), because it has to match the config on the client node.
>>>>>
>>>>> Everything is in the Readme file; if you have any problem, let me know!
>>>>>
>>>>> Cheers,
>>>>> Matt
>>>>>
>>>>> [1] https://github.com/Dromit/FlinkTest
>>>>>
>>>>> On Wed, May 17, 2017 at 3:49 PM, Matt <dromitl...@gmail.com> wrote:
>>>>>
>>>>>> Thanks for your help Till.
>>>>>>
>>>>>> I will create a self-contained test case in a moment and send you the link, wait for it.
>>>>>>
>>>>>> Cheers,
>>>>>> Matt
>>>>>>
>>>>>> On Wed, May 17, 2017 at 4:38 AM, Till Rohrmann <trohrm...@apache.org> wrote:
>>>>>>
>>>>>>> Hi Matt,
>>>>>>>
>>>>>>> alright, then we have to look into it again. I tried to run your example; however, it does not seem to be self-contained. Using Ignite 2.0.0 with -DIGNITE_QUIET=false -Xms512m, the Ignite object seems to be stuck in Ignite#start.
>>>>>>> In the logs I see the following warnings:
>>>>>>>
>>>>>>> May 17, 2017 9:36:22 AM org.apache.ignite.logger.java.JavaLogger warning
>>>>>>> WARNING: TcpDiscoveryMulticastIpFinder has no pre-configured addresses (it is recommended in production to specify at least one address in TcpDiscoveryMulticastIpFinder.getAddresses() configuration property)
>>>>>>> May 17, 2017 9:36:24 AM org.apache.ignite.logger.java.JavaLogger warning
>>>>>>> WARNING: IP finder returned empty addresses list. Please check IP finder configuration and make sure multicast works on your network. Will retry every 2 secs.
>>>>>>>
>>>>>>> However, I assume that this is not critical.
>>>>>>>
>>>>>>> Maybe you can tell me how to run your example so that I can debug it.
>>>>>>>
>>>>>>> Cheers,
>>>>>>> Till
>>>>>>>
>>>>>>> On Mon, May 15, 2017 at 10:05 PM, Matt <dromitl...@gmail.com> wrote:
>>>>>>>
>>>>>>>> Hi Till,
>>>>>>>>
>>>>>>>> I just tried with Flink 1.4 by compiling the current master branch on GitHub (as of this morning), and I still see the same problem as before. If I'm not mistaken, your PR has already been merged, so your fixes should be part of the binary.
>>>>>>>>
>>>>>>>> I hope you have time to look at the test case in [1].
>>>>>>>>
>>>>>>>> Best,
>>>>>>>> Matt
>>>>>>>>
>>>>>>>> [1] https://gist.github.com/17d82ee7dd921a0d649574a361cc017d
>>>>>>>>
>>>>>>>> On Thu, Apr 27, 2017 at 10:09 AM, Matt <dromitl...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Hi Till,
>>>>>>>>>
>>>>>>>>> Great! Do you know if it's planned to be included in v1.2.x, or should we wait for v1.3? I'll give it a try as soon as it's merged.
>>>>>>>>>
>>>>>>>>> You're right that this approach launches a mini cluster on each Ignite node. That is intentional, as described in my previous message on the list [1].
>>>>>>>>>
>>>>>>>>> The idea is to collocate Flink jobs on Ignite nodes, so that each dataflow only processes the elements stored in the local in-memory database. My impression is that this should be much faster than picking a random Flink node and sending all the data over the network.
>>>>>>>>>
>>>>>>>>> Any insight on this?
>>>>>>>>>
>>>>>>>>> Cheers,
>>>>>>>>> Matt
>>>>>>>>>
>>>>>>>>> [1] http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Flink-on-Ignite-Collocation-td12780.html
>>>>>>>>>
>>>>>>>>> On Thu, Apr 27, 2017 at 5:33 AM, Till Rohrmann <trohrm...@apache.org> wrote:
>>>>>>>>>
>>>>>>>>>> I just copied my response because my other email address is not accepted on the user mailing list.
>>>>>>>>>>
>>>>>>>>>> Hi Matt,
>>>>>>>>>>
>>>>>>>>>> I think Stefan's analysis is correct. I have a PR open [1] in which I fix the issue with the class loader.
>>>>>>>>>>
>>>>>>>>>> As a side note, by doing what you're doing, you will spawn a new Flink mini cluster on each Ignite node. These mini clusters won't communicate with each other; they run independently. Is this what you intend to do?
>>>>>>>>>>
>>>>>>>>>> [1] https://github.com/apache/flink/pull/3781
>>>>>>>>>>
>>>>>>>>>> Cheers,
>>>>>>>>>> Till
>>>>>>>>>>
>>>>>>>>>> On Wed, Apr 26, 2017 at 11:12 PM, Matt <dromitl...@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> Let's wait for Till then, I hope he can figure this out.
>>>>>>>>>>>
>>>>>>>>>>> On Wed, Apr 26, 2017 at 11:03 AM, Stefan Richter <s.rich...@data-artisans.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Ok, now the question is also which classloaders Ignite creates and how they are used, but the relevant code line in Flink is probably in FlinkMiniCluster.scala, line 538 (current master):
>>>>>>>>>>>>
>>>>>>>>>>>>     try {
>>>>>>>>>>>>       JobClient.submitJobAndWait(
>>>>>>>>>>>>         clientActorSystem,
>>>>>>>>>>>>         configuration,
>>>>>>>>>>>>         leaderRetrievalService,
>>>>>>>>>>>>         jobGraph,
>>>>>>>>>>>>         timeout,
>>>>>>>>>>>>         printUpdates,
>>>>>>>>>>>>         this.getClass.getClassLoader())
>>>>>>>>>>>>     } finally {
>>>>>>>>>>>>       if (!useSingleActorSystem) {
>>>>>>>>>>>>         // we have to shutdown the just created actor system
>>>>>>>>>>>>         shutdownJobClientActorSystem(clientActorSystem)
>>>>>>>>>>>>       }
>>>>>>>>>>>>     }
>>>>>>>>>>>>
>>>>>>>>>>>> This is what runs when a job is executed through LocalEnvironment. As we can see, the classloader passed in is the classloader of FlinkMiniCluster. Depending on the classloader structure inside Ignite, this classloader might not know your user code. What you could do is change this line in a custom Flink build, for example replacing the argument on line 538 with Thread.currentThread().getContextClassLoader(), and ensure that the context classloader in the runnable is a classloader that a) knows the user code and b) is a child of the classloader that knows the Ignite and Flink classes. Note that this is not a general solution and should not become a general fix.
>>>>>>>>>>>>
>>>>>>>>>>>> I have heard that Till is about to change some things about local execution, so I included him in CC.
>>>>>>>>>>>> Maybe he can provide additional hints on how your use case might be better supported in the upcoming Flink 1.3.
>>>>>>>>>>>>
>>>>>>>>>>>> Best,
>>>>>>>>>>>> Stefan
>>>>>>>>>>>>
>>>>>>>>>>>> On 25.04.2017 at 22:50, Matt <dromitl...@gmail.com> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>> I updated the code a little for clarity; line #56 mentioned in my previous message is now line #25.
>>>>>>>>>>>>
>>>>>>>>>>>> In summary, this is the error I'm getting:
>>>>>>>>>>>>
>>>>>>>>>>>> ---
>>>>>>>>>>>> Caused by: org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot load user class: com.test.Test
>>>>>>>>>>>> ClassLoader info: URL ClassLoader:
>>>>>>>>>>>> Class not resolvable through given classloader.
>>>>>>>>>>>> ---
>>>>>>>>>>>>
>>>>>>>>>>>> But if I'm not mistaken, after trying to load the class through the URLClassLoader, Flink should try loading it with its parent ClassLoader, which should be the same ClassLoader that executed the environment, and that one does have access to the class.
>>>>>>>>>>>>
>>>>>>>>>>>> Not sure what is wrong.
>>>>>>>>>>>>
>>>>>>>>>>>> On Tue, Apr 25, 2017 at 5:38 PM, Matt <dromitl...@gmail.com> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Hi Stefan,
>>>>>>>>>>>>>
>>>>>>>>>>>>> Check the code here: https://gist.github.com/17d82ee7dd921a0d649574a361cc017d (the output is at the bottom of the page).
>>>>>>>>>>>>>
>>>>>>>>>>>>> Here are the results of the additional tests you mentioned:
>>>>>>>>>>>>>
>>>>>>>>>>>>> 1. I was able to instantiate an inner class (Test$Foo) inside the Ignite closure; no problem with that.
>>>>>>>>>>>>> 2. I tried implementing SourceFunction and SinkFunction in Test itself, and I was able to instantiate the class inside the Ignite closure.
>>>>>>>>>>>>> 3. I'm not sure what you meant in this point; is it something like what I tried in line #56?
>>>>>>>>>>>>>
>>>>>>>>>>>>> Additionally, I tried implementing SourceFunction and SinkFunction in Test$Foo, with the same result: it says "Cannot load user class: com.test.Test$Foo".
>>>>>>>>>>>>>
>>>>>>>>>>>>> It looks like Flink is not using the correct ClassLoader. Any idea?
>>>>>>>>>>>>>
>>>>>>>>>>>>> Regards,
>>>>>>>>>>>>> Matt
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Tue, Apr 25, 2017 at 7:00 AM, Stefan Richter <s.rich...@data-artisans.com> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Hi,
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> I would expect the local environment to pick up the classpath from the code that launched it. So I think the question is what happens behind the scenes when you call ignite.compute().broadcast(runnable): which classes are shipped, and how the classpath is built in the environment that runs the code. Your example is also not fully conclusive, because com.myproj.Test (which you can successfully instantiate) and com.myproj.Test$1$2 (which fails) are different classes, so maybe only the outer class is shipped with the broadcast call. My theory is that not all classes are shipped (e.g. inner classes), but only Test. You could try three things to analyze the problem a little more:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> 1) Create another inner class inside Test and check whether you are still able to instantiate this class via reflection as well.
>>>>>>>>>>>>>> 2) Let the Test class itself implement the map function (avoiding the use of other/inner classes) and see if this works.
>>>>>>>>>>>>>> 3) Check and set the thread's context classloader inside the runnable to something that contains all required classes, and see if this gets picked up by Flink.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Best,
>>>>>>>>>>>>>> Stefan
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On 25.04.2017 at 07:27, Matt <dromitl...@gmail.com> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Hi all,
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> I'm trying to run Flink using a local environment, but on an Ignite node, to achieve collocation (as mentioned in my previous message on this list).
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Have a look at the code in [1]. It's pretty simple, but I'm getting a "cannot load user class" error, as shown in [2].
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> If you check line #29 of the code, I'm able to create an instance of class Test, and it's the same context from which I'm creating the Flink job. Shouldn't it work, given that I'm using a local environment?
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> It would be really nice to be able to inject a ClassLoader into the chunk of code that creates the job. Is this currently possible?
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Any fix or workaround is appreciated!
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Best,
>>>>>>>>>>>>>> Matt
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> [1] https://gist.github.com/f248187b9638023b95ba8bd9d7f06215
>>>>>>>>>>>>>> [2] https://gist.github.com/796ee05425535ece1736df7b1e884cce
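A minimal JDK-only sketch of the class loading behaviour discussed in this thread. A URLClassLoader asks its parent first and then searches its own URLs, so a user-code loader parented on the bootstrap loader cannot resolve application classes (similar to "Class not resolvable through given classloader"), while one parented on the thread context class loader can. That TCCL parenting is the idea behind Till's suggested one-line change and Stefan's point 3. The class name ContextClassLoaderSketch and the helpers withContextClassLoader and canLoad are hypothetical illustrations, not Flink or Ignite APIs, and this is not the actual Flink patch.

```java
import java.net.URL;
import java.net.URLClassLoader;
import java.util.concurrent.Callable;

// Hypothetical illustration class; not part of Flink or Ignite.
public class ContextClassLoaderSketch {

    // Runs the action with the given loader installed as the thread context class loader (TCCL)
    // and always restores the previous TCCL, even if the action throws.
    public static <T> T withContextClassLoader(ClassLoader loader, Callable<T> action) throws Exception {
        Thread current = Thread.currentThread();
        ClassLoader previous = current.getContextClassLoader();
        current.setContextClassLoader(loader);
        try {
            return action.call();
        } finally {
            current.setContextClassLoader(previous);
        }
    }

    // True if className resolves through the loader. URLClassLoader delegates to its parent
    // first and only then searches its own URLs.
    public static boolean canLoad(ClassLoader loader, String className) {
        try {
            Class.forName(className, false, loader);
            return true;
        } catch (ClassNotFoundException e) {
            return false;
        }
    }

    public static void main(String[] args) throws Exception {
        String userClass = ContextClassLoaderSketch.class.getName();
        ClassLoader appLoader = ContextClassLoaderSketch.class.getClassLoader();

        // "User code" loader with no jars whose parent is the bootstrap loader (null):
        // it cannot see application classes.
        try (URLClassLoader isolated = new URLClassLoader(new URL[0], null)) {
            System.out.println("bootstrap-parented loader sees user class: " + canLoad(isolated, userClass));
        }

        // Same kind of loader, but parented on the TCCL, which the caller sets to a loader
        // that knows the user code (here simply the loader of this class).
        Boolean viaTccl = withContextClassLoader(appLoader, () -> {
            try (URLClassLoader child = new URLClassLoader(new URL[0],
                    Thread.currentThread().getContextClassLoader())) {
                return canLoad(child, userClass);
            }
        });
        System.out.println("TCCL-parented loader sees user class: " + viaTccl);
    }
}
```

Run as-is, main should print false for the bootstrap-parented loader and true for the TCCL-parented one. Restoring the previous TCCL in the finally block matters if the runnable executes on a pooled thread, because the context class loader is a per-thread setting and would otherwise leak into later tasks on that thread.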