Nope, I think there is neither a fix nor an open issue for this right now.

On Mon, 13 Jun 2016 at 11:31 Maximilian Michels <m...@apache.org> wrote:
> Is there an issue or a fix for proper use of the ClosureCleaner in
> CoGroup.where()?
>
> On Fri, Jun 10, 2016 at 8:07 AM, Aljoscha Krettek <aljos...@apache.org> wrote:
> > Hi,
> > yes, I was talking about a Flink bug. I forgot to mention the work-around
> > that Stephan mentioned.
> >
> > On Thu, 9 Jun 2016 at 20:38 Stephan Ewen <se...@apache.org> wrote:
> >>
> >> You can also make the KeySelector a static inner class. That should work
> >> as well.
> >>
> >> On Thu, Jun 9, 2016 at 7:00 PM, Tarandeep Singh <tarand...@gmail.com> wrote:
> >>>
> >>> Thank you Aljoscha and Fabian for your replies.
> >>>
> >>> @Aljoscha: when you said "cleaner is not used in CoGroup.where(). I'm
> >>> afraid this is a bug", I am assuming you are referring to the Flink
> >>> engine itself.
> >>>
> >>> @Fabian: thanks for the optimization tip.
> >>>
> >>> This is how I got it working (with a hack): in my dataset, the join
> >>> field/key can be null; otherwise .where(fieldName) works and I don't get
> >>> the not-serializable exception. So I applied a MapFunction to the DataSet
> >>> and put a dummy value in the join field/key where it was null. Then, in
> >>> the join function, I change it back to null.
> >>>
> >>> Best,
> >>> Tarandeep
> >>>
> >>> On Thu, Jun 9, 2016 at 7:06 AM, Aljoscha Krettek <aljos...@apache.org> wrote:
> >>>>
> >>>> Hi,
> >>>> the problem is that the KeySelector is an anonymous inner class and as
> >>>> such has a reference to the outer RecordsFilterer object. Normally, this
> >>>> would be rectified by the closure cleaner, but the cleaner is not used
> >>>> in CoGroup.where(). I'm afraid this is a bug.
> >>>>
> >>>> Best,
> >>>> Aljoscha
> >>>>
> >>>> On Thu, 9 Jun 2016 at 14:06 Fabian Hueske <fhue...@gmail.com> wrote:
> >>>>>
> >>>>> Hi Tarandeep,
> >>>>>
> >>>>> the exception suggests that Flink tries to serialize RecordsFilterer as
> >>>>> a user function (this happens via Java serialization).
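[Editor's note: Stephan's static-inner-class workaround rests on a plain-Java fact: an anonymous inner class that touches enclosing state carries a hidden `this$0` reference to its outer instance, so Java serialization tries to write the (non-serializable) outer object too. The sketch below demonstrates this with only the JDK; `Outer`, `anonymousKey`, and `StandaloneKey` are illustrative names, not from the thread or the Flink API.]

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class CaptureDemo {

    // Stands in for RecordsFilterer: a helper class that is NOT Serializable.
    static class Outer {
        private final String field = "name";

        // Anonymous inner class: referencing 'field' forces a hidden
        // this$0 reference to the enclosing, non-serializable Outer.
        Serializable anonymousKey() {
            return new Serializable() {
                @Override
                public String toString() {
                    return field;
                }
            };
        }

        // Static nested class: no hidden reference to Outer.
        static class StandaloneKey implements Serializable {}

        Serializable standaloneKey() {
            return new StandaloneKey();
        }
    }

    // True if Java serialization accepts the object; false on the
    // NotSerializableException seen in the thread.
    static boolean serializes(Object o) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(o);
            return true;
        } catch (IOException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        Outer outer = new Outer();
        System.out.println(serializes(outer.anonymousKey()));  // false: drags Outer along
        System.out.println(serializes(outer.standaloneKey())); // true
    }
}
```

This is why moving the KeySelector into a static nested class (or a top-level class) sidesteps the missing closure-cleaner call: there is simply no outer reference left to serialize.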
> >>>>> I said "suggests" because the code that uses RecordsFilterer is not
> >>>>> included.
> >>>>>
> >>>>> To me it looks like RecordsFilterer should not be used as a user
> >>>>> function. It is a helper class to construct a DataSet program, so it
> >>>>> should not be shipped for execution.
> >>>>> You would use such a class as follows:
> >>>>>
> >>>>> DataSet<T> records = ...
> >>>>> DataSet<String> filterIDs = ...
> >>>>>
> >>>>> RecordsFilterer rf = new RecordsFilterer();
> >>>>> DataSet<Tuple2<Boolean, T>> result =
> >>>>>     rf.addFilterFlag(records, filterIDs, "myField");
> >>>>>
> >>>>> Regarding the join code, I would suggest an optimization.
> >>>>> Instead of using CoGroup, I would use distinct and an outer join like
> >>>>> this:
> >>>>>
> >>>>> DataSet<String> distIds = filteredIds.distinct();
> >>>>> DataSet<Tuple2<Boolean, T>> result = records
> >>>>>     .leftOuterJoin(distIds)
> >>>>>     .where(KEYSELECTOR)
> >>>>>     .equalTo("*")    // use full string as key
> >>>>>     .with(JOINFUNC); // set Bool to false if right == null, true otherwise
> >>>>>
> >>>>> Best, Fabian
> >>>>>
> >>>>> 2016-06-09 2:28 GMT+02:00 Tarandeep Singh <tarand...@gmail.com>:
> >>>>>>
> >>>>>> Hi,
> >>>>>>
> >>>>>> I am getting a NotSerializableException in this class:
> >>>>>>
> >>>>>> public class RecordsFilterer<T extends GenericRecord> {
> >>>>>>
> >>>>>>     public DataSet<Tuple2<Boolean, T>> addFilterFlag(DataSet<T> dataset,
> >>>>>>             DataSet<String> filteredIds, String fieldName) {
> >>>>>>         return dataset.coGroup(filteredIds)
> >>>>>>                 .where(new KeySelector<T, String>() {
> >>>>>>                     @Override
> >>>>>>                     public String getKey(T t) throws Exception {
> >>>>>>                         String s = (String) t.get(fieldName);
> >>>>>>                         return s != null ?
> >>>>>>                             s : UUID.randomUUID().toString();
> >>>>>>                     }
> >>>>>>                 })
> >>>>>>                 .equalTo((KeySelector<String, String>) s -> s)
> >>>>>>                 .with(new CoGroupFunction<T, String, Tuple2<Boolean, T>>() {
> >>>>>>                     @Override
> >>>>>>                     public void coGroup(Iterable<T> records,
> >>>>>>                             Iterable<String> ids,
> >>>>>>                             Collector<Tuple2<Boolean, T>> collector)
> >>>>>>                             throws Exception {
> >>>>>>                         boolean filterFlag = false;
> >>>>>>                         for (String id : ids) {
> >>>>>>                             filterFlag = true;
> >>>>>>                         }
> >>>>>>
> >>>>>>                         for (T record : records) {
> >>>>>>                             collector.collect(new Tuple2<>(filterFlag, record));
> >>>>>>                         }
> >>>>>>                     }
> >>>>>>                 });
> >>>>>>     }
> >>>>>> }
> >>>>>>
> >>>>>> What I am trying to do is write generic code that will join Avro
> >>>>>> records (of different types) with String records and, if there is a
> >>>>>> match, add a filter flag. This way I can use the same code for
> >>>>>> different Avro record types. But I am getting this exception:
> >>>>>>
> >>>>>> Exception in thread "main" org.apache.flink.optimizer.CompilerException:
> >>>>>> Error translating node 'Map "Key Extractor" : MAP [[ GlobalProperties
> >>>>>> [partitioning=RANDOM_PARTITIONED] ]] [[ LocalProperties [ordering=null,
> >>>>>> grouped=null, unique=null] ]]': Could not write the user code wrapper
> >>>>>> class org.apache.flink.api.common.operators.util.UserCodeObjectWrapper :
> >>>>>> java.io.NotSerializableException:
> >>>>>> com.styleseat.dataplatform.etl.jobs.boundary.RecordsFilterer
> >>>>>>     at org.apache.flink.optimizer.plantranslate.JobGraphGenerator.preVisit(JobGraphGenerator.java:386)
> >>>>>>     at org.apache.flink.optimizer.plantranslate.JobGraphGenerator.preVisit(JobGraphGenerator.java:109)
> >>>>>>     at org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:198)
> >>>>>>     at org.apache.flink.optimizer.plan.DualInputPlanNode.accept(DualInputPlanNode.java:163)
> >>>>>>     at org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:199)
> >>>>>>     at org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:199)
> >>>>>>     at org.apache.flink.optimizer.plan.OptimizedPlan.accept(OptimizedPlan.java:128)
> >>>>>>     at org.apache.flink.optimizer.plantranslate.JobGraphGenerator.compileJobGraph(JobGraphGenerator.java:188)
> >>>>>>     at org.apache.flink.client.LocalExecutor.executePlan(LocalExecutor.java:187)
> >>>>>>     at org.apache.flink.api.java.LocalEnvironment.execute(LocalEnvironment.java:90)
> >>>>>>     at org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:855)
> >>>>>>     at com.styleseat.dataplatform.etl.jobs.search.RunSearchLogProcessorV2.runSearchLogProcessor(RunSearchLogProcessorV2.java:57)
> >>>>>>     at com.styleseat.dataplatform.etl.jobs.search.RunSearchLogProcessorV2.main(RunSearchLogProcessorV2.java:32)
> >>>>>>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> >>>>>>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> >>>>>>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> >>>>>>     at java.lang.reflect.Method.invoke(Method.java:497)
> >>>>>>     at com.intellij.rt.execution.application.AppMain.main(AppMain.java:144)
> >>>>>> Caused by: org.apache.flink.runtime.operators.util.CorruptConfigurationException:
> >>>>>> Could not write the user code wrapper class
> >>>>>> org.apache.flink.api.common.operators.util.UserCodeObjectWrapper :
> >>>>>> java.io.NotSerializableException: RecordsFilterer
> >>>>>>     at org.apache.flink.runtime.operators.util.TaskConfig.setStubWrapper(TaskConfig.java:275)
> >>>>>>     at org.apache.flink.optimizer.plantranslate.JobGraphGenerator.createSingleInputVertex(JobGraphGenerator.java:843)
> >>>>>>     at org.apache.flink.optimizer.plantranslate.JobGraphGenerator.preVisit(JobGraphGenerator.java:331)
> >>>>>>     ... 17 more
> >>>>>> Caused by: java.io.NotSerializableException: RecordsFilterer
> >>>>>>     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
> >>>>>>     at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
> >>>>>>     at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
> >>>>>>     at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
> >>>>>>     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
> >>>>>>     at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
> >>>>>>     at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
> >>>>>>     at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
> >>>>>>     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
> >>>>>>     at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
> >>>>>>     at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
> >>>>>>     at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
> >>>>>>     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
> >>>>>>     at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
> >>>>>>     at org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:300)
> >>>>>>     at org.apache.flink.util.InstantiationUtil.writeObjectToConfig(InstantiationUtil.java:252)
> >>>>>>     at org.apache.flink.runtime.operators.util.TaskConfig.setStubWrapper(TaskConfig.java:273)
> >>>>>>     ... 19 more
> >>>>>>
> >>>>>> Please help me understand why I get this exception and how to fix it
> >>>>>> (rewrite the code, maybe?).
> >>>>>>
> >>>>>> Thanks,
> >>>>>> Tarandeep
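
[Editor's note: Fabian's distinct + leftOuterJoin suggestion needs a Flink cluster to run, but its semantics — de-duplicate the filter IDs, then flag each record by whether its key appears among them — can be mimicked with plain collections. The sketch below is illustrative only; `FilterFlagDemo` and its `addFilterFlag` are stand-ins mirroring the thread's method name, not Flink API.]

```java
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;

public class FilterFlagDemo {

    // Plain-collections mimic of the suggested plan: de-duplicate the filter
    // IDs (the "distinct()" step), then pair each record with a flag that is
    // true iff its key is present (the left outer join's null check on the
    // right side). 'key' plays the role of the thread's KeySelector.
    static <T> List<Map.Entry<Boolean, T>> addFilterFlag(
            List<T> records, List<String> filterIds, Function<T, String> key) {
        Set<String> distinctIds = new HashSet<>(filterIds);
        return records.stream()
                .map(r -> Map.entry(distinctIds.contains(key.apply(r)), r))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // Only "b" appears in the filter IDs, so only it is flagged true.
        List<Map.Entry<Boolean, String>> out =
                addFilterFlag(List.of("a", "b", "c"), List.of("b", "b"), r -> r);
        out.forEach(e -> System.out.println(e.getKey() + " " + e.getValue()));
    }
}
```

Note the design point behind Fabian's suggestion: CoGroup must materialize both groups per key, whereas distinct-then-outer-join lets the optimizer pick a cheaper strategy, and the flag falls out of whether the right side of the join is null.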