Hi,

I am getting a NotSerializableException in this class:



public class RecordsFilterer<T extends GenericRecord> {

    public DataSet<Tuple2<Boolean,T>> addFilterFlag(DataSet<T> dataset,
            DataSet<String> filteredIds, String fieldName) {
        return dataset.coGroup(filteredIds)
                .where(new KeySelector<T, String>() {
                    @Override
                    public String getKey(T t) throws Exception {
                        String s = (String) t.get(fieldName);
                        return s != null ? s : UUID.randomUUID().toString();
                    }
                })
                .equalTo((KeySelector<String, String>) s -> s)
                .with(new CoGroupFunction<T, String, Tuple2<Boolean,T>>() {
                    @Override
                    public void coGroup(Iterable<T> records,
                                        Iterable<String> ids,
                                        Collector<Tuple2<Boolean,T>> collector) throws Exception {
                        boolean filterFlag = false;
                        for (String id : ids) {
                            filterFlag = true;
                        }

                        for (T record : records) {
                            collector.collect(new Tuple2<>(filterFlag, record));
                        }
                    }
                });

    }
}


What I am trying to do is write generic code that joins Avro records
(of different types) with String records and, where there is a match, adds a
filter flag. This way I can use the same code for different Avro record types.
But I am getting this exception:

Exception in thread "main" org.apache.flink.optimizer.CompilerException: Error translating node 'Map "Key Extractor" : MAP [[ GlobalProperties [partitioning=RANDOM_PARTITIONED] ]] [[ LocalProperties [ordering=null, grouped=null, unique=null] ]]': Could not write the user code wrapper class org.apache.flink.api.common.operators.util.UserCodeObjectWrapper : java.io.NotSerializableException: com.styleseat.dataplatform.etl.jobs.boundary.RecordsFilterer
    at org.apache.flink.optimizer.plantranslate.JobGraphGenerator.preVisit(JobGraphGenerator.java:386)
    at org.apache.flink.optimizer.plantranslate.JobGraphGenerator.preVisit(JobGraphGenerator.java:109)
    at org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:198)
    at org.apache.flink.optimizer.plan.DualInputPlanNode.accept(DualInputPlanNode.java:163)
    at org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:199)
    at org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:199)
    at org.apache.flink.optimizer.plan.OptimizedPlan.accept(OptimizedPlan.java:128)
    at org.apache.flink.optimizer.plantranslate.JobGraphGenerator.compileJobGraph(JobGraphGenerator.java:188)
    at org.apache.flink.client.LocalExecutor.executePlan(LocalExecutor.java:187)
    at org.apache.flink.api.java.LocalEnvironment.execute(LocalEnvironment.java:90)
    at org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:855)
    at com.styleseat.dataplatform.etl.jobs.search.RunSearchLogProcessorV2.runSearchLogProcessor(RunSearchLogProcessorV2.java:57)
    at com.styleseat.dataplatform.etl.jobs.search.RunSearchLogProcessorV2.main(RunSearchLogProcessorV2.java:32)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:497)
    at com.intellij.rt.execution.application.AppMain.main(AppMain.java:144)
Caused by: org.apache.flink.runtime.operators.util.CorruptConfigurationException: Could not write the user code wrapper class org.apache.flink.api.common.operators.util.UserCodeObjectWrapper : java.io.NotSerializableException: RecordsFilterer
    at org.apache.flink.runtime.operators.util.TaskConfig.setStubWrapper(TaskConfig.java:275)
    at org.apache.flink.optimizer.plantranslate.JobGraphGenerator.createSingleInputVertex(JobGraphGenerator.java:843)
    at org.apache.flink.optimizer.plantranslate.JobGraphGenerator.preVisit(JobGraphGenerator.java:331)
    ... 17 more
Caused by: java.io.NotSerializableException: RecordsFilterer
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
    at org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:300)
    at org.apache.flink.util.InstantiationUtil.writeObjectToConfig(InstantiationUtil.java:252)
    at org.apache.flink.runtime.operators.util.TaskConfig.setStubWrapper(TaskConfig.java:273)
    ... 19 more


Please help me understand why I get this exception and how to fix it
(perhaps by rewriting the code?).
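
For reference, the failure mode can probably be reproduced without Flink. The
sketch below assumes the cause is that an anonymous class created inside an
instance method keeps a hidden reference to its enclosing object, so
serializing the function also tries to serialize the enclosing object, which
is not Serializable. CaptureDemo, Selector, Outer and FieldSelector are
made-up names for illustration; Selector only stands in for a Serializable
function interface such as KeySelector.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class CaptureDemo {

    // Hypothetical stand-in for a Serializable function interface like KeySelector.
    public interface Selector<T> extends Serializable {
        String getKey(T value);
    }

    // Not Serializable, like RecordsFilterer in the post above.
    public static class Outer {
        public Selector<String> anonymous(final String fieldName) {
            // Anonymous class created in an instance method: it holds a
            // hidden reference to the enclosing Outer instance.
            return new Selector<String>() {
                @Override
                public String getKey(String value) {
                    return fieldName + ":" + value;
                }
            };
        }

        public Selector<String> nested(String fieldName) {
            // Static nested class: only the String field travels with it.
            return new FieldSelector(fieldName);
        }
    }

    // Static nested class with no reference to any Outer instance.
    public static class FieldSelector implements Selector<String> {
        private final String fieldName;

        public FieldSelector(String fieldName) {
            this.fieldName = fieldName;
        }

        @Override
        public String getKey(String value) {
            return fieldName + ":" + value;
        }
    }

    // Returns true if plain Java serialization of the object succeeds.
    public static boolean isSerializable(Object o) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(o);
            return true;
        } catch (NotSerializableException e) {
            return false;
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        Outer outer = new Outer();
        System.out.println("anonymous: " + isSerializable(outer.anonymous("id")));
        System.out.println("nested:    " + isSerializable(outer.nested("id")));
    }
}
```

If that assumption holds, moving the KeySelector and CoGroupFunction into
static nested classes (passing fieldName through a constructor), or making
RecordsFilterer implement Serializable, might avoid the exception.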

Thanks,
Tarandeep
