Thanks for the feedback, guys! Apparently the Scala shell compiles the shell input to some kind of virtual directory. It should be possible to create a jar from its contents and then hand it over to Flink for execution in some way. I will investigate further.
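As a rough sketch of the jar step (not tested against Flink): assuming the generated classes have already been written out to a real directory on disk, something like the following could bundle them. The names ReplJar, jarDirectory, classDir and jarFile are made up for illustration; only standard java.util.jar and java.nio.file calls are used.

```scala
import java.io.{File, FileOutputStream}
import java.nio.file.Files
import java.util.jar.{JarEntry, JarOutputStream}

// Hypothetical helper, not part of Flink or the Scala shell.
object ReplJar {

  // Recursively collect all regular files below `dir`.
  private def listFiles(dir: File): Seq[File] = {
    val children = Option(dir.listFiles).map(_.toSeq).getOrElse(Seq.empty)
    children.flatMap(f => if (f.isDirectory) listFiles(f) else Seq(f))
  }

  // Write every .class file found under `classDir` into `jarFile`.
  // Assumption: `classDir` is a real directory holding the generated classes.
  def jarDirectory(classDir: File, jarFile: File): Unit = {
    val out = new JarOutputStream(new FileOutputStream(jarFile))
    try {
      for (f <- listFiles(classDir) if f.getName.endsWith(".class")) {
        // Jar entry names are relative to the root and always use '/'.
        val name = classDir.toPath.relativize(f.toPath).toString
          .replace(File.separatorChar, '/')
        out.putNextEntry(new JarEntry(name))
        Files.copy(f.toPath, out)
        out.closeEntry()
      }
    } finally {
      out.close()
    }
  }
}
```

The resulting jar could then be attached to a remote environment in the way Stephan describes below. How to get the classes out of the shell's virtual directory in the first place is still the open question.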
cheers,
Nikolaas

2015-04-15 11:20 GMT+02:00 Stephan Ewen <se...@apache.org>:

> To give a bit of context for the exception:
>
> To execute a program, the classes of the user functions need to be
> available to the executing TaskManagers.
>
>  - If you execute locally from the IDE, all classes are in the classpath
>    anyways.
>  - If you use the remote environment, you need to attach the jar file to
>    the environment.
>  - In your case (repl), you need to make sure that the generated classes
>    are given to the TaskManager. In that sense, the approach is probably
>    similar to the case of executing with a remote environment - only that
>    you do not have a jar file up front, but need to generate it on the fly.
>    As Robert mentioned, https://github.com/apache/flink/pull/35 may have a
>    first solution to that. Other approaches are also possible, like simply
>    always bundling all classes in the directory where the repl puts its
>    generated classes.
>
> Greetings,
> Stephan
>
>
> On Tue, Apr 14, 2015 at 11:49 PM, Aljoscha Krettek <aljos...@apache.org>
> wrote:
>
> > I will look into it once I have some time (end of this week, or next
> > week probably)
> >
> > On Tue, Apr 14, 2015 at 8:51 PM, Robert Metzger <rmetz...@apache.org>
> > wrote:
> > > Hey Nikolaas,
> > >
> > > Thank you for posting on the mailing list. I've met Nikolaas today in
> > > person and we were talking a bit about an interactive shell for Flink,
> > > potentially also an integration with Zeppelin.
> > >
> > > Great stuff I'm really looking forward to :)
> > >
> > > We were wondering if somebody from the list has some experience with
> > > the scala shell.
> > > I've pointed Nikolaas also to this PR:
> > > https://github.com/apache/flink/pull/35.
> > >
> > > Best,
> > > Robert
> > >
> > >
> > > On Tue, Apr 14, 2015 at 5:26 PM, nse sik <nikolaas.steenber...@gmail.com>
> > > wrote:
> > >
> > >> Hi!
> > >> I am trying to implement a scala shell for flink.
> > >>
> > >> I've started with a simple scala object whose main function will drop
> > >> the user to the interactive scala shell (repl) at one point:
> > >>
> > >> import scala.tools.nsc.interpreter.ILoop
> > >> import scala.tools.nsc.Settings
> > >>
> > >> object Job {
> > >>   def main(args: Array[String]) {
> > >>
> > >>     val repl = new ILoop()
> > >>     repl.settings = new Settings()
> > >>
> > >>     // enable this line to use scala in intellij
> > >>     repl.settings.usejavacp.value = true
> > >>
> > >>     repl.createInterpreter()
> > >>
> > >>     // start scala interpreter shell
> > >>     repl.process(repl.settings)
> > >>
> > >>     repl.closeInterpreter()
> > >>   }
> > >> }
> > >>
> > >> Now I am trying to execute the word count example as in:
> > >>
> > >> scala> import org.apache.flink.api.scala._
> > >>
> > >> scala> val env = ExecutionEnvironment.getExecutionEnvironment
> > >>
> > >> scala> val text = env.fromElements("To be, or not to be,--that is the question:--", "Whether 'tis nobler in the mind to suffer", "The slings and arrows of outrageous fortune", "Or to take arms against a sea of troubles,")
> > >>
> > >> scala> val counts = text.flatMap { _.toLowerCase.split("\\W+") }.map { (_, 1) }.groupBy(0).sum(1)
> > >>
> > >> scala> counts.print()
> > >>
> > >> scala> env.execute("Flink Scala Api Skeleton")
> > >>
> > >> However, I am running into the following error:
> > >>
> > >> env.execute("Flink Scala Api Skeleton")
> > >> org.apache.flink.runtime.client.JobExecutionException:
> > >> java.lang.RuntimeException: The initialization of the DataSource's outputs
> > >> caused an error: The type serializer factory could not load its parameters
> > >> from the configuration due to missing classes.
> > >>     at org.apache.flink.runtime.operators.DataSourceTask.registerInputOutput(DataSourceTask.java:89)
> > >>     at org.apache.flink.runtime.execution.RuntimeEnvironment.<init>(RuntimeEnvironment.java:187)
> > >>     at org.apache.flink.runtime.taskmanager.TaskManager.submitTask(TaskManager.java:612)
> > >>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> > >>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
> > >>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> > >>     at java.lang.reflect.Method.invoke(Method.java:606)
> > >>     at org.apache.flink.runtime.ipc.RPC$Server.call(RPC.java:420)
> > >>     at org.apache.flink.runtime.ipc.Server$Handler.run(Server.java:949)
> > >> Caused by: java.lang.RuntimeException: The type serializer factory could
> > >> not load its parameters from the configuration due to missing classes.
> > >>     at org.apache.flink.runtime.operators.util.TaskConfig.getTypeSerializerFactory(TaskConfig.java:1086)
> > >>     at org.apache.flink.runtime.operators.util.TaskConfig.getOutputSerializer(TaskConfig.java:542)
> > >>     at org.apache.flink.runtime.operators.RegularPactTask.getOutputCollector(RegularPactTask.java:1251)
> > >>     at org.apache.flink.runtime.operators.RegularPactTask.initOutputs(RegularPactTask.java:1359)
> > >>     at org.apache.flink.runtime.operators.DataSourceTask.initOutputs(DataSourceTask.java:288)
> > >>     at org.apache.flink.runtime.operators.DataSourceTask.registerInputOutput(DataSourceTask.java:87)
> > >>     ... 8 more
> > >> Caused by: java.lang.ClassNotFoundException: $anon$2$$anon$1
> > >>     at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
> > >>     at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
> > >>     at java.security.AccessController.doPrivileged(Native Method)
> > >>     at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
> > >>     at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
> > >>     at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
> > >>     at java.lang.Class.forName0(Native Method)
> > >>     at java.lang.Class.forName(Class.java:274)
> > >>     at org.apache.flink.util.InstantiationUtil$ClassLoaderObjectInputStream.resolveClass(InstantiationUtil.java:54)
> > >>     at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1612)
> > >>     at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1517)
> > >>     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1771)
> > >>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
> > >>     at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
> > >>     at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:274)
> > >>     at org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:236)
> > >>     at org.apache.flink.api.java.typeutils.runtime.RuntimeSerializerFactory.readParametersFromConfig(RuntimeSerializerFactory.java:76)
> > >>     at org.apache.flink.runtime.operators.util.TaskConfig.getTypeSerializerFactory(TaskConfig.java:1084)
> > >>     ... 13 more
> > >>
> > >>     at org.apache.flink.runtime.client.JobClient.submitJobAndWait(JobClient.java:349)
> > >>     at org.apache.flink.client.LocalExecutor.executePlan(LocalExecutor.java:239)
> > >>     at org.apache.flink.api.java.LocalEnvironment.execute(LocalEnvironment.java:51)
> > >>     at org.apache.flink.api.scala.ExecutionEnvironment.execute(ExecutionEnvironment.scala:501)
> > >>     at .<init>(<console>:12)
> > >>     at .<clinit>(<console>)
> > >>     at .<init>(<console>:7)
> > >>     at .<clinit>(<console>)
> > >>     at $print(<console>)
> > >>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> > >>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
> > >>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> > >>     at java.lang.reflect.Method.invoke(Method.java:606)
> > >>     at scala.tools.nsc.interpreter.IMain$ReadEvalPrint.call(IMain.scala:734)
> > >>     at scala.tools.nsc.interpreter.IMain$Request.loadAndRun(IMain.scala:983)
> > >>     at scala.tools.nsc.interpreter.IMain.loadAndRunReq$1(IMain.scala:573)
> > >>     at scala.tools.nsc.interpreter.IMain.interpret(IMain.scala:604)
> > >>     at scala.tools.nsc.interpreter.IMain.interpret(IMain.scala:568)
> > >>     at scala.tools.nsc.interpreter.ILoop.reallyInterpret$1(ILoop.scala:760)
> > >>     at scala.tools.nsc.interpreter.ILoop.interpretStartingWith(ILoop.scala:805)
> > >>     at scala.tools.nsc.interpreter.ILoop.command(ILoop.scala:717)
> > >>     at scala.tools.nsc.interpreter.ILoop.processLine$1(ILoop.scala:581)
> > >>     at scala.tools.nsc.interpreter.ILoop.innerLoop$1(ILoop.scala:588)
> > >>     at scala.tools.nsc.interpreter.ILoop.loop(ILoop.scala:591)
> > >>     at scala.tools.nsc.interpreter.ILoop$$anonfun$process$1.apply$mcZ$sp(ILoop.scala:882)
> > >>     at scala.tools.nsc.interpreter.ILoop$$anonfun$process$1.apply(ILoop.scala:837)
> > >>     at scala.tools.nsc.interpreter.ILoop$$anonfun$process$1.apply(ILoop.scala:837)
> > >>     at scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135)
> > >>     at scala.tools.nsc.interpreter.ILoop.process(ILoop.scala:837)
> > >>     at org.myorg.quickstart.Job$.main(Job.scala:37)
> > >>     at org.myorg.quickstart.Job.main(Job.scala)
> > >>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> > >>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
> > >>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> > >>     at java.lang.reflect.Method.invoke(Method.java:606)
> > >>     at com.intellij.rt.execution.application.AppMain.main(AppMain.java:134)
> > >>
> > >> I'm pretty new to Scala and Flink, so maybe someone has a suggestion or
> > >> can point me in some direction?
> > >>
> > >> thanks,
> > >> Nikolaas