Hi everyone!

Thanks! It seems it was the variable that caused the problem. Making an inner class solved the issue.

Cheers,
Max
On Mon, Jun 15, 2015 at 2:58 PM, Kruse, Sebastian <sebastian.kr...@hpi.de> wrote:
> Hi everyone,
>
> I did not reenact it, but I think the problem here is rather the anonymous
> class. It looks like it is created within a class, not an object. Thus it
> is not "static" in Java terms, which means that its surrounding class (the
> job class) will also be serialized. And in this job class, there seems to
> be a DataSet field that cannot be serialized.
>
> If that really is the problem, you should either define your anonymous
> class within the companion object of your job class or resort directly to
> a function (and make sure that you do not pass a variable from your job
> class into the scope of the function).
>
> Cheers,
> Sebastian
>
> *From:* Till Rohrmann [mailto:trohrm...@apache.org]
> *Sent:* Monday, June 15, 2015 14:16
> *To:* user@flink.apache.org
> *Subject:* Re: Random Selection
>
> Hi Max,
>
> the problem is that you're trying to serialize the companion object of
> scala.util.Random. Try to create an instance of the scala.util.Random
> class and use this instance within your RichFilterFunction to generate
> the random numbers.
>
> Cheers,
> Till
>
> On Mon, Jun 15, 2015 at 1:56 PM Maximilian Alber
> <alber.maximil...@gmail.com> wrote:
>
> Hi Flinksters,
>
> I would like to randomly choose an element of my data set.
> But somehow I cannot use scala.util inside my filter functions:
>
> val sample_x = X filter(new RichFilterFunction[Vector](){
>   var i: Int = -1
>
>   override def open(config: Configuration) = {
>     i = scala.util.Random.nextInt(N)
>   }
>   def filter(a: Vector) = a.id == i
> })
> val sample_y = Y filter(new RichFilterFunction[Vector](){
>   def filter(a: Vector) = a.id == scala.util.Random.nextInt(N)
> })
>
> That's the error I get:
>
> Exception in thread "main" org.apache.flink.optimizer.CompilerException:
> An error occurred while translating the optimized plan to a nephele
> JobGraph: Error translating node 'Filter "Filter at
> Test$anonfun$10.apply(test.scala:276)" : FLAT_MAP [[ GlobalProperties
> [partitioning=RANDOM_PARTITIONED] ]] [[ LocalProperties [ordering=null,
> grouped=null, unique=null] ]]': Could not write the user code wrapper class
> org.apache.flink.api.common.operators.util.UserCodeObjectWrapper :
> java.io.NotSerializableException: org.apache.flink.api.scala.DataSet
>     at org.apache.flink.optimizer.plantranslate.JobGraphGenerator.postVisit(JobGraphGenerator.java:578)
>     at org.apache.flink.optimizer.plantranslate.JobGraphGenerator.postVisit(JobGraphGenerator.java:103)
>     at org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:205)
>     at org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:199)
>     at org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:199)
>     at org.apache.flink.optimizer.plan.OptimizedPlan.accept(OptimizedPlan.java:127)
>     at org.apache.flink.optimizer.plantranslate.JobGraphGenerator.compileJobGraph(JobGraphGenerator.java:170)
>     at org.apache.flink.client.LocalExecutor.executePlan(LocalExecutor.java:176)
>     at org.apache.flink.api.java.LocalEnvironment.execute(LocalEnvironment.java:54)
>     at org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:789)
>     at org.apache.flink.api.scala.ExecutionEnvironment.execute(ExecutionEnvironment.scala:576)
>     at org.apache.flink.api.scala.DataSet.collect(DataSet.scala:544)
>     at Test$delayedInit$body.apply(test.scala:304)
>     at scala.Function0$class.apply$mcV$sp(Function0.scala:40)
>     at scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12)
>     at scala.App$anonfun$main$1.apply(App.scala:71)
>     at scala.App$anonfun$main$1.apply(App.scala:71)
>     at scala.collection.immutable.List.foreach(List.scala:318)
>     at scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:32)
>     at scala.App$class.main(App.scala:71)
>     at Test$.main(test.scala:45)
>     at Test.main(test.scala)
> Caused by: org.apache.flink.optimizer.CompilerException: Error translating
> node 'Filter "Filter at Test$anonfun$10.apply(test.scala:276)" : FLAT_MAP
> [[ GlobalProperties [partitioning=RANDOM_PARTITIONED] ]] [[ LocalProperties
> [ordering=null, grouped=null, unique=null] ]]': Could not write the user
> code wrapper class
> org.apache.flink.api.common.operators.util.UserCodeObjectWrapper :
> java.io.NotSerializableException: org.apache.flink.api.scala.DataSet
>     at org.apache.flink.optimizer.plantranslate.JobGraphGenerator.preVisit(JobGraphGenerator.java:360)
>     at org.apache.flink.optimizer.plantranslate.JobGraphGenerator.preVisit(JobGraphGenerator.java:103)
>     at org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:198)
>     at org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:202)
>     at org.apache.flink.optimizer.plan.BulkIterationPlanNode.acceptForStepFunction(BulkIterationPlanNode.java:137)
>     at org.apache.flink.optimizer.plantranslate.JobGraphGenerator.postVisit(JobGraphGenerator.java:427)
>     ... 21 more
> Caused by: org.apache.flink.runtime.operators.util.CorruptConfigurationException:
> Could not write the user code wrapper class
> org.apache.flink.api.common.operators.util.UserCodeObjectWrapper :
> java.io.NotSerializableException: org.apache.flink.api.scala.DataSet
>     at org.apache.flink.runtime.operators.util.TaskConfig.setStubWrapper(TaskConfig.java:275)
>     at org.apache.flink.optimizer.plantranslate.JobGraphGenerator.createSingleInputVertex(JobGraphGenerator.java:803)
>     at org.apache.flink.optimizer.plantranslate.JobGraphGenerator.preVisit(JobGraphGenerator.java:305)
>     ... 26 more
> Caused by: java.io.NotSerializableException: org.apache.flink.api.scala.DataSet
>     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1183)
>     at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
>     at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
>     at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
>     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
>     at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
>     at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
>     at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
>     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
>     at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
>     at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
>     at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
>     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
>     at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
>     at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
>     at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
>     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
>     at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347)
>     at org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:314)
>     at org.apache.flink.util.InstantiationUtil.writeObjectToConfig(InstantiationUtil.java:268)
>     at org.apache.flink.runtime.operators.util.TaskConfig.setStubWrapper(TaskConfig.java:273)
>
> Did I miss something, or is it simply not possible?
>
> Thanks!
>
> Cheers,
> Max
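[Editor's note] Combining Till's and Sebastian's suggestions, the fix might look like the following sketch. It assumes the user-defined `Vector` type with an `id` field and the dataset size `N` from Max's snippet; the object and class names are hypothetical. Defining the filter in a top-level object (or the job's companion object) keeps it "static" in Java terms, so the enclosing job class, with its non-serializable `DataSet` fields, is never captured in the closure.

```scala
import org.apache.flink.api.common.functions.RichFilterFunction
import org.apache.flink.configuration.Configuration

// Hypothetical container; could equally be the job class's companion object.
object RandomFilters {

  class RandomIdFilter(n: Int) extends RichFilterFunction[Vector] {
    // Till's advice: use an *instance* of scala.util.Random instead of the
    // companion object. Marked @transient and created in open(), so it is
    // never serialized and each parallel task gets its own instance.
    @transient private var rnd: scala.util.Random = _
    private var i: Int = -1

    override def open(config: Configuration): Unit = {
      rnd = new scala.util.Random()
      i = rnd.nextInt(n)
    }

    override def filter(a: Vector): Boolean = a.id == i
  }
}
```

In the job itself, the original filters would then become, e.g., `val sample_x = X filter (new RandomFilters.RandomIdFilter(N))`. Note that with parallelism greater than one, each task draws its own random `i`, so the sample may contain zero or several elements per run, which is a property of the original snippet as well.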