Hi Max, the problem is that you’re trying to serialize the companion object of scala.util.Random. Try to create an instance of the scala.util.Random class and use this instance within your RichFilterFunction to generate the random numbers.
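A minimal sketch of what that could look like, assuming the Flink API of this thread's era, where RichFilterFunction exposes open(Configuration). The names RandomIdFilter, n and rnd are hypothetical, and the Vector case class only stands in for your own type:

```scala
import org.apache.flink.api.common.functions.RichFilterFunction
import org.apache.flink.configuration.Configuration

import scala.util.Random

// Hypothetical stand-in for the Vector type used in the original message.
case class Vector(id: Int, values: Array[Double])

// Keeps only the elements whose id equals a randomly drawn index in [0, n).
// `n` is passed in explicitly instead of being read from the enclosing scope.
class RandomIdFilter(n: Int) extends RichFilterFunction[Vector] {
  // An instance of the scala.util.Random class (not the companion object).
  // It is created in open(), so it never has to be shipped with the function.
  @transient private var rnd: Random = _
  private var i: Int = -1

  override def open(config: Configuration): Unit = {
    rnd = new Random()
    i = rnd.nextInt(n)
  }

  override def filter(a: Vector): Boolean = a.id == i
}
```

It would then be used as X.filter(new RandomIdFilter(N)). Writing the function as a standalone class with N as a constructor argument also keeps it from pulling in whatever the surrounding code references, which may be relevant since the exception names org.apache.flink.api.scala.DataSet. Note that open() runs once per parallel instance, so with a parallelism above 1 each instance draws its own index.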
Cheers,
Till

On Mon, Jun 15, 2015 at 1:56 PM Maximilian Alber <alber.maximil...@gmail.com> wrote:

> Hi Flinksters,
>
> I would like to randomly choose an element of my data set. But somehow I
> cannot use scala.util inside my filter functions:
>
> val sample_x = X filter(new RichFilterFunction[Vector](){
>   var i: Int = -1
>
>   override def open(config: Configuration) = {
>     i = scala.util.Random.nextInt(N)
>   }
>   def filter(a: Vector) = a.id == i
> })
> val sample_y = Y filter(new RichFilterFunction[Vector](){
>   def filter(a: Vector) = a.id == scala.util.Random.nextInt(N)
> })
>
> That's the error I get:
>
> Exception in thread "main" org.apache.flink.optimizer.CompilerException:
> An error occurred while translating the optimized plan to a nephele
> JobGraph: Error translating node 'Filter "Filter at
> Test$anonfun$10.apply(test.scala:276)" : FLAT_MAP [[ GlobalProperties
> [partitioning=RANDOM_PARTITIONED] ]] [[ LocalProperties [ordering=null,
> grouped=null, unique=null] ]]': Could not write the user code wrapper class
> org.apache.flink.api.common.operators.util.UserCodeObjectWrapper :
> java.io.NotSerializableException: org.apache.flink.api.scala.DataSet
>     at org.apache.flink.optimizer.plantranslate.JobGraphGenerator.postVisit(JobGraphGenerator.java:578)
>     at org.apache.flink.optimizer.plantranslate.JobGraphGenerator.postVisit(JobGraphGenerator.java:103)
>     at org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:205)
>     at org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:199)
>     at org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:199)
>     at org.apache.flink.optimizer.plan.OptimizedPlan.accept(OptimizedPlan.java:127)
>     at org.apache.flink.optimizer.plantranslate.JobGraphGenerator.compileJobGraph(JobGraphGenerator.java:170)
>     at org.apache.flink.client.LocalExecutor.executePlan(LocalExecutor.java:176)
>     at org.apache.flink.api.java.LocalEnvironment.execute(LocalEnvironment.java:54)
>     at org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:789)
>     at org.apache.flink.api.scala.ExecutionEnvironment.execute(ExecutionEnvironment.scala:576)
>     at org.apache.flink.api.scala.DataSet.collect(DataSet.scala:544)
>     at Test$delayedInit$body.apply(test.scala:304)
>     at scala.Function0$class.apply$mcV$sp(Function0.scala:40)
>     at scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12)
>     at scala.App$anonfun$main$1.apply(App.scala:71)
>     at scala.App$anonfun$main$1.apply(App.scala:71)
>     at scala.collection.immutable.List.foreach(List.scala:318)
>     at scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:32)
>     at scala.App$class.main(App.scala:71)
>     at Test$.main(test.scala:45)
>     at Test.main(test.scala)
> Caused by: org.apache.flink.optimizer.CompilerException: Error translating
> node 'Filter "Filter at Test$anonfun$10.apply(test.scala:276)" : FLAT_MAP
> [[ GlobalProperties [partitioning=RANDOM_PARTITIONED] ]] [[ LocalProperties
> [ordering=null, grouped=null, unique=null] ]]': Could not write the user
> code wrapper class
> org.apache.flink.api.common.operators.util.UserCodeObjectWrapper :
> java.io.NotSerializableException: org.apache.flink.api.scala.DataSet
>     at org.apache.flink.optimizer.plantranslate.JobGraphGenerator.preVisit(JobGraphGenerator.java:360)
>     at org.apache.flink.optimizer.plantranslate.JobGraphGenerator.preVisit(JobGraphGenerator.java:103)
>     at org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:198)
>     at org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:202)
>     at org.apache.flink.optimizer.plan.BulkIterationPlanNode.acceptForStepFunction(BulkIterationPlanNode.java:137)
>     at org.apache.flink.optimizer.plantranslate.JobGraphGenerator.postVisit(JobGraphGenerator.java:427)
>     ... 21 more
> Caused by:
> org.apache.flink.runtime.operators.util.CorruptConfigurationException:
> Could not write the user code wrapper class
> org.apache.flink.api.common.operators.util.UserCodeObjectWrapper :
> java.io.NotSerializableException: org.apache.flink.api.scala.DataSet
>     at org.apache.flink.runtime.operators.util.TaskConfig.setStubWrapper(TaskConfig.java:275)
>     at org.apache.flink.optimizer.plantranslate.JobGraphGenerator.createSingleInputVertex(JobGraphGenerator.java:803)
>     at org.apache.flink.optimizer.plantranslate.JobGraphGenerator.preVisit(JobGraphGenerator.java:305)
>     ... 26 more
> Caused by: java.io.NotSerializableException:
> org.apache.flink.api.scala.DataSet
>     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1183)
>     at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
>     at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
>     at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
>     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
>     at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
>     at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
>     at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
>     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
>     at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
>     at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
>     at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
>     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
>     at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
>     at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
>     at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
>     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
>     at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347)
>     at org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:314)
>     at org.apache.flink.util.InstantiationUtil.writeObjectToConfig(InstantiationUtil.java:268)
>     at org.apache.flink.runtime.operators.util.TaskConfig.setStubWrapper(TaskConfig.java:273)
>
> Did I miss something or is it simply not possible?
> Thanks!
> Cheers,
> Max