Chesnay, I have filed https://issues.apache.org/jira/browse/FLINK-9267 to keep track of this issue.
Regards, Kedar On Fri, Apr 27, 2018 at 11:50 AM, kedar mhaswade <kedar.mhasw...@gmail.com> wrote: > Thanks again! > > This is strange. With both Flink 1.3.3 and Flink 1.6.0-SNAPSHOT and > 1) copying gradoop-demo-shaded.jar to <Flink>/lib, and > 2) using RemoteEnvironment with just jmHost and jmPort (no Jarfiles) > > I get the same exception [1], caused by: > *Caused by: com.typesafe.config.ConfigException$Missing: No configuration > setting found for key 'akka.remote.log-received-messages'.* > > This key is not documented anywhere, so I am confused. Also, with the copying > above, although the JM and TM are running, the Flink dashboard on > http://localhost:8081 is *unavailable*! > > With Flink 1.3.3 and Flink 1.6.0-SNAPSHOT > 1) NOT copying gradoop-shaded.jar in <Flink>/lib, and > 2) using RemoteEnvironment with jmHost, jmPort and jarFiles = > {<absolute-path-to-gradoop-shaded.jar>} > > I get the same exception, however the Flink dashboard on > http://localhost:8081 is *available*! This makes me believe that this is > somehow an insidious classloading issue :(. > I am really perplexed with this behavior. Let me stick to Flink 1.3.3 > installation as you suggested for now. > > If you have any other debugging tips, please let me know. But I am running > out of ideas to make it run with non-Local Environment. > > Regards, > Kedar > > > > > [1] Gradoop shaded jar in <Flink>/lib -- exception on the web-app: > org.apache.flink.client.program.ProgramInvocationException: Could not > start the ActorSystem needed to talk to the JobManager. 
> at org.apache.flink.client.program.ClusterClient.run( > ClusterClient.java:461) > at org.apache.flink.client.program.StandaloneClusterClient.submitJob( > StandaloneClusterClient.java:105) > at org.apache.flink.client.program.ClusterClient.run( > ClusterClient.java:442) > at org.apache.flink.client.program.ClusterClient.run( > ClusterClient.java:429) > at org.apache.flink.client.program.ClusterClient.run( > ClusterClient.java:404) > at org.apache.flink.client.RemoteExecutor.executePlanWithJars( > RemoteExecutor.java:211) > at org.apache.flink.client.RemoteExecutor.executePlan( > RemoteExecutor.java:188) > at org.apache.flink.api.java.RemoteEnvironment.execute( > RemoteEnvironment.java:172) > at org.apache.flink.api.java.ExecutionEnvironment.execute( > ExecutionEnvironment.java:926) > at org.gradoop.demo.server.RequestHandler.getResponse( > RequestHandler.java:447) > at org.gradoop.demo.server.RequestHandler.createResponse( > RequestHandler.java:430) > at org.gradoop.demo.server.RequestHandler.executeCypher( > RequestHandler.java:121) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at sun.reflect.NativeMethodAccessorImpl.invoke( > NativeMethodAccessorImpl.java:62) > at sun.reflect.DelegatingMethodAccessorImpl.invoke( > DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > at com.sun.jersey.spi.container.JavaMethodInvokerFactory$1.invoke( > JavaMethodInvokerFactory.java:60) > at com.sun.jersey.server.impl.model.method.dispatch. > AbstractResourceMethodDispatchProvider$ResponseOutInvoker._dispatch( > AbstractResourceMethodDispatchProvider.java:205) > at com.sun.jersey.server.impl.model.method.dispatch. > ResourceJavaMethodDispatcher.dispatch(ResourceJavaMethodDispatcher. > java:75) > at com.sun.jersey.server.impl.uri.rules.HttpMethodRule. > accept(HttpMethodRule.java:302) > at com.sun.jersey.server.impl.uri.rules.RightHandPathRule. 
> accept(RightHandPathRule.java:147) > at com.sun.jersey.server.impl.uri.rules.ResourceClassRule. > accept(ResourceClassRule.java:108) > at com.sun.jersey.server.impl.uri.rules.RightHandPathRule. > accept(RightHandPathRule.java:147) > at com.sun.jersey.server.impl.uri.rules.RootResourceClassesRule.accept( > RootResourceClassesRule.java:84) > at com.sun.jersey.server.impl.application.WebApplicationImpl._ > handleRequest(WebApplicationImpl.java:1542) > at com.sun.jersey.server.impl.application.WebApplicationImpl._ > handleRequest(WebApplicationImpl.java:1473) > at com.sun.jersey.server.impl.application.WebApplicationImpl. > handleRequest(WebApplicationImpl.java:1419) > at com.sun.jersey.server.impl.application.WebApplicationImpl. > handleRequest(WebApplicationImpl.java:1409) > at com.sun.jersey.server.impl.container.grizzly2. > GrizzlyContainer._service(GrizzlyContainer.java:222) > at com.sun.jersey.server.impl.container.grizzly2.GrizzlyContainer.service( > GrizzlyContainer.java:192) > at org.glassfish.grizzly.http.server.HttpHandler.doHandle( > HttpHandler.java:164) > at org.glassfish.grizzly.http.server.HttpHandlerChain. > service(HttpHandlerChain.java:196) > at org.glassfish.grizzly.http.server.HttpHandler.doHandle( > HttpHandler.java:164) > at org.glassfish.grizzly.http.server.HttpServerFilter. 
> handleRead(HttpServerFilter.java:175) > at org.glassfish.grizzly.filterchain.ExecutorResolver$ > 9.execute(ExecutorResolver.java:119) > at org.glassfish.grizzly.filterchain.DefaultFilterChain.executeFilter( > DefaultFilterChain.java:265) > at org.glassfish.grizzly.filterchain.DefaultFilterChain.executeChainPart( > DefaultFilterChain.java:200) > at org.glassfish.grizzly.filterchain.DefaultFilterChain.execute( > DefaultFilterChain.java:134) > at org.glassfish.grizzly.filterchain.DefaultFilterChain.process( > DefaultFilterChain.java:112) > at org.glassfish.grizzly.ProcessorExecutor.execute( > ProcessorExecutor.java:78) > at org.glassfish.grizzly.nio.transport.TCPNIOTransport. > fireIOEvent(TCPNIOTransport.java:815) > at org.glassfish.grizzly.strategies.AbstractIOStrategy.fireIOEvent( > AbstractIOStrategy.java:112) > at org.glassfish.grizzly.strategies.WorkerThreadIOStrategy.run0( > WorkerThreadIOStrategy.java:115) > at org.glassfish.grizzly.strategies.WorkerThreadIOStrategy.access$ > 100(WorkerThreadIOStrategy.java:55) > at org.glassfish.grizzly.strategies.WorkerThreadIOStrategy$ > WorkerThreadRunnable.run(WorkerThreadIOStrategy.java:135) > at org.glassfish.grizzly.threadpool.AbstractThreadPool$Worker.doWork( > AbstractThreadPool.java:567) > at org.glassfish.grizzly.threadpool.AbstractThreadPool$ > Worker.run(AbstractThreadPool.java:547) > at java.lang.Thread.run(Thread.java:748) > Caused by: org.apache.flink.util.FlinkException: Could not start the > ActorSystem lazily. > at org.apache.flink.client.program.ClusterClient$ > LazyActorSystemLoader.get(ClusterClient.java:230) > at org.apache.flink.client.program.ClusterClient.run( > ClusterClient.java:459) > ... 
47 more > *Caused by: com.typesafe.config.ConfigException$Missing: No configuration > setting found for key 'akka.remote.log-received-messages'* > at com.typesafe.config.impl.SimpleConfig.findKey(SimpleConfig.java:124) > at com.typesafe.config.impl.SimpleConfig.find(SimpleConfig.java:145) > at com.typesafe.config.impl.SimpleConfig.find(SimpleConfig.java:151) > at com.typesafe.config.impl.SimpleConfig.find(SimpleConfig.java:151) > at com.typesafe.config.impl.SimpleConfig.find(SimpleConfig.java:159) > at com.typesafe.config.impl.SimpleConfig.find(SimpleConfig.java:164) > at com.typesafe.config.impl.SimpleConfig.getBoolean(SimpleConfig.java:174) > at akka.remote.RemoteSettings.<init>(RemoteSettings.scala:24) > at akka.remote.RemoteActorRefProvider.<init>(RemoteActorRefProvider.scala: > 114) > at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) > at sun.reflect.NativeConstructorAccessorImpl.newInstance( > NativeConstructorAccessorImpl.java:62) > at sun.reflect.DelegatingConstructorAccessorImpl.newInstance( > DelegatingConstructorAccessorImpl.java:45) > at java.lang.reflect.Constructor.newInstance(Constructor.java:423) > at akka.actor.ReflectiveDynamicAccess$$anonfun$createInstanceFor$2. > apply(DynamicAccess.scala:78) > at scala.util.Try$.apply(Try.scala:192) > at akka.actor.ReflectiveDynamicAccess.createInstanceFor( > DynamicAccess.scala:73) > at akka.actor.ReflectiveDynamicAccess$$anonfun$createInstanceFor$3. > apply(DynamicAccess.scala:84) > at akka.actor.ReflectiveDynamicAccess$$anonfun$createInstanceFor$3. 
> apply(DynamicAccess.scala:84) > at scala.util.Success.flatMap(Try.scala:231) > at akka.actor.ReflectiveDynamicAccess.createInstanceFor( > DynamicAccess.scala:84) > at akka.actor.ActorSystemImpl.liftedTree1$1(ActorSystem.scala:585) > at akka.actor.ActorSystemImpl.<init>(ActorSystem.scala:578) > at akka.actor.ActorSystem$.apply(ActorSystem.scala:142) > at akka.actor.ActorSystem$.apply(ActorSystem.scala:119) > at akka.actor.ActorSystem$.create(ActorSystem.scala:67) > at org.apache.flink.runtime.akka.AkkaUtils$.createActorSystem( > AkkaUtils.scala:104) > at org.apache.flink.runtime.akka.AkkaUtils$.createActorSystem( > AkkaUtils.scala:92) > at org.apache.flink.runtime.akka.AkkaUtils.createActorSystem( > AkkaUtils.scala) > at org.apache.flink.client.program.ClusterClient$ > LazyActorSystemLoader.get(ClusterClient.java:226) > > > On Thu, Apr 26, 2018 at 11:52 PM, Chesnay Schepler <ches...@apache.org> > wrote: > >> First, a small correction for my previous mail: >> >> I could reproduce your problems locally when submitting the fat-jar. >> Turns out i never submitted the fat-jar, as i didn't pass the jar file >> argument to RemoteEnvironment. >> >> Now on to your questions: >> >> *What version of Flink are you trying with?* >> I got it working *once *with 1.6-SNAPSHOT, but i would recommend >> sticking with 1.3.1 since that is the version gradoop depends on. (i >> haven't tried it with this version yet, but that's the next thing on my >> list) >> >> >> *Are there other config changes (flink-conf.yaml) that you made in your >> cluster? *It was the standard config. >> >> >> *Is org.apache.flink.api.common.io >> <http://org.apache.flink.api.common.io>.FileOutputFormat a good alternative >> to LocalCollectionOutputFormat? *It can be used, but if the result is >> small you could also use accumulators. >> >> >> *Do you think it is better to use jarFiles argument on >> createRemoteEnvironment? *Yes, once we get it working this is the way to >> go. 
>> >> >> On 26.04.2018 18:42, kedar mhaswade wrote: >> >> Thanks Chesnay for your incredible help! >> >> I will try out the suggestions again. A few questions: >> - What version of Flink are you trying with? I have had issues when I >> placed the gradoop-demo-shaded.jar in the lib folder on Flink >> installation (1.4 even refused to start!). >> - Are there other config changes (flink-conf.yaml) that you made in your >> cluster? >> - Is org.apache.flink.api.common.io.FileOutputFormat a good alternative >> to LocalCollectionOutputFormat, or should I use >> HadoopOutputFormatCommonBase (I do want to run the cluster on YARN >> later; at the moment I am trying on a standalone cluster). >> - Do you think it is better to use jarFiles argument on >> createRemoteEnvironment (which deploys the JAR only for this job and not >> mess with the entire Flink cluster) a better option than placing the JAR(s) >> in the lib folder? >> >> Thanks again, >> Regards, >> Kedar >> >> >> On Thu, Apr 26, 2018 at 3:14 AM, Chesnay Schepler <ches...@apache.org> >> wrote: >> >>> Small update: >>> >>> I could reproduce your problems locally when submitting the fat-jar. >>> I could get the job to run after placing the gradoop-demo-shaded.jar >>> into the lib folder. >>> I have not tried yet placing only the gradoop jars into lib (but my >>> guess is you missed a gradoop jar) >>> >>> Note that the job fails to run since you use >>> "LocalCollectionOutputFormat" which can only be used for local execution, >>> i.e. when the job submission and execution happen in the same JVM. >>> >>> >>> On 25.04.2018 14:23, kedar mhaswade wrote: >>> >>> Thank you for your response! >>> >>> I have not tried the flink run app.jar route because the way the app is >>> set up does not allow me to do it. Basically, the app is a web application >>> which serves the UI and also submits a Flink job for running Cypher >>> queries. It is a proof-of-concept app, but IMO, a very useful one. 
>>> >>> Here's how you can reproduce: >>> 1) git clone g...@github.com:kedarmhaswade/gradoop_demo.git (this is my >>> fork of gradoop_demo) >>> 2) cd gradoop_demo >>> 3) git checkout dev => dev is the branch where my changes to make >>> gradoop work with remote environment go. >>> 4) mvn clean package => should bring the gradoop JARs that this app >>> needs; these JARs should then be placed in <flink-install>/lib. >>> 5) cp >>> ~/.m2/repository/org/gradoop/gradoop-common/0.3.2/gradoop-common-0.3.2.jar >>> <flink-install>/lib, cp ~/.m2/repository/org/gradoop/g >>> radoop-flink/0.3.2/gradoop-flink-0.3.2.jar <flink-install>/lib, >>> cp target/gradoop-demo-0.2.0.jar <flink-install>/lib. >>> 6) start the local flink cluster (I have tried with latest >>> (built-from-source) 1.6-SNAPSHOT, or 1.4) >>> <flink-install>/bin/start-cluster.sh >>> -- note the JM host and port >>> 7) <gradoop-demo>/start.sh --jmhost <host> --jmport 6123 (adjust host >>> and port per your cluster) => this is now configured to talk to the >>> RemoteEnvironment at given JM host and port. >>> 8) open a browser at: http://localhost:2342/gradoop/html/cypher.html >>> 9) hit the query button => this would throw the exception >>> 10) Ctrl C the process in 7 and just restart it as java -cp target/ >>> classes:target/gradoop-demo-shaded.jar org.gradoop.demo.server.Server >>> => starts LocalEnvironment >>> 11) do 9 again and see the results shown nicely in the browser. >>> >>> Here is the relevant code: >>> 1) Choosing between >>> <https://github.com/kedarmhaswade/gradoop_demo/blob/dev/src/main/java/org/gradoop/demo/server/Server.java#L107> >>> a Remote or a Local Environment. >>> >>> The instructions are correct to my knowledge. Thanks for your >>> willingness to try. I have tried everything I can. With different Flink >>> versions, I get different results (I have also tried on 1.6-SNAPSHOT with >>> class loading config being parent-first, or child-first). 
>>> >>> Regards, >>> Kedar >>> >>> >>> On Wed, Apr 25, 2018 at 1:08 AM, Chesnay Schepler <ches...@apache.org> >>> wrote: >>> >>>> I couldn't spot any error in what you tried to do. Does the >>>> job-submission succeed if you submit the jar through the command-line >>>> client? >>>> >>>> Can you share the project, or a minimal reproducing version? >>>> >>>> >>>> On 25.04.2018 00:41, kedar mhaswade wrote: >>>> >>>> I am trying to get gradoop_demo >>>> <https://github.com/dbs-leipzig/gradoop_demo> (a gradoop based graph >>>> visualization app) working on Flink with *Remote* Execution >>>> Environment. >>>> >>>> This app, which is based on Gradoop, submits a job to the >>>> *preconfigured* execution environment, collects the results and sends >>>> it to the UI for rendering. >>>> >>>> When the execution environment is configured to be a LocalEnvironment >>>> <https://ci.apache.org/projects/flink/flink-docs-stable/api/java/org/apache/flink/api/java/LocalEnvironment.html>, >>>> everything works fine. But when I start a cluster (using < >>>> flink-install-path>/bin/start-cluster.sh), get the Job Manager >>>> endpoint (e.g. localhost:6123) and configure a RemoteEnvironment >>>> <https://ci.apache.org/projects/flink/flink-docs-stable/api/java/org/apache/flink/api/java/ExecutionEnvironment.html#createRemoteEnvironment-java.lang.String-int-org.apache.flink.configuration.Configuration-java.lang.String...-> >>>> and >>>> use that environment to run the job, I get exceptions [1]. >>>> >>>> Based on the class loading doc >>>> <https://ci.apache.org/projects/flink/flink-docs-master/monitoring/debugging_classloading.html>, >>>> I copied the gradoop classes (gradoop-flink-0.3.3-SNAPSHOT. >>>> jar, gradoop-common-0.3.3-SNAPSHOT.jar) to the <flink-install-path>/lib >>>> folder (hoping that that way those classes will be available to all the >>>> executors in the cluster). 
I have ensured that the class that Flink fails >>>> to load is in fact available in the Gradoop jars that I copied to the >>>> /lib folder. >>>> >>>> I have tried using the RemoteEnvironment method with jarFiles argument >>>> where the passed JAR file is a fat jar containing everything (in which case >>>> there is no Gradoop JAR file in /lib folder). >>>> >>>> So, my questions are: >>>> 1) How can I use RemoteEnvironment? >>>> 2) Is there any other way of doing this *programmatically? *(That >>>> means I can't do flink run since I am interested in the job execution >>>> result as a blocking call -- which means ideally I don't want to use the >>>> submit RESTful API as well). I just want RemoteEnvironment to work as well >>>> as LocalEnvironment. >>>> >>>> Regards, >>>> Kedar >>>> >>>> >>>> [1] >>>> 2018-04-24 15:16:02,823 ERROR >>>> org.apache.flink.runtime.jobmanager.JobManager >>>> - Failed to submit job 0c987c8704f8b7eb4d7d38efcb3d708d >>>> (Flink Java Job at Tue Apr 24 15:15:59 PDT 2018) >>>> java.lang.NoClassDefFoundError: Could not initialize class >>>> *org.gradoop.common.model.impl.id.GradoopId* >>>> at java.io.ObjectStreamClass.hasStaticInitializer(Native Method) >>>> at java.io.ObjectStreamClass.computeDefaultSUID(ObjectStreamCla >>>> ss.java:1887) >>>> at java.io.ObjectStreamClass.access$100(ObjectStreamClass.java:79) >>>> at java.io.ObjectStreamClass$1.run(ObjectStreamClass.java:263) >>>> at java.io.ObjectStreamClass$1.run(ObjectStreamClass.java:261) >>>> at java.security.AccessController.doPrivileged(Native Method) >>>> at java.io.ObjectStreamClass.getSerialVersionUID(ObjectStreamCl >>>> ass.java:260) >>>> at java.io.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:682) >>>> at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream >>>> .java:1876) >>>> at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.ja >>>> va:1745) >>>> at java.io.ObjectInputStream.readClass(ObjectInputStream.java:1710) >>>> at 
java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1550) >>>> at java.io.ObjectInputStream.readObject(ObjectInputStream.java:427) >>>> at java.util.HashSet.readObject(HashSet.java:341) >>>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) >>>> at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAcce >>>> ssorImpl.java:62) >>>> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMe >>>> thodAccessorImpl.java:43) >>>> at java.lang.reflect.Method.invoke(Method.java:498) >>>> at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass >>>> .java:1158) >>>> at java.io.ObjectInputStream.readSerialData(ObjectInputStream.j >>>> ava:2169) >>>> at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStre >>>> am.java:2060) >>>> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1567) >>>> at java.io.ObjectInputStream.defaultReadFields(ObjectInputStrea >>>> m.java:2278) >>>> at java.io.ObjectInputStream.readSerialData(ObjectInputStream.j >>>> ava:2202) >>>> at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStre >>>> am.java:2060) >>>> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1567) >>>> at java.io.ObjectInputStream.readObject(ObjectInputStream.java:427) >>>> at org.apache.flink.util.InstantiationUtil.deserializeObject(In >>>> stantiationUtil.java:290).... >>>> >>>> >>>> >>> >>> >> >> >