Hi, I am getting a NotSerializableException in this class:

import java.util.UUID;

import org.apache.avro.generic.GenericRecord;
import org.apache.flink.api.common.functions.CoGroupFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

public class RecordsFilterer<T extends GenericRecord> {

    public DataSet<Tuple2<Boolean,T>> addFilterFlag(DataSet<T> dataset, DataSet<String> filteredIds, String fieldName) {
        return dataset.coGroup(filteredIds)
                // Key each record by the given field; records without a value get a random key
                .where(new KeySelector<T, String>() {
                    @Override
                    public String getKey(T t) throws Exception {
                        String s = (String) t.get(fieldName);
                        return s != null ? s : UUID.randomUUID().toString();
                    }
                })
                .equalTo((KeySelector<String, String>) s -> s)
                .with(new CoGroupFunction<T, String, Tuple2<Boolean,T>>() {
                    @Override
                    public void coGroup(Iterable<T> records, Iterable<String> ids,
                                        Collector<Tuple2<Boolean,T>> collector) throws Exception {
                        // The flag is true if at least one id matched this key
                        boolean filterFlag = false;
                        for (String id : ids) {
                            filterFlag = true;
                        }

                        for (T record : records) {
                            collector.collect(new Tuple2<>(filterFlag, record));
                        }
                    }
                });
    }
}

What I am trying to do is write generic code that joins Avro records (of different types) with String records and, if there is a match, adds a filter flag. This way I can use the same code for different Avro record types.

But I am getting this exception:

Exception in thread "main" org.apache.flink.optimizer.CompilerException: Error translating node 'Map "Key Extractor" : MAP [[ GlobalProperties [partitioning=RANDOM_PARTITIONED] ]] [[ LocalProperties [ordering=null, grouped=null, unique=null] ]]': Could not write the user code wrapper class org.apache.flink.api.common.operators.util.UserCodeObjectWrapper : java.io.NotSerializableException: com.styleseat.dataplatform.etl.jobs.boundary.RecordsFilterer
    at org.apache.flink.optimizer.plantranslate.JobGraphGenerator.preVisit(JobGraphGenerator.java:386)
    at org.apache.flink.optimizer.plantranslate.JobGraphGenerator.preVisit(JobGraphGenerator.java:109)
    at org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:198)
    at org.apache.flink.optimizer.plan.DualInputPlanNode.accept(DualInputPlanNode.java:163)
    at org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:199)
    at org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:199)
    at org.apache.flink.optimizer.plan.OptimizedPlan.accept(OptimizedPlan.java:128)
    at org.apache.flink.optimizer.plantranslate.JobGraphGenerator.compileJobGraph(JobGraphGenerator.java:188)
    at org.apache.flink.client.LocalExecutor.executePlan(LocalExecutor.java:187)
    at org.apache.flink.api.java.LocalEnvironment.execute(LocalEnvironment.java:90)
    at org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:855)
    at com.styleseat.dataplatform.etl.jobs.search.RunSearchLogProcessorV2.runSearchLogProcessor(RunSearchLogProcessorV2.java:57)
    at com.styleseat.dataplatform.etl.jobs.search.RunSearchLogProcessorV2.main(RunSearchLogProcessorV2.java:32)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:497)
    at com.intellij.rt.execution.application.AppMain.main(AppMain.java:144)
Caused by: org.apache.flink.runtime.operators.util.CorruptConfigurationException: Could not write the user code wrapper class org.apache.flink.api.common.operators.util.UserCodeObjectWrapper : java.io.NotSerializableException: RecordsFilterer
    at org.apache.flink.runtime.operators.util.TaskConfig.setStubWrapper(TaskConfig.java:275)
    at org.apache.flink.optimizer.plantranslate.JobGraphGenerator.createSingleInputVertex(JobGraphGenerator.java:843)
    at org.apache.flink.optimizer.plantranslate.JobGraphGenerator.preVisit(JobGraphGenerator.java:331)
    ... 17 more
Caused by: java.io.NotSerializableException: RecordsFilterer
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
    at org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:300)
    at org.apache.flink.util.InstantiationUtil.writeObjectToConfig(InstantiationUtil.java:252)
    at org.apache.flink.runtime.operators.util.TaskConfig.setStubWrapper(TaskConfig.java:273)
    ... 19 more

Please help me understand why I get this exception and how to fix it (rewrite the code, maybe?).

Thanks,
Tarandeep
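
For reference, the pattern in the trace (a serializable function object dragging in a non-serializable RecordsFilterer) can be reproduced with plain Java serialization, without Flink. The following is only a minimal sketch under assumptions: CaptureDemo, KeyFn, Filterer, FieldKey and canSerialize are hypothetical names that do not appear in the job, and KeyFn merely stands in for a serializable function interface such as KeySelector. It compares an anonymous class created inside an instance method with a static nested class.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

class CaptureDemo {

    // Hypothetical stand-in for a serializable function interface.
    interface KeyFn extends Serializable {
        String key(String record);
    }

    // Deliberately NOT Serializable, like RecordsFilterer in the post.
    static class Filterer {

        // Anonymous class created in an instance method: besides fieldName,
        // it keeps a hidden reference to the enclosing Filterer instance.
        KeyFn anonymousKey(final String fieldName) {
            return new KeyFn() {
                @Override
                public String key(String record) {
                    return fieldName + ":" + record;
                }
            };
        }

        // Static factory returning a static nested class: no enclosing
        // instance is involved, only fieldName is stored.
        static KeyFn staticKey(String fieldName) {
            return new FieldKey(fieldName);
        }
    }

    // Static nested class holding only serializable state.
    static class FieldKey implements KeyFn {
        private final String fieldName;

        FieldKey(String fieldName) {
            this.fieldName = fieldName;
        }

        @Override
        public String key(String record) {
            return fieldName + ":" + record;
        }
    }

    // Returns true if Java serialization accepts the object graph,
    // false if it hits a NotSerializableException.
    static boolean canSerialize(Object o) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(o);
            return true;
        } catch (NotSerializableException e) {
            return false;
        } catch (IOException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("anonymous inner class: " + canSerialize(new Filterer().anonymousKey("id")));
        System.out.println("static nested class:   " + canSerialize(Filterer.staticKey("id")));
    }
}
```

If the same mechanism is at work in the job above, moving the anonymous KeySelector and CoGroupFunction into static nested classes, or making RecordsFilterer implement Serializable, might avoid the exception. That is an assumption drawn from the trace and has not been tested against Flink.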