No clue. I used the current branch aka 0.9-SNAPSHOT. Or is this something related to Scala?

On Mon, Jun 22, 2015 at 4:45 PM, Stephan Ewen <se...@apache.org> wrote:

> Actually, the closure cleaner is supposed to take care of the "anonymous
> inner class" situation.
>
> Did you deactivate that one, by any chance?
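A minimal sketch of what deactivating (or checking) the cleaner would look like, assuming the 0.9-SNAPSHOT ExecutionConfig API; the cleaner is on by default unless it is switched off explicitly:

import org.apache.flink.api.scala.ExecutionEnvironment

val env = ExecutionEnvironment.getExecutionEnvironment

// Deactivating the closure cleaner would require an explicit call like this
// somewhere in the job:
// env.getConfig.disableClosureCleaner()

// Assuming isClosureCleanerEnabled is available in this version, this should
// print "true" if the cleaner was never disabled.
println(env.getConfig.isClosureCleanerEnabled)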
> On Mon, Jun 15, 2015 at 5:31 PM, Maximilian Alber <alber.maximil...@gmail.com> wrote:

>> Hi everyone!
>> Thanks! It seems it was the variable that caused the problems. Making an
>> inner class solved the issue.
>> Cheers,
>> Max
>>
>> On Mon, Jun 15, 2015 at 2:58 PM, Kruse, Sebastian <sebastian.kr...@hpi.de> wrote:
>>
>>> Hi everyone,
>>>
>>> I did not reproduce it, but I think the problem here is rather the
>>> anonymous class. It looks like it is created within a class, not an object.
>>> Thus it is not “static” in Java terms, which means that its surrounding
>>> class (the job class) will be serialized as well. And in this job class,
>>> there seems to be a DataSet field that cannot be serialized.
>>>
>>> If that really is the problem, you should either define your anonymous
>>> class within the companion object of your job class or resort directly to a
>>> function (and make sure that you do not pass a variable from your job class
>>> into the scope of the function).
>>>
>>> Cheers,
>>> Sebastian
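A minimal sketch of the first variant Sebastian describes. The job object, the filter class and the Vector stand-in are made-up names for illustration, not taken from the original code:

import org.apache.flink.api.common.functions.RichFilterFunction
import org.apache.flink.api.scala._
import org.apache.flink.configuration.Configuration

case class Vector(id: Int)  // stand-in for the Vector type used in Max's snippet

object MyJob {
  // Defined in the (companion) object, i.e. "static" in Java terms, so serializing
  // the filter does not drag a surrounding job instance and its DataSet fields along.
  class RandomIdFilter(n: Int) extends RichFilterFunction[Vector] {
    private var i: Int = -1

    override def open(config: Configuration): Unit = {
      i = new scala.util.Random().nextInt(n)  // Random instance, created on the worker
    }

    override def filter(a: Vector): Boolean = a.id == i
  }
}

class MyJob {
  // Only the filter instance itself is shipped; the job class stays out of it.
  def sample(x: DataSet[Vector], n: Int): DataSet[Vector] =
    x.filter(new MyJob.RandomIdFilter(n))
}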
>>> From: Till Rohrmann [mailto:trohrm...@apache.org]
>>> Sent: Monday, 15 June 2015 14:16
>>> To: user@flink.apache.org
>>> Subject: Re: Random Selection
>>>
>>> Hi Max,
>>>
>>> the problem is that you’re trying to serialize the companion object of
>>> scala.util.Random. Try to create an instance of the scala.util.Random
>>> class and use this instance within your RichFilterFunction to generate
>>> the random numbers.
>>>
>>> Cheers,
>>> Till
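A minimal sketch of what Till suggests, reusing X, N and the Vector type from Max's snippet quoted below (those names are assumptions carried over from that code):

import org.apache.flink.api.common.functions.RichFilterFunction
import org.apache.flink.configuration.Configuration

val sample_x = X.filter(new RichFilterFunction[Vector]() {
  private var i: Int = -1

  override def open(config: Configuration): Unit = {
    // A Random *instance*, created on the worker in open(), so neither the
    // scala.util.Random companion object nor driver-side state is serialized.
    val rnd = new scala.util.Random()
    i = rnd.nextInt(N)
  }

  override def filter(a: Vector): Boolean = a.id == i
})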
>>> On Mon, Jun 15, 2015 at 1:56 PM, Maximilian Alber <alber.maximil...@gmail.com> wrote:
>>>
>>> Hi Flinksters,
>>>
>>> I would like to randomly choose an element of my data set. But somehow I
>>> cannot use scala.util inside my filter functions:
>>>
>>> val sample_x = X filter(new RichFilterFunction[Vector](){
>>>   var i: Int = -1
>>>
>>>   override def open(config: Configuration) = {
>>>     i = scala.util.Random.nextInt(N)
>>>   }
>>>   def filter(a: Vector) = a.id == i
>>> })
>>> val sample_y = Y filter(new RichFilterFunction[Vector](){
>>>   def filter(a: Vector) = a.id == scala.util.Random.nextInt(N)
>>> })
>>>
>>> That's the error I get:
>>>
>>> Exception in thread "main" org.apache.flink.optimizer.CompilerException: An error occurred while translating the optimized plan to a nephele JobGraph: Error translating node 'Filter "Filter at Test$anonfun$10.apply(test.scala:276)" : FLAT_MAP [[ GlobalProperties [partitioning=RANDOM_PARTITIONED] ]] [[ LocalProperties [ordering=null, grouped=null, unique=null] ]]': Could not write the user code wrapper class org.apache.flink.api.common.operators.util.UserCodeObjectWrapper : java.io.NotSerializableException: org.apache.flink.api.scala.DataSet
>>>     at org.apache.flink.optimizer.plantranslate.JobGraphGenerator.postVisit(JobGraphGenerator.java:578)
>>>     at org.apache.flink.optimizer.plantranslate.JobGraphGenerator.postVisit(JobGraphGenerator.java:103)
>>>     at org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:205)
>>>     at org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:199)
>>>     at org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:199)
>>>     at org.apache.flink.optimizer.plan.OptimizedPlan.accept(OptimizedPlan.java:127)
>>>     at org.apache.flink.optimizer.plantranslate.JobGraphGenerator.compileJobGraph(JobGraphGenerator.java:170)
>>>     at org.apache.flink.client.LocalExecutor.executePlan(LocalExecutor.java:176)
>>>     at org.apache.flink.api.java.LocalEnvironment.execute(LocalEnvironment.java:54)
>>>     at org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:789)
>>>     at org.apache.flink.api.scala.ExecutionEnvironment.execute(ExecutionEnvironment.scala:576)
>>>     at org.apache.flink.api.scala.DataSet.collect(DataSet.scala:544)
>>>     at Test$delayedInit$body.apply(test.scala:304)
>>>     at scala.Function0$class.apply$mcV$sp(Function0.scala:40)
>>>     at scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12)
>>>     at scala.App$anonfun$main$1.apply(App.scala:71)
>>>     at scala.App$anonfun$main$1.apply(App.scala:71)
>>>     at scala.collection.immutable.List.foreach(List.scala:318)
>>>     at scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:32)
>>>     at scala.App$class.main(App.scala:71)
>>>     at Test$.main(test.scala:45)
>>>     at Test.main(test.scala)
>>> Caused by: org.apache.flink.optimizer.CompilerException: Error translating node 'Filter "Filter at Test$anonfun$10.apply(test.scala:276)" : FLAT_MAP [[ GlobalProperties [partitioning=RANDOM_PARTITIONED] ]] [[ LocalProperties [ordering=null, grouped=null, unique=null] ]]': Could not write the user code wrapper class org.apache.flink.api.common.operators.util.UserCodeObjectWrapper : java.io.NotSerializableException: org.apache.flink.api.scala.DataSet
>>>     at org.apache.flink.optimizer.plantranslate.JobGraphGenerator.preVisit(JobGraphGenerator.java:360)
>>>     at org.apache.flink.optimizer.plantranslate.JobGraphGenerator.preVisit(JobGraphGenerator.java:103)
>>>     at org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:198)
>>>     at org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:202)
>>>     at org.apache.flink.optimizer.plan.BulkIterationPlanNode.acceptForStepFunction(BulkIterationPlanNode.java:137)
>>>     at org.apache.flink.optimizer.plantranslate.JobGraphGenerator.postVisit(JobGraphGenerator.java:427)
>>>     ... 21 more
>>> Caused by: org.apache.flink.runtime.operators.util.CorruptConfigurationException: Could not write the user code wrapper class org.apache.flink.api.common.operators.util.UserCodeObjectWrapper : java.io.NotSerializableException: org.apache.flink.api.scala.DataSet
>>>     at org.apache.flink.runtime.operators.util.TaskConfig.setStubWrapper(TaskConfig.java:275)
>>>     at org.apache.flink.optimizer.plantranslate.JobGraphGenerator.createSingleInputVertex(JobGraphGenerator.java:803)
>>>     at org.apache.flink.optimizer.plantranslate.JobGraphGenerator.preVisit(JobGraphGenerator.java:305)
>>>     ... 26 more
>>> Caused by: java.io.NotSerializableException: org.apache.flink.api.scala.DataSet
>>>     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1183)
>>>     at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
>>>     at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
>>>     at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
>>>     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
>>>     at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
>>>     at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
>>>     at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
>>>     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
>>>     at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
>>>     at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
>>>     at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
>>>     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
>>>     at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
>>>     at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
>>>     at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
>>>     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
>>>     at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347)
>>>     at org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:314)
>>>     at org.apache.flink.util.InstantiationUtil.writeObjectToConfig(InstantiationUtil.java:268)
>>>     at org.apache.flink.runtime.operators.util.TaskConfig.setStubWrapper(TaskConfig.java:273)
>>>
>>> Did I miss something, or is it simply not possible?
>>>
>>> Thanks!
>>>
>>> Cheers,
>>> Max