Hi Till,

Great! Do you know if it's planned to be included in v1.2.x or should we
wait for v1.3? I'll give it a try as soon as it's merged.

You're right about this approach launching a mini cluster on each Ignite
node. That is intentional, as described in my previous message on the list
[1].

The idea is to collocate Flink jobs on Ignite nodes, so each dataflow only
processes the elements stored on the local in-memory database. I get the
impression this should be much faster than randomly picking a Flink node
and sending all the data over the network.

Any insight on this?

Cheers,
Matt

[1]
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Flink-on-Ignite-Collocation-td12780.html

On Thu, Apr 27, 2017 at 5:33 AM, Till Rohrmann <trohrm...@apache.org> wrote:

> I just copied my response because my other email address is not accepted
> on the user mailing list.
>
> Hi Matt,
>
> I think Stefan's analysis is correct. I have a PR open [1], where I fix
> the issue with the class loader.
>
> As a side note, by doing what you're doing, you will spawn on each Ignite
> node a new Flink mini cluster. These mini cluster won't communicate with
> each other and run independently. Is this what you intend to do?
>
> [1] https://github.com/apache/flink/pull/3781
>
> Cheers,
> Till
>
> On Wed, Apr 26, 2017 at 11:12 PM, Matt <dromitl...@gmail.com> wrote:
>
>> Let's wait for Till then, I hope he can figure this out.
>>
>> On Wed, Apr 26, 2017 at 11:03 AM, Stefan Richter <
>> s.rich...@data-artisans.com> wrote:
>>
>>> Ok, now the question is also about what classloaders Ignite is creating
>>> and how they are used, but the relevant code line in Flink is probably in
>>> FlinkMiniCluster.scala, line 538 (current master):
>>>
>>>  try {
>>>  JobClient.submitJobAndWait(
>>>    clientActorSystem,
>>>    configuration,
>>>    leaderRetrievalService,
>>>    jobGraph,
>>>    timeout,
>>>    printUpdates,
>>>    this.getClass.getClassLoader())
>>> } finally {
>>>    if(!useSingleActorSystem) {
>>>      // we have to shutdown the just created actor system
>>>      shutdownJobClientActorSystem(clientActorSystem)
>>>    }
>>>  }
>>>
>>>
>>> This is what is executed as part of executing a job through
>>> LocalEnvironment. As we can see, the classloader is set to the classloader
>>> of FlinkMiniCluster. Depending on the classloader structure inside Ignite,
>>> this classloader might not know your user code. What you could do is
>>> changing this line in a custom Flink build, changing line 538 for example
>>> to Thread.currentThread().getContextClassloader() and ensuring that the
>>> context classloader ins the runnable is a classloader that a) knows the
>>> user code and b) is a child of the classloader that knows the Ignite and
>>> Flink classes. Notice that this is not a general solution and should not
>>> become a general fix.
>>>
>>> I have heard that Till is about to change some things about local
>>> execution, so I included him in CC. Maybe he can provide additional hints
>>> how your use case might be better supported in the upcoming Flink 1.3.
>>>
>>> Best,
>>> Stefan
>>>
>>> Am 25.04.2017 um 22:50 schrieb Matt <dromitl...@gmail.com>:
>>>
>>> I updated the code a little bit for clarity, now the line #56 mentioned
>>> in my previous message is line #25.
>>>
>>> In summary the error I'm getting is this:
>>>
>>> ---
>>> Caused by: org.apache.flink.streaming.runtime.tasks.StreamTaskException:
>>> Cannot load user class: com.test.Test
>>> ClassLoader info: URL ClassLoader:
>>> Class not resolvable through given classloader.
>>> ---
>>>
>>> But if I'm not wrong, after trying to load the class through
>>> URLClassLoader, Flink should try loading it with its parent ClassLoader,
>>> which should be the same ClassLoader that executed the environment, and it
>>> does have access to the class.
>>>
>>> Not sure what is wrong.
>>>
>>> On Tue, Apr 25, 2017 at 5:38 PM, Matt <dromitl...@gmail.com> wrote:
>>>
>>>> Hi Stefan,
>>>>
>>>> Check the code here: https://gist.github.com/
>>>> 17d82ee7dd921a0d649574a361cc017d , the output is at the bottom of the
>>>> page.
>>>>
>>>> Here are the results of the additional tests you mentioned:
>>>>
>>>> 1. I was able to instantiate an inner class (Test$Foo) inside the
>>>> Ignite closure, no problem with that
>>>> 2. I tried implementing SourceFunction and SinkFunction in Test itself,
>>>> I was able to instantiate the class inside the Ignite closure
>>>> 3. I'm not sure what you meant in this point, is it something like what
>>>> I tried in line #56?
>>>>
>>>> Additionally, I tried implementing the SourceFunction and SinkFunction
>>>> in Test$Foo with the same result: it says "Cannot load user class:
>>>> com.test.Test$Foo"
>>>>
>>>> Looks like Flink is not using the correct ClassLoader. Any idea?
>>>>
>>>> Regards,
>>>> Matt
>>>>
>>>> On Tue, Apr 25, 2017 at 7:00 AM, Stefan Richter <
>>>> s.rich...@data-artisans.com> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> I would expect that the local environment picks up the class path from
>>>>> the code that launched it. So I think the question is what happens behind
>>>>> the scenes when you call ignite.compute().broadcast(runnable); .
>>>>> Which classes are shipped and how is the classpath build in the 
>>>>> environment
>>>>> that runs the code. Your example is also not fully conclusive, because
>>>>> com.myproj.Test (which you can successfully instantiate) and
>>>>> com.myproj.Test$1$2 (which fails) are different classes, so maybe only the
>>>>> outer class is shipped with the broadcast call. My theory is that not all
>>>>> classes are shipped (e.g. inner classes), but only Test . You could try
>>>>> three things to analyze to problem a little more:
>>>>>
>>>>> 1) Create another inner class inside Test and try if you are still
>>>>> able to instantiate also this class via reflection.
>>>>> 2) Let Test class itself implement the map function (avoiding the
>>>>> usage of other/inner classes) and see if this works.
>>>>> 3) Check and set the thread’s context classloader inside the runnable
>>>>> to something that contains all required classes and see if this gets 
>>>>> picked
>>>>> up by Flink.
>>>>>
>>>>> Best,
>>>>> Stefan
>>>>>
>>>>> Am 25.04.2017 um 07:27 schrieb Matt <dromitl...@gmail.com>:
>>>>>
>>>>> Hi all,
>>>>>
>>>>> I'm trying to run Flink using a local environment, but on an Ignite
>>>>> node to achieve collocation (as mentioned in my previous message on this
>>>>> list).
>>>>>
>>>>> Have a look at the code in [1]. It's pretty simple, but I'm getting a
>>>>> "cannot load user class" error as shown in [2].
>>>>>
>>>>> If you check line #29 on the code, I'm able to create an instance of
>>>>> class Test, and it's the same context from which I'm creating the Flink
>>>>> job. Shouldn't it work provided I'm using a local environment?
>>>>>
>>>>> It would be really nice to be able to inject a ClassLoader into the
>>>>> chunk of code that creates the job. Is this currently possible?
>>>>>
>>>>> Any fix or workaround is appreciated!
>>>>>
>>>>> Best,
>>>>> Matt
>>>>>
>>>>> [1] https://gist.github.com/f248187b9638023b95ba8bd9d7f06215
>>>>> [2] https://gist.github.com/796ee05425535ece1736df7b1e884cce
>>>>>
>>>>>
>>>>>
>>>>
>>>
>>>
>>
>

Reply via email to