Thanks for looking into it, Till! I'll try changing that line locally and then open a JIRA issue. Once it is officially fixed, I'll probably create an Ignite-Flink connector to replace the older and less efficient one [1]. Users will be able to create Flink jobs on Ignite nodes, right where the data is stored.
[1] https://apacheignite-mix.readme.io/docs/flink-streamer

Best,
Matt

On Mon, May 29, 2017 at 9:37 AM, Till Rohrmann <trohrm...@apache.org> wrote:

> Hi Matt,
>
> I looked into it and it seems that the Task does not respect the context class loader. The problem is that the local mode was not developed with the intention of being executed within something like Ignite or an application server. Rather, it assumes that you have a user code jar which is sent to the TaskManager. This jar is then added to a URLClassLoader which is used for user code class loading. In the case of the local execution mode, Flink assumes that all user code jars are in the system class loader (which usually holds true when running examples from the IDE). That is the reason why we don’t check the TCCL.
>
> In order to fix your problem you can replace BlobLibraryCacheManager.java:298 with this.classLoader = new FlinkUserCodeClassLoader(libraryURLs, Thread.currentThread().getContextClassLoader());. Alternatively, you can build your job, copy the user code jar to IGNITE_HOME/libs and then restart Ignite.
>
> If you want to get the TCCL problem properly fixed, I suggest opening a JIRA issue here [1].
>
> [1] https://issues.apache.org/jira/browse/FLINK/?selectedTab=com.atlassian.jira.jira-projects-plugin:summary-panel
>
> Cheers,
> Till
>
> On Mon, May 29, 2017 at 12:02 PM, Matt <dromitl...@gmail.com> wrote:
>
>> Hi Till,
>>
>> Have you found anything or are you still busy with the release? I have no idea what may be wrong, but let me know if I can help you in any way to find what may be going on.
>>
>> Best,
>> Matt
>>
>> On Wed, May 24, 2017 at 5:37 AM, Till Rohrmann <trohrm...@apache.org> wrote:
>>
>>> Hi Matt,
>>>
>>> Sorry for not coming back to you sooner. We're currently in the release phase and this consumes a lot of capacity.
>>>
>>> I tried to go to the linked repo, but GitHub tells me that it does not exist. Have you removed it?
>>>
>>> Cheers,
>>> Till
>>>
>>> On Wed, May 17, 2017 at 10:56 PM, Matt <dromitl...@gmail.com> wrote:
>>>
>>>> Check the repo at [1].
>>>>
>>>> The important step, which I think is what you missed, is running an Ignite node on your computer so the Java code, which launches an Ignite client on the JVM, connects to it and executes Flink on that node in a local environment.
>>>>
>>>> Be aware that "peerClassLoadingEnabled" should be enabled (as in ignite.xml), because it must match the config on the client node.
>>>>
>>>> If you follow the Readme file, everything is there. If you have any problem let me know!
>>>>
>>>> Cheers,
>>>> Matt
>>>>
>>>> [1] https://github.com/Dromit/FlinkTest
>>>>
>>>> On Wed, May 17, 2017 at 3:49 PM, Matt <dromitl...@gmail.com> wrote:
>>>>
>>>>> Thanks for your help Till.
>>>>>
>>>>> I will create a self-contained test case in a moment and send you the link.
>>>>>
>>>>> Cheers,
>>>>> Matt
>>>>>
>>>>> On Wed, May 17, 2017 at 4:38 AM, Till Rohrmann <trohrm...@apache.org> wrote:
>>>>>
>>>>>> Hi Matt,
>>>>>>
>>>>>> Alright, then we have to look into it again. I tried to run your example; however, it does not seem to be self-contained. Using Ignite 2.0.0 with -DIGNITE_QUIET=false -Xms512m the Ignite object seems to be stuck in Ignite#start. In the logs I see the following warning:
>>>>>>
>>>>>> May 17, 2017 9:36:22 AM org.apache.ignite.logger.java.JavaLogger warning
>>>>>> WARNING: TcpDiscoveryMulticastIpFinder has no pre-configured addresses (it is recommended in production to specify at least one address in TcpDiscoveryMulticastIpFinder.getAddresses() configuration property)
>>>>>> May 17, 2017 9:36:24 AM org.apache.ignite.logger.java.JavaLogger warning
>>>>>> WARNING: IP finder returned empty addresses list. Please check IP finder configuration and make sure multicast works on your network. Will retry every 2 secs.
>>>>>>
>>>>>> However, I assume that this is not critical.
>>>>>>
>>>>>> Maybe you can tell me how I can run your example in order to debug it.
>>>>>>
>>>>>> Cheers,
>>>>>> Till
>>>>>>
>>>>>> On Mon, May 15, 2017 at 10:05 PM, Matt <dromitl...@gmail.com> wrote:
>>>>>>
>>>>>>> Hi Till,
>>>>>>>
>>>>>>> I just tried with Flink 1.4 by compiling the current master branch on GitHub (as of this morning) and I still find the same problem as before. If I'm not wrong your PR was merged already, so your fixes should be part of the binary.
>>>>>>>
>>>>>>> I hope you have time to have a look at the test case in [1].
>>>>>>>
>>>>>>> Best,
>>>>>>> Matt
>>>>>>>
>>>>>>> [1] https://gist.github.com/17d82ee7dd921a0d649574a361cc017d
>>>>>>>
>>>>>>> On Thu, Apr 27, 2017 at 10:09 AM, Matt <dromitl...@gmail.com> wrote:
>>>>>>>
>>>>>>>> Hi Till,
>>>>>>>>
>>>>>>>> Great! Do you know if it's planned to be included in v1.2.x or should we wait for v1.3? I'll give it a try as soon as it's merged.
>>>>>>>>
>>>>>>>> You're right about this approach launching a mini cluster on each Ignite node. That is intentional, as described in my previous message on the list [1].
>>>>>>>>
>>>>>>>> The idea is to collocate Flink jobs on Ignite nodes, so each dataflow only processes the elements stored on the local in-memory database. I get the impression this should be much faster than randomly picking a Flink node and sending all the data over the network.
>>>>>>>>
>>>>>>>> Any insight on this?
>>>>>>>>
>>>>>>>> Cheers,
>>>>>>>> Matt
>>>>>>>>
>>>>>>>> [1] http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Flink-on-Ignite-Collocation-td12780.html
>>>>>>>>
>>>>>>>> On Thu, Apr 27, 2017 at 5:33 AM, Till Rohrmann <trohrm...@apache.org> wrote:
>>>>>>>>
>>>>>>>>> I just copied my response because my other email address is not accepted on the user mailing list.
>>>>>>>>>
>>>>>>>>> Hi Matt,
>>>>>>>>>
>>>>>>>>> I think Stefan's analysis is correct. I have a PR open [1], where I fix the issue with the class loader.
>>>>>>>>>
>>>>>>>>> As a side note, by doing what you're doing, you will spawn a new Flink mini cluster on each Ignite node. These mini clusters won't communicate with each other and will run independently. Is this what you intend to do?
>>>>>>>>>
>>>>>>>>> [1] https://github.com/apache/flink/pull/3781
>>>>>>>>>
>>>>>>>>> Cheers,
>>>>>>>>> Till
>>>>>>>>>
>>>>>>>>> On Wed, Apr 26, 2017 at 11:12 PM, Matt <dromitl...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> Let's wait for Till then, I hope he can figure this out.
>>>>>>>>>>
>>>>>>>>>> On Wed, Apr 26, 2017 at 11:03 AM, Stefan Richter <s.rich...@data-artisans.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> Ok, now the question is also about what classloaders Ignite is creating and how they are used, but the relevant code line in Flink is probably in FlinkMiniCluster.scala, line 538 (current master):
>>>>>>>>>>>
>>>>>>>>>>> try {
>>>>>>>>>>>   JobClient.submitJobAndWait(
>>>>>>>>>>>     clientActorSystem,
>>>>>>>>>>>     configuration,
>>>>>>>>>>>     leaderRetrievalService,
>>>>>>>>>>>     jobGraph,
>>>>>>>>>>>     timeout,
>>>>>>>>>>>     printUpdates,
>>>>>>>>>>>     this.getClass.getClassLoader())
>>>>>>>>>>> } finally {
>>>>>>>>>>>   if (!useSingleActorSystem) {
>>>>>>>>>>>     // we have to shutdown the just created actor system
>>>>>>>>>>>     shutdownJobClientActorSystem(clientActorSystem)
>>>>>>>>>>>   }
>>>>>>>>>>> }
>>>>>>>>>>>
>>>>>>>>>>> This is what is executed as part of executing a job through LocalEnvironment. As we can see, the classloader is set to the classloader of FlinkMiniCluster. Depending on the classloader structure inside Ignite, this classloader might not know your user code.
>>>>>>>>>>> What you could do is change this line in a custom Flink build, for example changing line 538 to Thread.currentThread().getContextClassLoader(), and ensure that the context classloader in the runnable is a classloader that a) knows the user code and b) is a child of the classloader that knows the Ignite and Flink classes. Notice that this is not a general solution and should not become a general fix.
>>>>>>>>>>>
>>>>>>>>>>> I have heard that Till is about to change some things about local execution, so I included him in CC. Maybe he can provide additional hints on how your use case might be better supported in the upcoming Flink 1.3.
>>>>>>>>>>>
>>>>>>>>>>> Best,
>>>>>>>>>>> Stefan
>>>>>>>>>>>
>>>>>>>>>>> On 25.04.2017 at 22:50, Matt <dromitl...@gmail.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>> I updated the code a little bit for clarity, now the line #56 mentioned in my previous message is line #25.
>>>>>>>>>>>
>>>>>>>>>>> In summary the error I'm getting is this:
>>>>>>>>>>>
>>>>>>>>>>> ---
>>>>>>>>>>> Caused by: org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot load user class: com.test.Test
>>>>>>>>>>> ClassLoader info: URL ClassLoader:
>>>>>>>>>>> Class not resolvable through given classloader.
>>>>>>>>>>> ---
>>>>>>>>>>>
>>>>>>>>>>> But if I'm not wrong, after trying to load the class through URLClassLoader, Flink should try loading it with its parent ClassLoader, which should be the same ClassLoader that executed the environment, and it does have access to the class.
>>>>>>>>>>>
>>>>>>>>>>> Not sure what is wrong.
>>>>>>>>>>>
>>>>>>>>>>> On Tue, Apr 25, 2017 at 5:38 PM, Matt <dromitl...@gmail.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Hi Stefan,
>>>>>>>>>>>>
>>>>>>>>>>>> Check the code here: https://gist.github.com/17d82ee7dd921a0d649574a361cc017d , the output is at the bottom of the page.
>>>>>>>>>>>>
>>>>>>>>>>>> Here are the results of the additional tests you mentioned:
>>>>>>>>>>>>
>>>>>>>>>>>> 1. I was able to instantiate an inner class (Test$Foo) inside the Ignite closure, no problem with that
>>>>>>>>>>>> 2. I tried implementing SourceFunction and SinkFunction in Test itself, I was able to instantiate the class inside the Ignite closure
>>>>>>>>>>>> 3. I'm not sure what you meant in this point, is it something like what I tried in line #56?
>>>>>>>>>>>>
>>>>>>>>>>>> Additionally, I tried implementing the SourceFunction and SinkFunction in Test$Foo with the same result: it says "Cannot load user class: com.test.Test$Foo"
>>>>>>>>>>>>
>>>>>>>>>>>> Looks like Flink is not using the correct ClassLoader. Any idea?
>>>>>>>>>>>>
>>>>>>>>>>>> Regards,
>>>>>>>>>>>> Matt
>>>>>>>>>>>>
>>>>>>>>>>>> On Tue, Apr 25, 2017 at 7:00 AM, Stefan Richter <s.rich...@data-artisans.com> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Hi,
>>>>>>>>>>>>>
>>>>>>>>>>>>> I would expect that the local environment picks up the class path from the code that launched it. So I think the question is what happens behind the scenes when you call ignite.compute().broadcast(runnable);. Which classes are shipped and how is the classpath built in the environment that runs the code?
>>>>>>>>>>>>> Your example is also not fully conclusive, because com.myproj.Test (which you can successfully instantiate) and com.myproj.Test$1$2 (which fails) are different classes, so maybe only the outer class is shipped with the broadcast call. My theory is that not all classes are shipped (e.g. inner classes), but only Test. You could try three things to analyze the problem a little more:
>>>>>>>>>>>>>
>>>>>>>>>>>>> 1) Create another inner class inside Test and check whether you are still able to instantiate this class via reflection as well.
>>>>>>>>>>>>> 2) Let the Test class itself implement the map function (avoiding the usage of other/inner classes) and see if this works.
>>>>>>>>>>>>> 3) Check and set the thread’s context classloader inside the runnable to something that contains all required classes and see if this gets picked up by Flink.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Best,
>>>>>>>>>>>>> Stefan
>>>>>>>>>>>>>
>>>>>>>>>>>>> On 25.04.2017 at 07:27, Matt <dromitl...@gmail.com> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>> Hi all,
>>>>>>>>>>>>>
>>>>>>>>>>>>> I'm trying to run Flink using a local environment, but on an Ignite node to achieve collocation (as mentioned in my previous message on this list).
>>>>>>>>>>>>>
>>>>>>>>>>>>> Have a look at the code in [1]. It's pretty simple, but I'm getting a "cannot load user class" error as shown in [2].
>>>>>>>>>>>>>
>>>>>>>>>>>>> If you check line #29 on the code, I'm able to create an instance of class Test, and it's the same context from which I'm creating the Flink job. Shouldn't it work provided I'm using a local environment?
>>>>>>>>>>>>>
>>>>>>>>>>>>> It would be really nice to be able to inject a ClassLoader into the chunk of code that creates the job. Is this currently possible?
>>>>>>>>>>>>>
>>>>>>>>>>>>> Any fix or workaround is appreciated!
>>>>>>>>>>>>>
>>>>>>>>>>>>> Best,
>>>>>>>>>>>>> Matt
>>>>>>>>>>>>>
>>>>>>>>>>>>> [1] https://gist.github.com/f248187b9638023b95ba8bd9d7f06215
>>>>>>>>>>>>> [2] https://gist.github.com/796ee05425535ece1736df7b1e884cce
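
For readers following the thread, the class loader idea discussed above (Stefan's point 3 and Till's suggested one-line change) can be illustrated with plain JDK classes. This is only a minimal sketch, not Flink or Ignite code: the class name ContextClassLoaderSketch and its helper methods are hypothetical, and a plain URLClassLoader stands in for Flink's FlinkUserCodeClassLoader. It shows that a URL-based loader can resolve application classes only if its parent chain reaches a loader that knows them, and that taking the thread context class loader as the parent is one way to arrange that.

```java
import java.net.URL;
import java.net.URLClassLoader;

/** Hypothetical demo class; none of these names come from Flink or Ignite. */
final class ContextClassLoaderSketch {

    /** Returns true if className can be resolved through the given loader. */
    static boolean canLoad(ClassLoader loader, String className) {
        try {
            Class.forName(className, false, loader);
            return true;
        } catch (ClassNotFoundException e) {
            return false;
        }
    }

    /** Builds a URL-based loader whose parent is the calling thread's context class loader. */
    static URLClassLoader userCodeLoader(URL[] libraryUrls) {
        return new URLClassLoader(libraryUrls, Thread.currentThread().getContextClassLoader());
    }

    /** Runs body with the given context class loader installed, then restores the previous one. */
    static void runWithContextClassLoader(ClassLoader loader, Runnable body) {
        Thread thread = Thread.currentThread();
        ClassLoader previous = thread.getContextClassLoader();
        thread.setContextClassLoader(loader);
        try {
            body.run();
        } finally {
            thread.setContextClassLoader(previous);
        }
    }

    public static void main(String[] args) {
        String userClass = ContextClassLoaderSketch.class.getName();
        // Stand-in for "a classloader that knows the user code".
        ClassLoader knowsUserCode = ContextClassLoaderSketch.class.getClassLoader();

        // A null parent means only the bootstrap loader is consulted,
        // so application classes are not visible through this loader.
        URLClassLoader isolated = new URLClassLoader(new URL[0], null);
        System.out.println("isolated loader resolves user class: "
                + canLoad(isolated, userClass));

        // With the context class loader as parent, lookups are delegated to it.
        runWithContextClassLoader(knowsUserCode, () ->
                System.out.println("TCCL-parented loader resolves user class: "
                        + canLoad(userCodeLoader(new URL[0]), userClass)));
    }
}
```

In the setup described in this thread, the call to runWithContextClassLoader would wrap the code inside the Ignite runnable that builds and executes the Flink job. Whether Flink then picks up that loader depends on the Flink version and on the change Till describes, so treat this as an illustration of the mechanism rather than a confirmed fix.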