StormConfig is set as a global job parameter; see FlinkClient.java, line 337ff:
> ExecutionConfig flinkConfig = topology.getExecutionEnvironment().getConfig();
> flinkConfig.setGlobalJobParameters(new StormConfig(conf));

On 03/31/2016 05:05 PM, Stephan Ewen wrote:
> Hmm, it is wrong that the JobManager tries to load that class directly from
> the actor message.
> All user code should be deserialized lazily.
>
> How is that class passed? Implicitly through some config?
>
> On Thu, Mar 31, 2016 at 4:51 PM, Matthias J. Sax <mj...@apache.org> wrote:
>
>> Here we go...
>>
>> StormConfig.class is contained in the user jar file. I guess I need to
>> "register" it somehow? Or is it a class loading issue?
>>
>>> 2016-03-31 16:47:33,095 ERROR akka.remote.EndpointWriter - AssociationError [akka.tcp://flink@127.0.0.1:6123] <- [akka.tcp://flink@127.0.0.1:32775]: Error [org.apache.flink.storm.util.StormConfig] [
>>> java.lang.ClassNotFoundException: org.apache.flink.storm.util.StormConfig
>>> at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
>>> at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
>>> at java.security.AccessController.doPrivileged(Native Method)
>>> at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
>>> at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
>>> at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
>>> at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
>>> at java.lang.Class.forName0(Native Method)
>>> at java.lang.Class.forName(Class.java:278)
>>> at java.io.ObjectInputStream.resolveClass(ObjectInputStream.java:625)
>>> at akka.util.ClassLoaderObjectInputStream.resolveClass(ClassLoaderObjectInputStream.scala:19)
>>> at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1612)
>>> at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1517)
>>> at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1771)
>>> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>>> at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1997)
>>> at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1921)
>>> at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>>> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>>> at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1997)
>>> at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1921)
>>> at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>>> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>>> at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1997)
>>> at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1921)
>>> at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>>> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>>> at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1997)
>>> at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1921)
>>> at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>>> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>>> at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
>>> at akka.serialization.JavaSerializer$$anonfun$1.apply(Serializer.scala:136)
>>> at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
>>> at akka.serialization.JavaSerializer.fromBinary(Serializer.scala:136)
>>> at akka.serialization.Serialization$$anonfun$deserialize$1.apply(Serialization.scala:104)
>>> at scala.util.Try$.apply(Try.scala:161)
>>> at akka.serialization.Serialization.deserialize(Serialization.scala:98)
>>> at akka.remote.MessageSerializer$.deserialize(MessageSerializer.scala:23)
>>> at akka.remote.DefaultMessageDispatcher.payload$lzycompute$1(Endpoint.scala:58)
>>> at akka.remote.DefaultMessageDispatcher.payload$1(Endpoint.scala:58)
>>> at akka.remote.DefaultMessageDispatcher.dispatch(Endpoint.scala:76)
>>> at akka.remote.EndpointReader$$anonfun$receive$2.applyOrElse(Endpoint.scala:937)
>>> at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
>>> at akka.remote.EndpointActor.aroundReceive(Endpoint.scala:415)
>>> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
>>> at akka.actor.ActorCell.invoke(ActorCell.scala:487)
>>> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
>>> at akka.dispatch.Mailbox.run(Mailbox.scala:221)
>>> at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
>>> at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>>> at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>>> at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>>> at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>>> ]
>>
>> On 03/31/2016 04:31 PM, Till Rohrmann wrote:
>>> Could you please rerun the whole job with debug log level and logging of
>>> Akka's lifecycle events turned on?
>>>
>>> Cheers,
>>> Till
>>>
>>> On Thu, Mar 31, 2016 at 4:21 PM, Matthias J. Sax <mj...@apache.org> wrote:
>>>
>>>> enclosed the logs.. maybe you can make some sense out if them.
>>>>
>>>> On 03/31/2016 02:52 PM, Till Rohrmann wrote:
>>>>> I would assume that something went wrong on the JobManager side. Could you
>>>>> check the logs if they contain something suspicious? Additionally you could
>>>>> turn on lifecycle event logging for Akka.
>>>>>
>>>>> Cheers,
>>>>> Till
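
For reference, regarding Stephan's point that user code should be deserialized lazily: below is a minimal sketch, assuming plain Java serialization, of how a value whose class lives only in the user jar could be shipped as opaque bytes and resolved later against a user-code class loader. LazyValue and LoaderAwareStream are hypothetical names made up for illustration; this is not Flink's actual code and not necessarily how the issue gets fixed.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.ObjectStreamClass;
import java.io.Serializable;
import java.io.UncheckedIOException;

/**
 * Hypothetical holder (illustration only): the wrapped value travels as a
 * byte array, so a receiver that deserializes the holder itself never has
 * to load the value's class.
 */
final class LazyValue<T> implements Serializable {
    private static final long serialVersionUID = 1L;

    // Only this field is part of the holder's serialized form.
    private final byte[] bytes;

    LazyValue(T value) {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
            oos.writeObject(value);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        this.bytes = bos.toByteArray();
    }

    /** Deserializes on demand, resolving classes through the given loader. */
    @SuppressWarnings("unchecked")
    T get(ClassLoader userCodeLoader) {
        try (ObjectInputStream ois =
                 new LoaderAwareStream(new ByteArrayInputStream(bytes), userCodeLoader)) {
            return (T) ois.readObject();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            // Wrapped so that callers in this sketch need no checked exceptions.
            throw new IllegalStateException("class not visible to the given loader", e);
        }
    }

    /** ObjectInputStream that looks classes up in an explicit class loader. */
    private static final class LoaderAwareStream extends ObjectInputStream {
        private final ClassLoader loader;

        LoaderAwareStream(InputStream in, ClassLoader loader) throws IOException {
            super(in);
            this.loader = loader;
        }

        @Override
        protected Class<?> resolveClass(ObjectStreamClass desc)
                throws IOException, ClassNotFoundException {
            try {
                return Class.forName(desc.getName(), false, loader);
            } catch (ClassNotFoundException e) {
                // Falls back to the default lookup, which also handles primitives.
                return super.resolveClass(desc);
            }
        }
    }
}
```

With something along these lines, a message would carry a LazyValue instead of the user object itself, and only the component that has the user jar on its class loader would call get(...).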