Nope, I think there is neither a fix nor an open issue for this right now.

On Mon, 13 Jun 2016 at 11:31 Maximilian Michels <m...@apache.org> wrote:

> Is there an issue or a fix for proper use of the ClojureCleaner in
> CoGroup.where()?
>
> On Fri, Jun 10, 2016 at 8:07 AM, Aljoscha Krettek <aljos...@apache.org>
> wrote:
> > Hi,
> > yes, I was talking about a Flink bug. I forgot to mention the work-around
> > that Stephan mentioned.
> >
> > On Thu, 9 Jun 2016 at 20:38 Stephan Ewen <se...@apache.org> wrote:
> >>
> >> You can also make the KeySelector a static inner class. That should work
> >> as well.
> >>
> >> On Thu, Jun 9, 2016 at 7:00 PM, Tarandeep Singh <tarand...@gmail.com>
> >> wrote:
> >>>
> >>> Thank you Aljoscha and Fabian for your replies.
> >>>
> >>> @Aljoscha: when you said "cleaner is not used in CoGroup.where(). I'm
> >>> afraid this is a bug", I am assuming you are referring to Flink engine
> >>> itself.
> >>>
> >>> @Fabian: thanks for the optimization tip.
> >>>
> >>> This is how I have got it working (with a hack): In my dataset, the
> join
> >>> field/key can be null otherwise .where(fieldName) works and I don't get
> >>> not-serializable exception. So I applied a MapFunction to DataSet and
> put a
> >>> dummy value in the join field/key where it was null. Then In the join
> >>> function, I change it back to null.
> >>>
> >>> Best,
> >>> Tarandeep
> >>>
> >>> On Thu, Jun 9, 2016 at 7:06 AM, Aljoscha Krettek <aljos...@apache.org>
> >>> wrote:
> >>>>
> >>>> Hi,
> >>>> the problem is that the KeySelector is an anonymous inner class and as
> >>>> such as a reference to the outer RecordFilterer object. Normally,
> this would
> >>>> be rectified by the closure cleaner but the cleaner is not used in
> >>>> CoGroup.where(). I'm afraid this is a bug.
> >>>>
> >>>> Best,
> >>>> Aljoscha
> >>>>
> >>>>
> >>>> On Thu, 9 Jun 2016 at 14:06 Fabian Hueske <fhue...@gmail.com> wrote:
> >>>>>
> >>>>> Hi Tarandeep,
> >>>>>
> >>>>> the exception suggests that Flink tries to serialize RecordsFilterer
> as
> >>>>> a user function (this happens via Java Serialization).
> >>>>> I said suggests because the code that uses RecordsFilterer is not
> >>>>> included.
> >>>>>
> >>>>> To me it looks like RecordsFilterer should not be used as a user
> >>>>> function. It is a helper class to construct a DataSet program, so it
> should
> >>>>> not be shipped for execution.
> >>>>> You would use such a class as follows:
> >>>>>
> >>>>> DataSet<T> records = ...
> >>>>> DataSet<String> filterIDs = ...
> >>>>>
> >>>>> RecordsFilterer rf = new RecordsFilterer();
> >>>>> DataSet<Tuple2<Boolean, T>> result = rf.addFilterFlag(records,
> >>>>> filterIDs, "myField");
> >>>>>
> >>>>> Regarding the join code, I would suggest an optimization.
> >>>>> Instead of using CoGroup, I would use distinct and an OuterJoin like
> >>>>> this:
> >>>>>
> >>>>> DataSet<String> distIds = filtereredIds.distinct();
> >>>>> DataSet<Tuple2<Boolean, T> result = records
> >>>>>   .leftOuterJoin(distIds)
> >>>>>   .where(KEYSELECTOR)
> >>>>>   .equalTo("*") // use full string as key
> >>>>>   .with(JOINFUNC) // set Bool to false if right == null, true
> otherwise
> >>>>>
> >>>>> Best, Fabian
> >>>>>
> >>>>> 2016-06-09 2:28 GMT+02:00 Tarandeep Singh <tarand...@gmail.com>:
> >>>>>>
> >>>>>> Hi,
> >>>>>>
> >>>>>> I am getting NoSerializableException in this class-
> >>>>>>
> >>>>>> 
> >>>>>>
> >>>>>> public class RecordsFilterer<T extends GenericRecord> {
> >>>>>>
> >>>>>>     public DataSet<Tuple2<Boolean,T>> addFilterFlag(DataSet<T>
> >>>>>> dataset, DataSet<String> filteredIds, String fieldName) {
> >>>>>>         return dataset.coGroup(filteredIds)
> >>>>>>                 .where(new KeySelector<T, String>() {
> >>>>>>                     @Override
> >>>>>>                     public String getKey(T t) throws Exception {
> >>>>>>                         String s = (String) t.get(fieldName);
> >>>>>>                         return s != null ? s :
> >>>>>> UUID.randomUUID().toString();
> >>>>>>                     }
> >>>>>>                 })
> >>>>>>                 .equalTo((KeySelector<String, String>) s -> s)
> >>>>>>                 .with(new CoGroupFunction<T, String,
> >>>>>> Tuple2<Boolean,T>>() {
> >>>>>>                     @Override
> >>>>>>                     public void coGroup(Iterable<T> records,
> >>>>>> Iterable<String> ids,
> >>>>>>                                         Collector<Tuple2<Boolean,T>>
> >>>>>> collector) throws Exception {
> >>>>>>                         boolean filterFlag = false;
> >>>>>>                         for (String id : ids) {
> >>>>>>                             filterFlag = true;
> >>>>>>                         }
> >>>>>>
> >>>>>>                         for (T record : records) {
> >>>>>>                             collector.collect(new
> Tuple2<>(filterFlag,
> >>>>>> record));
> >>>>>>                         }
> >>>>>>                     }
> >>>>>>                 });
> >>>>>>
> >>>>>>     }
> >>>>>> }
> >>>>>>
> >>>>>>
> >>>>>> What I am trying to do is write a generic code that will join Avro
> >>>>>> records (of different types) with String records and there is a
> match add a
> >>>>>> filter flag. This way I can use the same code for different Avro
> record
> >>>>>> types. But I am getting this exception-
> >>>>>>
> >>>>>> Exception in thread "main"
> >>>>>> org.apache.flink.optimizer.CompilerException: Error translating
> node 'Map
> >>>>>> "Key Extractor" : MAP [[ GlobalProperties
> [partitioning=RANDOM_PARTITIONED]
> >>>>>> ]] [[ LocalProperties [ordering=null, grouped=null, unique=null]
> ]]': Could
> >>>>>> not write the user code wrapper class
> >>>>>> org.apache.flink.api.common.operators.util.UserCodeObjectWrapper :
> >>>>>> java.io.NotSerializableException:
> >>>>>> com.styleseat.dataplatform.etl.jobs.boundary.RecordsFilterer
> >>>>>>     at
> >>>>>>
> org.apache.flink.optimizer.plantranslate.JobGraphGenerator.preVisit(JobGraphGenerator.java:386)
> >>>>>>     at
> >>>>>>
> org.apache.flink.optimizer.plantranslate.JobGraphGenerator.preVisit(JobGraphGenerator.java:109)
> >>>>>>     at
> >>>>>>
> org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:198)
> >>>>>>     at
> >>>>>>
> org.apache.flink.optimizer.plan.DualInputPlanNode.accept(DualInputPlanNode.java:163)
> >>>>>>     at
> >>>>>>
> org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:199)
> >>>>>>     at
> >>>>>>
> org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:199)
> >>>>>>     at
> >>>>>>
> org.apache.flink.optimizer.plan.OptimizedPlan.accept(OptimizedPlan.java:128)
> >>>>>>     at
> >>>>>>
> org.apache.flink.optimizer.plantranslate.JobGraphGenerator.compileJobGraph(JobGraphGenerator.java:188)
> >>>>>>     at
> >>>>>>
> org.apache.flink.client.LocalExecutor.executePlan(LocalExecutor.java:187)
> >>>>>>     at
> >>>>>>
> org.apache.flink.api.java.LocalEnvironment.execute(LocalEnvironment.java:90)
> >>>>>>     at
> >>>>>>
> org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:855)
> >>>>>>     at
> >>>>>>
> com.styleseat.dataplatform.etl.jobs.search.RunSearchLogProcessorV2.runSearchLogProcessor(RunSearchLogProcessorV2.java:57)
> >>>>>>     at
> >>>>>>
> com.styleseat.dataplatform.etl.jobs.search.RunSearchLogProcessorV2.main(RunSearchLogProcessorV2.java:32)
> >>>>>>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> >>>>>>     at
> >>>>>>
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> >>>>>>     at
> >>>>>>
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> >>>>>>     at java.lang.reflect.Method.invoke(Method.java:497)
> >>>>>>     at
> >>>>>> com.intellij.rt.execution.application.AppMain.main(AppMain.java:144)
> >>>>>> Caused by:
> >>>>>>
> org.apache.flink.runtime.operators.util.CorruptConfigurationException: Could
> >>>>>> not write the user code wrapper class
> >>>>>> org.apache.flink.api.common.operators.util.UserCodeObjectWrapper :
> >>>>>> java.io.NotSerializableException: RecordsFilterer
> >>>>>>     at
> >>>>>>
> org.apache.flink.runtime.operators.util.TaskConfig.setStubWrapper(TaskConfig.java:275)
> >>>>>>     at
> >>>>>>
> org.apache.flink.optimizer.plantranslate.JobGraphGenerator.createSingleInputVertex(JobGraphGenerator.java:843)
> >>>>>>     at
> >>>>>>
> org.apache.flink.optimizer.plantranslate.JobGraphGenerator.preVisit(JobGraphGenerator.java:331)
> >>>>>>     ... 17 more
> >>>>>> Caused by: java.io.NotSerializableException: RecordsFilterer
> >>>>>>     at
> >>>>>>
> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
> >>>>>>     at
> >>>>>>
> java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
> >>>>>>     at
> >>>>>>
> java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
> >>>>>>     at
> >>>>>>
> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
> >>>>>>     at
> >>>>>>
> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
> >>>>>>     at
> >>>>>>
> java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
> >>>>>>     at
> >>>>>>
> java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
> >>>>>>     at
> >>>>>>
> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
> >>>>>>     at
> >>>>>>
> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
> >>>>>>     at
> >>>>>>
> java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
> >>>>>>     at
> >>>>>>
> java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
> >>>>>>     at
> >>>>>>
> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
> >>>>>>     at
> >>>>>>
> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
> >>>>>>     at
> >>>>>> java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
> >>>>>>     at
> >>>>>>
> org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:300)
> >>>>>>     at
> >>>>>>
> org.apache.flink.util.InstantiationUtil.writeObjectToConfig(InstantiationUtil.java:252)
> >>>>>>     at
> >>>>>>
> org.apache.flink.runtime.operators.util.TaskConfig.setStubWrapper(TaskConfig.java:273)
> >>>>>>     ... 19 more
> >>>>>>
> >>>>>>
> >>>>>> Please help me understand why I get this exception and how to fix it
> >>>>>> [rewrite code may be?]
> >>>>>>
> >>>>>> Thanks,
> >>>>>> Tarandeep
> >>>>>>
> >>>>>>
> >>>>>
> >>>
> >>
> >
>

Reply via email to