Hi Greg,

I’ve found the cause of the problem. You actually spotted a really mean and
subtle bug introduced by FLINK-3327. In a nutshell, we now send the
ExecutionConfig to the JobManager as part of the JobGraph. The
ExecutionConfig can contain the classes of automatically registered user
code types, such as Class<BlockInfo>. Because this information is sent to
the JobManager as is, instead of being serialized up front and shipped as a
byte array, Akka tries to load the classes when it deserializes the message.
Since the types are part of the user code, the class loader used by Akka
cannot load them and fails with a ClassNotFoundException. As a result, the
SubmitJob message is silently dropped.
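
To illustrate the idea of shipping a byte array instead of the object itself,
here is a minimal sketch in plain Java. It is not the actual Flink fix. The
names SerializedHolderSketch, SerializedHolder, LoaderAwareInputStream and
roundTrip are hypothetical and only serve the illustration. The payload is
serialized eagerly on the sender side, so the transport layer only ever sees a
byte[], and the receiver deserializes it later with a class loader of its
choice (for example the user code class loader).

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.ObjectStreamClass;
import java.io.Serializable;

public class SerializedHolderSketch {

    /** Hypothetical wrapper: carries the payload as opaque bytes. */
    static final class SerializedHolder<T> implements Serializable {
        private static final long serialVersionUID = 1L;

        private final byte[] bytes;

        SerializedHolder(T value) throws IOException {
            // Serialize eagerly, so the transport never has to load
            // the payload's classes itself.
            ByteArrayOutputStream bos = new ByteArrayOutputStream();
            try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
                oos.writeObject(value);
            }
            this.bytes = bos.toByteArray();
        }

        @SuppressWarnings("unchecked")
        T deserialize(ClassLoader loader)
                throws IOException, ClassNotFoundException {
            try (ObjectInputStream ois = new LoaderAwareInputStream(
                    new ByteArrayInputStream(bytes), loader)) {
                return (T) ois.readObject();
            }
        }
    }

    /** Resolves classes against an explicitly supplied class loader. */
    static final class LoaderAwareInputStream extends ObjectInputStream {
        private final ClassLoader loader;

        LoaderAwareInputStream(InputStream in, ClassLoader loader)
                throws IOException {
            super(in);
            this.loader = loader;
        }

        @Override
        protected Class<?> resolveClass(ObjectStreamClass desc)
                throws IOException, ClassNotFoundException {
            try {
                return Class.forName(desc.getName(), false, loader);
            } catch (ClassNotFoundException e) {
                // Fall back to the default lookup, e.g. for primitives.
                return super.resolveClass(desc);
            }
        }
    }

    /** Helper for the demo: serialize, then deserialize with the loader. */
    static <T> T roundTrip(T value, ClassLoader loader) {
        try {
            return new SerializedHolder<>(value).deserialize(loader);
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("round trip failed", e);
        }
    }

    public static void main(String[] args) {
        // Assumption: the system class loader stands in for the
        // user code class loader in this demo.
        ClassLoader loader = ClassLoader.getSystemClassLoader();
        Class<?> restored = roundTrip(String.class, loader);
        System.out.println(restored.getName());
    }
}
```

The important design point is that the class lookup happens in deserialize(),
where the caller decides which class loader to use, and not inside the
messaging layer.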

I’ve also opened a JIRA [1] for it. I’ll try to fix the problem.

[1] https://issues.apache.org/jira/browse/FLINK-3633

Cheers,
Till

On Wed, Mar 16, 2016 at 8:52 PM, Greg Hogan <c...@greghogan.com> wrote:

> I realized shortly after responding that, since I had moved the offending
> code into Gelly and then copied the Gelly jar into the lib folder, this
> would not involve the user classloader. So I have merged the Gelly jar and
> the Gelly examples jar into a single jar, which is included in my build
> at:
>
> https://s3.amazonaws.com/apache-flink/flink-1.1-SNAPSHOT.txz
>
> Are you able to replicate with the following command:
>
> $ ./bin/flink run -c org.apache.flink.graph.examples.Graph500 flink-gelly_with_examples_2.10-1.1-SNAPSHOT.jar
>
> On Tue, Mar 15, 2016 at 5:16 PM, Greg Hogan <c...@greghogan.com> wrote:
>
> > Hi Till,
> >
> > The code in question is part of FLINK-2909 which is currently running on
> > Travis but which does not trigger this issue. I'll keep looking.
> >
> > Thanks,
> > Greg
> >
> > On Tue, Mar 15, 2016 at 11:30 AM, Till Rohrmann <trohrm...@apache.org>
> > wrote:
> >
> >> Hi Greg,
> >>
> >> could you share an example program with us which reproduces the
> >> problem? I suspect that, somehow, your user code class BlockInfo is
> >> sent directly to the JobManager where it is deserialized without the
> >> user code class loader.
> >>
> >> Cheers,
> >> Till
> >>
> >> On Tue, Mar 15, 2016 at 4:19 PM, Greg Hogan <c...@greghogan.com> wrote:
> >>
> >> > I am seeing a failure running my code starting with commit 0f8d76c6
> >> > (ExecutionConfig to JobGraph).
> >> >
> >> > Logs and stack trace are below.
> >> >
> >> > Using default configuration so a single TaskManager. From the web UI,
> >> > data port is 33245 and path is
> >> > akka.tcp://flink@192.168.14.134:41339/user/taskmanager.
> >> >
> >> > Placing a breakpoint in ReliableDeliverySupervisor.supervisorStrategy,
> >> > e.detailMessage is "java.lang.ClassNotFoundException:
> >> > generator.rmat.random.BlockInfo" and remoteAddress is
> >> > "akka.tcp://flink@127.0.0.1:51428".
> >> >
> >> > I have an old version of BlockInfo which works without error:
> >> >
> >> > public class BlockInfo {
> >> >
> >> >     public long seed;
> >> >
> >> >     public long edges;
> >> > }
> >> >
> >> > The new version of BlockInfo leading to the error:
> >> >
> >> > public class BlockInfo<T extends RandomGenerator> {
> >> >
> >> >     private final RandomGenerable<T> randomGenerable;
> >> >
> >> >     private final int blockIndex;
> >> >
> >> >     private final long firstElement;
> >> >
> >> >     private final long elementCount;
> >> >
> >> >     ...
> >> > }
> >> >
> >> > In this execution the RandomGenerable has only a single field, a long.
> >> > I am puzzled and unsure where to look next.
> >> >
> >> > Greg
> >> >
> >> >
> >> >
> >> > Client log:
> >> >
> >> > 2016-03-15 10:07:25,745 WARN akka.remote.ReliableDeliverySupervisor                        - Association with remote system [akka.tcp://flink@127.0.0.1:6123] has failed, address is now gated for [5000] ms. Reason is: [Disassociated].
> >> > 2016-03-15 10:08:25,692 INFO org.apache.flink.runtime.client.JobClientActor                - Terminate JobClientActor.
> >> >
> >> >
> >> > JobManager log:
> >> >
> >> > 2016-03-15 10:07:25,514 DEBUG akka.serialization.Serialization(akka://flink)                - Using serializer[akka.serialization.JavaSerializer] for message [akka.actor.ActorIdentity]
> >> > 2016-03-15 10:07:25,533 DEBUG akka.serialization.Serialization(akka://flink)                - Using serializer[akka.serialization.JavaSerializer] for message [java.lang.Integer]
> >> > 2016-03-15 10:07:25,547 DEBUG org.apache.flink.runtime.blob.BlobServerConnection            - Received PUT request for content addressable BLOB
> >> > 2016-03-15 10:07:25,731 WARN akka.remote.ReliableDeliverySupervisor                        - Association with remote system [akka.tcp://flink@127.0.0.1:49738] has failed, address is now gated for [5000] ms. Reason is: [generator.rmat.random.BlockInfo].
> >> >
> >> >
> >> > Client stack trace:
> >> >
> >> > The program finished with the following exception:
> >> >
> >> > org.apache.flink.client.program.ProgramInvocationException: The program execution failed: Communication with JobManager failed: Job submission to the JobManager timed out.
> >> >     at org.apache.flink.client.program.Client.runBlocking(Client.java:381)
> >> >     at org.apache.flink.client.program.Client.runBlocking(Client.java:355)
> >> >     at org.apache.flink.client.program.Client.runBlocking(Client.java:315)
> >> >     at org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:60)
> >> >     at org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:855)
> >> >     at Driver.main(Driver.java:462)
> >> >     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> >> >     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> >> >     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> >> >     at java.lang.reflect.Method.invoke(Method.java:497)
> >> >     at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:505)
> >> >     at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403)
> >> >     at org.apache.flink.client.program.Client.runBlocking(Client.java:248)
> >> >     at org.apache.flink.client.CliFrontend.executeProgramBlocking(CliFrontend.java:866)
> >> >     at org.apache.flink.client.CliFrontend.run(CliFrontend.java:333)
> >> >     at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1189)
> >> >     at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1239)
> >> > Caused by: org.apache.flink.runtime.client.JobExecutionException: Communication with JobManager failed: Job submission to the JobManager timed out.
> >> >     at org.apache.flink.runtime.client.JobClient.submitJobAndWait(JobClient.java:141)
> >> >     at org.apache.flink.client.program.Client.runBlocking(Client.java:379)
> >> >     ... 16 more
> >> > Caused by: org.apache.flink.runtime.client.JobClientActorSubmissionTimeoutException: Job submission to the JobManager timed out.
> >> >     at org.apache.flink.runtime.client.JobClientActor.handleMessage(JobClientActor.java:256)
> >> >     at org.apache.flink.runtime.akka.FlinkUntypedActor.handleLeaderSessionID(FlinkUntypedActor.java:88)
> >> >     at org.apache.flink.runtime.akka.FlinkUntypedActor.onReceive(FlinkUntypedActor.java:68)
> >> >     at akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:167)
> >> >     at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
> >> >     at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:97)
> >> >     at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
> >> >     at akka.actor.ActorCell.invoke(ActorCell.scala:487)
> >> >     at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
> >> >     at akka.dispatch.Mailbox.run(Mailbox.scala:221)
> >> >     at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
> >> >     at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> >> >     at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.pollAndExecAll(ForkJoinPool.java:1253)
> >> >     at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1346)
> >> >     at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> >> >     at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> >> >
> >>
> >
> >
>
