I will look into it once I have some time (end of this week, or next
week probably)

On Tue, Apr 14, 2015 at 8:51 PM, Robert Metzger <rmetz...@apache.org> wrote:
> Hey Nikolaas,
>
> Thank you for posting on the mailing list. I've met Nikolaas today in
> person and we were talking a bit about an interactive shell for Flink,
> potentially also an integration with Zeppelin.
>
> Great stuff I'm really looking forward to :)
>
> We were wondering if somebody from the list has some experience with the
> scala shell.
> I've pointed Nikolaas also to this PR:
> https://github.com/apache/flink/pull/35.
>
> Best,
> Robert
>
>
> On Tue, Apr 14, 2015 at 5:26 PM, nse sik <nikolaas.steenber...@gmail.com>
> wrote:
>
>> Hi!
>> I am trying to implement a scala shell for flink.
>>
>> I've started with a simple scala object who's main function will drop the
>> user to the interactive scala shell (repl) at one point:
>>
>>
>>
>>
>> import scala.tools.nsc.interpreter.ILoop
>> import scala.tools.nsc.Settings
>>
>> object Job {
>>   def main(args: Array[String]) {
>>
>>     val repl = new ILoop()
>>     repl.settings = new Settings()
>>
>>     // enable this line to use scala in intellij
>>     repl.settings.usejavacp.value = true
>>
>>     repl.createInterpreter()
>>
>>     // start scala interpreter shell
>>     repl.process(repl.settings)
>>
>>     repl.closeInterpreter()
>>     }
>>   }
>>
>>
>>
>>
>> Now I am trying to execute the word count example as in:
>>
>>
>>
>>
>> scala> import org.apache.flink.api.scala._
>>
>> scala> val env = ExecutionEnvironment.getExecutionEnvironment
>>
>> scala> val text = env.fromElements("To be, or not to be,--that is the
>> question:--","Whether 'tis nobler in the mind to suffer", "The slings and
>> arrows of outrageous fortune","Or to take arms against a sea of troubles,")
>>
>> scala> val counts = text.flatMap { _.toLowerCase.split("\\W+") }.map { (_,
>> 1) }.groupBy(0).sum(1)
>>
>> scala> counts.print()
>>
>> scala> env.execute("Flink Scala Api Skeleton")
>>
>>
>>
>>
>>
>>
>> However I am running into following error:
>>
>> env.execute("Flink Scala Api Skeleton")
>> org.apache.flink.runtime.client.JobExecutionException:
>> java.lang.RuntimeException: The initialization of the DataSource's outputs
>> caused an error: The type serializer factory could not load its parameters
>> from the configuration due to missing classes.
>> at
>>
>> org.apache.flink.runtime.operators.DataSourceTask.registerInputOutput(DataSourceTask.java:89)
>> at
>>
>> org.apache.flink.runtime.execution.RuntimeEnvironment.<init>(RuntimeEnvironment.java:187)
>> at
>>
>> org.apache.flink.runtime.taskmanager.TaskManager.submitTask(TaskManager.java:612)
>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>> at
>>
>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>> at
>>
>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>> at java.lang.reflect.Method.invoke(Method.java:606)
>> at org.apache.flink.runtime.ipc.RPC$Server.call(RPC.java:420)
>> at org.apache.flink.runtime.ipc.Server$Handler.run(Server.java:949)
>> Caused by: java.lang.RuntimeException: The type serializer factory could
>> not load its parameters from the configuration due to missing classes.
>> at
>>
>> org.apache.flink.runtime.operators.util.TaskConfig.getTypeSerializerFactory(TaskConfig.java:1086)
>> at
>>
>> org.apache.flink.runtime.operators.util.TaskConfig.getOutputSerializer(TaskConfig.java:542)
>> at
>>
>> org.apache.flink.runtime.operators.RegularPactTask.getOutputCollector(RegularPactTask.java:1251)
>> at
>>
>> org.apache.flink.runtime.operators.RegularPactTask.initOutputs(RegularPactTask.java:1359)
>> at
>>
>> org.apache.flink.runtime.operators.DataSourceTask.initOutputs(DataSourceTask.java:288)
>> at
>>
>> org.apache.flink.runtime.operators.DataSourceTask.registerInputOutput(DataSourceTask.java:87)
>> ... 8 more
>> Caused by: java.lang.ClassNotFoundException: $anon$2$$anon$1
>> at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
>> at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
>> at java.security.AccessController.doPrivileged(Native Method)
>> at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
>> at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
>> at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
>> at java.lang.Class.forName0(Native Method)
>> at java.lang.Class.forName(Class.java:274)
>> at
>>
>> org.apache.flink.util.InstantiationUtil$ClassLoaderObjectInputStream.resolveClass(InstantiationUtil.java:54)
>> at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1612)
>> at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1517)
>> at
>> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1771)
>> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>> at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
>> at
>>
>> org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:274)
>> at
>>
>> org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:236)
>> at
>>
>> org.apache.flink.api.java.typeutils.runtime.RuntimeSerializerFactory.readParametersFromConfig(RuntimeSerializerFactory.java:76)
>> at
>>
>> org.apache.flink.runtime.operators.util.TaskConfig.getTypeSerializerFactory(TaskConfig.java:1084)
>> ... 13 more
>>
>> at
>>
>> org.apache.flink.runtime.client.JobClient.submitJobAndWait(JobClient.java:349)
>> at
>> org.apache.flink.client.LocalExecutor.executePlan(LocalExecutor.java:239)
>> at
>>
>> org.apache.flink.api.java.LocalEnvironment.execute(LocalEnvironment.java:51)
>> at
>>
>> org.apache.flink.api.scala.ExecutionEnvironment.execute(ExecutionEnvironment.scala:501)
>> at .<init>(<console>:12)
>> at .<clinit>(<console>)
>> at .<init>(<console>:7)
>> at .<clinit>(<console>)
>> at $print(<console>)
>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>> at
>>
>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>> at
>>
>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>> at java.lang.reflect.Method.invoke(Method.java:606)
>> at scala.tools.nsc.interpreter.IMain$ReadEvalPrint.call(IMain.scala:734)
>> at scala.tools.nsc.interpreter.IMain$Request.loadAndRun(IMain.scala:983)
>> at scala.tools.nsc.interpreter.IMain.loadAndRunReq$1(IMain.scala:573)
>> at scala.tools.nsc.interpreter.IMain.interpret(IMain.scala:604)
>> at scala.tools.nsc.interpreter.IMain.interpret(IMain.scala:568)
>> at scala.tools.nsc.interpreter.ILoop.reallyInterpret$1(ILoop.scala:760)
>> at scala.tools.nsc.interpreter.ILoop.interpretStartingWith(ILoop.scala:805)
>> at scala.tools.nsc.interpreter.ILoop.command(ILoop.scala:717)
>> at scala.tools.nsc.interpreter.ILoop.processLine$1(ILoop.scala:581)
>> at scala.tools.nsc.interpreter.ILoop.innerLoop$1(ILoop.scala:588)
>> at scala.tools.nsc.interpreter.ILoop.loop(ILoop.scala:591)
>> at
>>
>> scala.tools.nsc.interpreter.ILoop$$anonfun$process$1.apply$mcZ$sp(ILoop.scala:882)
>> at
>> scala.tools.nsc.interpreter.ILoop$$anonfun$process$1.apply(ILoop.scala:837)
>> at
>> scala.tools.nsc.interpreter.ILoop$$anonfun$process$1.apply(ILoop.scala:837)
>> at
>>
>> scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135)
>> at scala.tools.nsc.interpreter.ILoop.process(ILoop.scala:837)
>> at org.myorg.quickstart.Job$.main(Job.scala:37)
>> at org.myorg.quickstart.Job.main(Job.scala)
>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>> at
>>
>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>> at
>>
>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>> at java.lang.reflect.Method.invoke(Method.java:606)
>> at com.intellij.rt.execution.application.AppMain.main(AppMain.java:134)
>>
>>
>>
>> I'm pretty new to Scala and Flink, so maybe someone has a suggestion or can
>> point me in some direction?
>>
>> thanks,
>> Nikolaas
>>

Reply via email to