I will look into it once I have some time (probably at the end of this week or next week).
On Tue, Apr 14, 2015 at 8:51 PM, Robert Metzger <rmetz...@apache.org> wrote:
> Hey Nikolaas,
>
> Thank you for posting on the mailing list. I've met Nikolaas today in
> person and we were talking a bit about an interactive shell for Flink,
> potentially also an integration with Zeppelin.
>
> Great stuff I'm really looking forward to :)
>
> We were wondering if somebody from the list has some experience with the
> scala shell.
> I've pointed Nikolaas also to this PR:
> https://github.com/apache/flink/pull/35.
>
> Best,
> Robert
>
>
> On Tue, Apr 14, 2015 at 5:26 PM, nse sik <nikolaas.steenber...@gmail.com>
> wrote:
>
>> Hi!
>> I am trying to implement a scala shell for flink.
>>
>> I've started with a simple scala object whose main function will drop the
>> user to the interactive scala shell (repl) at one point:
>>
>> import scala.tools.nsc.interpreter.ILoop
>> import scala.tools.nsc.Settings
>>
>> object Job {
>>   def main(args: Array[String]) {
>>
>>     val repl = new ILoop()
>>     repl.settings = new Settings()
>>
>>     // enable this line to use scala in intellij
>>     repl.settings.usejavacp.value = true
>>
>>     repl.createInterpreter()
>>
>>     // start scala interpreter shell
>>     repl.process(repl.settings)
>>
>>     repl.closeInterpreter()
>>   }
>> }
>>
>> Now I am trying to execute the word count example as in:
>>
>> scala> import org.apache.flink.api.scala._
>>
>> scala> val env = ExecutionEnvironment.getExecutionEnvironment
>>
>> scala> val text = env.fromElements("To be, or not to be,--that is the question:--","Whether 'tis nobler in the mind to suffer", "The slings and arrows of outrageous fortune","Or to take arms against a sea of troubles,")
>>
>> scala> val counts = text.flatMap { _.toLowerCase.split("\\W+") }.map { (_, 1) }.groupBy(0).sum(1)
>>
>> scala> counts.print()
>>
>> scala> env.execute("Flink Scala Api Skeleton")
>>
>> However, I am running into the following error:
>>
>> env.execute("Flink Scala Api Skeleton")
>> org.apache.flink.runtime.client.JobExecutionException: java.lang.RuntimeException: The initialization of the DataSource's outputs caused an error: The type serializer factory could not load its parameters from the configuration due to missing classes.
>>     at org.apache.flink.runtime.operators.DataSourceTask.registerInputOutput(DataSourceTask.java:89)
>>     at org.apache.flink.runtime.execution.RuntimeEnvironment.<init>(RuntimeEnvironment.java:187)
>>     at org.apache.flink.runtime.taskmanager.TaskManager.submitTask(TaskManager.java:612)
>>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>     at java.lang.reflect.Method.invoke(Method.java:606)
>>     at org.apache.flink.runtime.ipc.RPC$Server.call(RPC.java:420)
>>     at org.apache.flink.runtime.ipc.Server$Handler.run(Server.java:949)
>> Caused by: java.lang.RuntimeException: The type serializer factory could not load its parameters from the configuration due to missing classes.
>>     at org.apache.flink.runtime.operators.util.TaskConfig.getTypeSerializerFactory(TaskConfig.java:1086)
>>     at org.apache.flink.runtime.operators.util.TaskConfig.getOutputSerializer(TaskConfig.java:542)
>>     at org.apache.flink.runtime.operators.RegularPactTask.getOutputCollector(RegularPactTask.java:1251)
>>     at org.apache.flink.runtime.operators.RegularPactTask.initOutputs(RegularPactTask.java:1359)
>>     at org.apache.flink.runtime.operators.DataSourceTask.initOutputs(DataSourceTask.java:288)
>>     at org.apache.flink.runtime.operators.DataSourceTask.registerInputOutput(DataSourceTask.java:87)
>>     ... 8 more
>> Caused by: java.lang.ClassNotFoundException: $anon$2$$anon$1
>>     at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
>>     at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
>>     at java.security.AccessController.doPrivileged(Native Method)
>>     at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
>>     at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
>>     at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
>>     at java.lang.Class.forName0(Native Method)
>>     at java.lang.Class.forName(Class.java:274)
>>     at org.apache.flink.util.InstantiationUtil$ClassLoaderObjectInputStream.resolveClass(InstantiationUtil.java:54)
>>     at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1612)
>>     at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1517)
>>     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1771)
>>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>>     at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
>>     at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:274)
>>     at org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:236)
>>     at org.apache.flink.api.java.typeutils.runtime.RuntimeSerializerFactory.readParametersFromConfig(RuntimeSerializerFactory.java:76)
>>     at org.apache.flink.runtime.operators.util.TaskConfig.getTypeSerializerFactory(TaskConfig.java:1084)
>>     ... 13 more
>>
>>     at org.apache.flink.runtime.client.JobClient.submitJobAndWait(JobClient.java:349)
>>     at org.apache.flink.client.LocalExecutor.executePlan(LocalExecutor.java:239)
>>     at org.apache.flink.api.java.LocalEnvironment.execute(LocalEnvironment.java:51)
>>     at org.apache.flink.api.scala.ExecutionEnvironment.execute(ExecutionEnvironment.scala:501)
>>     at .<init>(<console>:12)
>>     at .<clinit>(<console>)
>>     at .<init>(<console>:7)
>>     at .<clinit>(<console>)
>>     at $print(<console>)
>>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>     at java.lang.reflect.Method.invoke(Method.java:606)
>>     at scala.tools.nsc.interpreter.IMain$ReadEvalPrint.call(IMain.scala:734)
>>     at scala.tools.nsc.interpreter.IMain$Request.loadAndRun(IMain.scala:983)
>>     at scala.tools.nsc.interpreter.IMain.loadAndRunReq$1(IMain.scala:573)
>>     at scala.tools.nsc.interpreter.IMain.interpret(IMain.scala:604)
>>     at scala.tools.nsc.interpreter.IMain.interpret(IMain.scala:568)
>>     at scala.tools.nsc.interpreter.ILoop.reallyInterpret$1(ILoop.scala:760)
>>     at scala.tools.nsc.interpreter.ILoop.interpretStartingWith(ILoop.scala:805)
>>     at scala.tools.nsc.interpreter.ILoop.command(ILoop.scala:717)
>>     at scala.tools.nsc.interpreter.ILoop.processLine$1(ILoop.scala:581)
>>     at scala.tools.nsc.interpreter.ILoop.innerLoop$1(ILoop.scala:588)
>>     at scala.tools.nsc.interpreter.ILoop.loop(ILoop.scala:591)
>>     at scala.tools.nsc.interpreter.ILoop$$anonfun$process$1.apply$mcZ$sp(ILoop.scala:882)
>>     at scala.tools.nsc.interpreter.ILoop$$anonfun$process$1.apply(ILoop.scala:837)
>>     at scala.tools.nsc.interpreter.ILoop$$anonfun$process$1.apply(ILoop.scala:837)
>>     at scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135)
>>     at scala.tools.nsc.interpreter.ILoop.process(ILoop.scala:837)
>>     at org.myorg.quickstart.Job$.main(Job.scala:37)
>>     at org.myorg.quickstart.Job.main(Job.scala)
>>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>     at java.lang.reflect.Method.invoke(Method.java:606)
>>     at com.intellij.rt.execution.application.AppMain.main(AppMain.java:134)
>>
>> I'm pretty new to Scala and Flink, so maybe someone has a suggestion or can
>> point me in some direction?
>>
>> thanks,
>> Nikolaas
>>