Hi Matt,

sorry for not getting back to you sooner. We're currently in the release
phase, which is taking up a lot of our capacity.

I tried to open the linked repo, but GitHub tells me that it does not
exist. Have you removed it?

Cheers,
Till

On Wed, May 17, 2017 at 10:56 PM, Matt <dromitl...@gmail.com> wrote:

> Check the repo at [1].
>
> The important step, which I think is the one you missed, is running an
> Ignite node on your computer. The Java code, which launches an Ignite
> client in the JVM, then connects to that node and executes Flink on it in
> a local environment.
>
> Be aware "peerClassLoadingEnabled" should be enabled (as in ignite.xml),
> because it must match the config on the client node.
>
> Everything is in the Readme file. If you follow it and run into any
> problem, let me know!
>
> Cheers,
> Matt
>
> [1] https://github.com/Dromit/FlinkTest
>
> On Wed, May 17, 2017 at 3:49 PM, Matt <dromitl...@gmail.com> wrote:
>
>> Thanks for your help Till.
>>
>> I will create a self-contained test case in a moment and send you the
>> link shortly.
>>
>> Cheers,
>> Matt
>>
>> On Wed, May 17, 2017 at 4:38 AM, Till Rohrmann <trohrm...@apache.org>
>> wrote:
>>
>>> Hi Matt,
>>>
>>> alright, then we have to look into it again. I tried to run your
>>> example, but it does not seem to be self-contained. Using Ignite 2.0.0
>>> with -DIGNITE_QUIET=false -Xms512m, the Ignite object seems to be stuck in
>>> Ignite#start. In the logs I see the following warnings:
>>>
>>> May 17, 2017 9:36:22 AM org.apache.ignite.logger.java.JavaLogger warning
>>> WARNING: TcpDiscoveryMulticastIpFinder has no pre-configured addresses (it is recommended in production to specify at least one address in TcpDiscoveryMulticastIpFinder.getAddresses() configuration property)
>>> May 17, 2017 9:36:24 AM org.apache.ignite.logger.java.JavaLogger warning
>>> WARNING: IP finder returned empty addresses list. Please check IP finder configuration and make sure multicast works on your network. Will retry every 2 secs.
>>>
>>> However, I assume that this is not critical.
>>>
>>> Maybe you can tell me how I can run your example in order to debug it.
>>>
>>> Cheers,
>>> Till
>>>
>>>
>>> On Mon, May 15, 2017 at 10:05 PM, Matt <dromitl...@gmail.com> wrote:
>>>
>>>> Hi Till,
>>>>
>>>> I just tried with Flink 1.4 by compiling the current master branch on
>>>> GitHub (as of this morning), and I still see the same problem as before.
>>>> If I'm not mistaken, your PR has already been merged, so your fixes
>>>> should be part of the binary.
>>>>
>>>> I hope you have time to have a look at the test case in [1].
>>>>
>>>> Best,
>>>> Matt
>>>>
>>>> [1] https://gist.github.com/17d82ee7dd921a0d649574a361cc017d
>>>>
>>>> On Thu, Apr 27, 2017 at 10:09 AM, Matt <dromitl...@gmail.com> wrote:
>>>>
>>>>> Hi Till,
>>>>>
>>>>> Great! Do you know if it's planned to be included in v1.2.x or should
>>>>> we wait for v1.3? I'll give it a try as soon as it's merged.
>>>>>
>>>>> You're right about this approach launching a mini cluster on each
>>>>> Ignite node. That is intentional, as described in my previous message on
>>>>> the list [1].
>>>>>
>>>>> The idea is to collocate Flink jobs on Ignite nodes, so each dataflow
>>>>> only processes the elements stored on the local in-memory database. I get
>>>>> the impression this should be much faster than randomly picking a Flink
>>>>> node and sending all the data over the network.
>>>>>
>>>>> Any insight on this?
>>>>>
>>>>> Cheers,
>>>>> Matt
>>>>>
>>>>> [1] http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Flink-on-Ignite-Collocation-td12780.html
>>>>>
>>>>> On Thu, Apr 27, 2017 at 5:33 AM, Till Rohrmann <trohrm...@apache.org>
>>>>> wrote:
>>>>>
>>>>>> I just copied my response because my other email address is not
>>>>>> accepted on the user mailing list.
>>>>>>
>>>>>> Hi Matt,
>>>>>>
>>>>>> I think Stefan's analysis is correct. I have a PR open [1], where I
>>>>>> fix the issue with the class loader.
>>>>>>
>>>>>> As a side note, by doing what you're doing, you will spawn a new Flink
>>>>>> mini cluster on each Ignite node. These mini clusters won't communicate
>>>>>> with each other and will run independently. Is this what you intend to do?
>>>>>>
>>>>>> [1] https://github.com/apache/flink/pull/3781
>>>>>>
>>>>>> Cheers,
>>>>>> Till
>>>>>>
>>>>>> On Wed, Apr 26, 2017 at 11:12 PM, Matt <dromitl...@gmail.com> wrote:
>>>>>>
>>>>>>> Let's wait for Till then, I hope he can figure this out.
>>>>>>>
>>>>>>> On Wed, Apr 26, 2017 at 11:03 AM, Stefan Richter <
>>>>>>> s.rich...@data-artisans.com> wrote:
>>>>>>>
>>>>>>>> Ok, now the question is also about what classloaders Ignite is
>>>>>>>> creating and how they are used, but the relevant code line in Flink is
>>>>>>>> probably in FlinkMiniCluster.scala, line 538 (current master):
>>>>>>>>
>>>>>>>> try {
>>>>>>>>   JobClient.submitJobAndWait(
>>>>>>>>     clientActorSystem,
>>>>>>>>     configuration,
>>>>>>>>     leaderRetrievalService,
>>>>>>>>     jobGraph,
>>>>>>>>     timeout,
>>>>>>>>     printUpdates,
>>>>>>>>     this.getClass.getClassLoader())
>>>>>>>> } finally {
>>>>>>>>   if(!useSingleActorSystem) {
>>>>>>>>     // we have to shutdown the just created actor system
>>>>>>>>     shutdownJobClientActorSystem(clientActorSystem)
>>>>>>>>   }
>>>>>>>> }
>>>>>>>>
>>>>>>>>
>>>>>>>> This is what is executed as part of executing a job through
>>>>>>>> LocalEnvironment. As we can see, the classloader is set to the
>>>>>>>> classloader of FlinkMiniCluster. Depending on the classloader structure
>>>>>>>> inside Ignite, this classloader might not know your user code. What you
>>>>>>>> could do is change this line in a custom Flink build, changing line 538,
>>>>>>>> for example, to Thread.currentThread().getContextClassLoader(), and
>>>>>>>> ensure that the context classloader inside the runnable is a classloader
>>>>>>>> that a) knows the user code and b) is a child of the classloader that
>>>>>>>> knows the Ignite and Flink classes. Note that this is not a general
>>>>>>>> solution and should not become a general fix.
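
A minimal sketch of the context classloader part of this suggestion, assuming the code runs inside the runnable that gets broadcast. ContextLoaderSketch and runWith are hypothetical names, and the empty URL array is only a placeholder for wherever the user code jars live; the only real JDK calls relied on are Thread#getContextClassLoader and Thread#setContextClassLoader.

```java
import java.net.URL;
import java.net.URLClassLoader;

/** Sketch only: run a task with a temporary thread context class loader. */
final class ContextLoaderSketch {

    /** Installs the given loader as context class loader, runs the task, then restores the old one. */
    static void runWith(ClassLoader userCodeLoader, Runnable task) {
        Thread current = Thread.currentThread();
        ClassLoader previous = current.getContextClassLoader();
        current.setContextClassLoader(userCodeLoader);
        try {
            task.run();
        } finally {
            // Always restore, so later work on this thread is not affected.
            current.setContextClassLoader(previous);
        }
    }

    public static void main(String[] args) {
        // Assumption: the loader that loaded this class already knows the framework classes,
        // so making it the parent covers requirement b). The URLs (empty here) would have to
        // point at the user code to cover requirement a).
        ClassLoader parent = ContextLoaderSketch.class.getClassLoader();
        ClassLoader userCodeLoader = new URLClassLoader(new URL[0], parent);

        runWith(userCodeLoader, () -> {
            // Code that reads the context class loader sees userCodeLoader here.
            ClassLoader seen = Thread.currentThread().getContextClassLoader();
            System.out.println(seen == userCodeLoader);
        });
    }
}
```

Whether this has any effect still depends on the callee actually reading the context classloader, which is exactly what the one-line change in the custom build would be for.
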
>>>>>>>>
>>>>>>>> I have heard that Till is about to change some things about local
>>>>>>>> execution, so I included him in CC. Maybe he can provide additional
>>>>>>>> hints on how your use case might be better supported in the upcoming
>>>>>>>> Flink 1.3.
>>>>>>>>
>>>>>>>> Best,
>>>>>>>> Stefan
>>>>>>>>
>>>>>>>> Am 25.04.2017 um 22:50 schrieb Matt <dromitl...@gmail.com>:
>>>>>>>>
>>>>>>>> I updated the code a little bit for clarity; line #56 mentioned in my
>>>>>>>> previous message is now line #25.
>>>>>>>>
>>>>>>>> In summary the error I'm getting is this:
>>>>>>>>
>>>>>>>> ---
>>>>>>>> Caused by: org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot load user class: com.test.Test
>>>>>>>> ClassLoader info: URL ClassLoader:
>>>>>>>> Class not resolvable through given classloader.
>>>>>>>> ---
>>>>>>>>
>>>>>>>> But if I'm not wrong, after trying to load the class through
>>>>>>>> URLClassLoader, Flink should try loading it with its parent
>>>>>>>> ClassLoader, which should be the same ClassLoader that executed the
>>>>>>>> environment, and it does have access to the class.
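
For reference, a small JDK-only sketch of how delegation behaves for a plain URLClassLoader. The standard loaders ask the parent first and only then search their own URLs, so whether a class resolves depends on which parent the loader was created with. DelegationSketch and canLoad are hypothetical names, and the sketch makes no claim about which parent Flink actually passes in.

```java
import java.net.URL;
import java.net.URLClassLoader;

/** Sketch only: shows that a URLClassLoader resolves classes through its parent. */
final class DelegationSketch {

    /** Returns true if the loader can resolve the class name, false if it cannot be found. */
    static boolean canLoad(ClassLoader loader, String className) {
        try {
            Class.forName(className, false, loader);
            return true;
        } catch (ClassNotFoundException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        String name = DelegationSketch.class.getName();
        URL[] noUrls = new URL[0];

        // Parent is the loader that already knows this class: lookup succeeds via delegation.
        ClassLoader withParent = new URLClassLoader(noUrls, DelegationSketch.class.getClassLoader());
        // Parent is null (bootstrap loader only): this class is not visible.
        ClassLoader withoutParent = new URLClassLoader(noUrls, null);

        System.out.println(canLoad(withParent, name));    // prints true
        System.out.println(canLoad(withoutParent, name)); // prints false
    }
}
```

If the loader in the error message was created with a parent other than the one that executed the environment, the second case would match the "Class not resolvable through given classloader" symptom.
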
>>>>>>>>
>>>>>>>> Not sure what is wrong.
>>>>>>>>
>>>>>>>> On Tue, Apr 25, 2017 at 5:38 PM, Matt <dromitl...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Hi Stefan,
>>>>>>>>>
>>>>>>>>> Check the code here:
>>>>>>>>> https://gist.github.com/17d82ee7dd921a0d649574a361cc017d
>>>>>>>>> The output is at the bottom of the page.
>>>>>>>>>
>>>>>>>>> Here are the results of the additional tests you mentioned:
>>>>>>>>>
>>>>>>>>> 1. I was able to instantiate an inner class (Test$Foo) inside the
>>>>>>>>> Ignite closure, no problem with that.
>>>>>>>>> 2. I tried implementing SourceFunction and SinkFunction in Test
>>>>>>>>> itself, and I was able to instantiate the class inside the Ignite
>>>>>>>>> closure.
>>>>>>>>> 3. I'm not sure what you meant by this point. Is it something like
>>>>>>>>> what I tried in line #56?
>>>>>>>>>
>>>>>>>>> Additionally, I tried implementing the SourceFunction and
>>>>>>>>> SinkFunction in Test$Foo with the same result: it says "Cannot load
>>>>>>>>> user class: com.test.Test$Foo"
>>>>>>>>>
>>>>>>>>> Looks like Flink is not using the correct ClassLoader. Any idea?
>>>>>>>>>
>>>>>>>>> Regards,
>>>>>>>>> Matt
>>>>>>>>>
>>>>>>>>> On Tue, Apr 25, 2017 at 7:00 AM, Stefan Richter <
>>>>>>>>> s.rich...@data-artisans.com> wrote:
>>>>>>>>>
>>>>>>>>>> Hi,
>>>>>>>>>>
>>>>>>>>>> I would expect that the local environment picks up the class path
>>>>>>>>>> from the code that launched it. So I think the question is what
>>>>>>>>>> happens behind the scenes when you call
>>>>>>>>>> ignite.compute().broadcast(runnable): which classes are shipped, and
>>>>>>>>>> how is the classpath built in the environment that runs the code?
>>>>>>>>>> Your example is also not fully conclusive, because com.myproj.Test
>>>>>>>>>> (which you can successfully instantiate) and com.myproj.Test$1$2
>>>>>>>>>> (which fails) are different classes, so maybe only the outer class is
>>>>>>>>>> shipped with the broadcast call. My theory is that not all classes are
>>>>>>>>>> shipped (e.g. inner classes), but only Test. You could try three
>>>>>>>>>> things to analyze the problem a little more:
>>>>>>>>>>
>>>>>>>>>> 1) Create another inner class inside Test and check whether you are
>>>>>>>>>> still able to instantiate this class via reflection as well.
>>>>>>>>>> 2) Let the Test class itself implement the map function (avoiding the
>>>>>>>>>> usage of other/inner classes) and see if this works.
>>>>>>>>>> 3) Check and set the thread’s context classloader inside the
>>>>>>>>>> runnable to something that contains all required classes and see if
>>>>>>>>>> this gets picked up by Flink.
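
A rough sketch of how checks 1) and 3) could be scripted with plain JDK reflection. LoaderDiagnostics, Foo, loaderChain and instantiate are made-up names for illustration (Foo stands in for a nested class such as an inner class of Test); nothing here is Ignite or Flink API. The idea would be to call the helpers from inside the broadcast runnable and compare the result with what the launching JVM reports.

```java
import java.lang.reflect.Constructor;
import java.util.ArrayList;
import java.util.List;

/** Sketch only: helpers for inspecting class loaders from inside a closure. */
final class LoaderDiagnostics {

    /** Hypothetical stand-in for a nested user class. */
    static final class Foo {
        Foo() {}
    }

    /** Lists the loader of the given class followed by its parents, ending with "bootstrap". */
    static List<String> loaderChain(Class<?> type) {
        List<String> chain = new ArrayList<>();
        for (ClassLoader cl = type.getClassLoader(); cl != null; cl = cl.getParent()) {
            chain.add(cl.getClass().getName());
        }
        chain.add("bootstrap");
        return chain;
    }

    /** Tries to instantiate a class by binary name through the given loader; null on failure. */
    static Object instantiate(String binaryName, ClassLoader loader) {
        try {
            Class<?> type = Class.forName(binaryName, true, loader);
            Constructor<?> ctor = type.getDeclaredConstructor();
            ctor.setAccessible(true);
            return ctor.newInstance();
        } catch (ReflectiveOperationException e) {
            return null;
        }
    }

    public static void main(String[] args) {
        ClassLoader context = Thread.currentThread().getContextClassLoader();
        // Check 1): nested classes have their own binary name (Outer$Inner) and class file.
        System.out.println(instantiate(Foo.class.getName(), context) != null);
        // Check 3): compare the chain of the user class with the context loader's class.
        System.out.println(loaderChain(Foo.class));
        System.out.println(context.getClass().getName());
    }
}
```
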
>>>>>>>>>>
>>>>>>>>>> Best,
>>>>>>>>>> Stefan
>>>>>>>>>>
>>>>>>>>>> Am 25.04.2017 um 07:27 schrieb Matt <dromitl...@gmail.com>:
>>>>>>>>>>
>>>>>>>>>> Hi all,
>>>>>>>>>>
>>>>>>>>>> I'm trying to run Flink using a local environment, but on an
>>>>>>>>>> Ignite node to achieve collocation (as mentioned in my previous
>>>>>>>>>> message on this list).
>>>>>>>>>>
>>>>>>>>>> Have a look at the code in [1]. It's pretty simple, but I'm
>>>>>>>>>> getting a "cannot load user class" error as shown in [2].
>>>>>>>>>>
>>>>>>>>>> If you check line #29 in the code, I'm able to create an instance
>>>>>>>>>> of class Test, and it's the same context from which I'm creating the
>>>>>>>>>> Flink job. Shouldn't it work provided I'm using a local environment?
>>>>>>>>>>
>>>>>>>>>> It would be really nice to be able to inject a ClassLoader into
>>>>>>>>>> the chunk of code that creates the job. Is this currently possible?
>>>>>>>>>>
>>>>>>>>>> Any fix or workaround is appreciated!
>>>>>>>>>>
>>>>>>>>>> Best,
>>>>>>>>>> Matt
>>>>>>>>>>
>>>>>>>>>> [1] https://gist.github.com/f248187b9638023b95ba8bd9d7f06215
>>>>>>>>>> [2] https://gist.github.com/796ee05425535ece1736df7b1e884cce
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>
