It turns out that our Flink branch was out of date. Sorry for all the noise. :)

Regards,
Kien

Sent from TypeApp

On Dec 20, 2017, at 16:42, Kien Truong <duckientru...@gmail.com> wrote:
>Upon further investigation, we found the reason:
>
>* The cluster was started on YARN with the Hadoop classpath, which
>includes Avro. Therefore, Avro's SpecificRecord class was loaded using
>sun.misc.Launcher$AppClassLoader.
>
>* Our LteSession class was submitted with the application jar, and
>loaded with the child-first classloader.
>
>* Flink checks whether LteSession is assignable to SpecificRecord, which
>fails.
>
>* Flink falls back to the reflection-based Avro writer, which throws an
>NPE on a null field.
>
>If we change the classloader to parent-first, everything is OK. Now the
>question is why the default doesn't work for us.
>
>Best regards,
>Kien
>
>Sent from TypeApp
>
>On Dec 20, 2017, at 14:09, Kien Truong <duckientru...@gmail.com>
>wrote:
>>Hi,
>>
>>After upgrading to Flink 1.4, we encounter this exception:
>>
>>Caused by: java.lang.NullPointerException: in
>>com.viettel.big4g.avro.LteSession in long null of long in field tmsi
>>of com.viettel.big4g.avro.LteSession
>>at org.apache.avro.reflect.ReflectDatumWriter.write(ReflectDatumWriter.java:161)
>>at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:62)
>>at org.apache.flink.formats.avro.typeutils.AvroSerializer.serialize(AvroSerializer.java:132)
>>at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.serialize(StreamElementSerializer.java:176)
>>at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.serialize(StreamElementSerializer.java:48)
>>at org.apache.flink.runtime.plugable.SerializationDelegate.write(SerializationDelegate.java:54)
>>at org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer.addRecord(SpanningRecordSerializer.java:93)
>>at org.apache.flink.runtime.io.network.api.writer.RecordWriter.sendToTarget(RecordWriter.java:114)
>>at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:89)
>>at org.apache.flink.streaming.runtime.io.StreamRecordWriter.emit(StreamRecordWriter.java:84)
>>at org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:102)
>>
>>
>>It seems Flink attempts to use the reflection writer instead of the
>>specific writer for this schema. This is wrong, because our LteSession
>>is an Avro object and should use the specific writer.
>>
>>Best regards,
>>Kien
>>
>>Sent from TypeApp
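
For anyone who finds this thread later: the failed assignability check described in the quoted analysis can be reproduced without Flink or Avro. The sketch below is only an illustration under assumptions. `Marker`, `Session` and `ChildFirstLoader` are hypothetical stand-ins for SpecificRecord, LteSession and Flink's child-first classloader, not the real classes, and it assumes the interface is visible to both loaders (for example, Avro on the Hadoop classpath and also bundled in the application jar). It needs the compiled class files to be readable from the classpath.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;

public class ClassLoaderMismatchDemo {

    /** Hypothetical stand-in for Avro's SpecificRecord interface. */
    public interface Marker {}

    /** Hypothetical stand-in for the generated LteSession class. */
    public static class Session implements Marker {}

    /** Simplified child-first loader: defines this demo's nested classes itself, delegates the rest. */
    public static class ChildFirstLoader extends ClassLoader {
        private static final String PREFIX = ClassLoaderMismatchDemo.class.getName() + "$";

        public ChildFirstLoader(ClassLoader parent) {
            super(parent);
        }

        @Override
        protected Class<?> loadClass(String name, boolean resolve) throws ClassNotFoundException {
            synchronized (getClassLoadingLock(name)) {
                Class<?> c = findLoadedClass(name);
                if (c == null && name.startsWith(PREFIX)) {
                    // Child-first: try to define our own copy before asking the parent.
                    c = defineFromParentResource(name);
                }
                if (c == null) {
                    // Everything else (java.lang.Object etc.) uses normal parent delegation.
                    c = super.loadClass(name, false);
                }
                if (resolve) {
                    resolveClass(c);
                }
                return c;
            }
        }

        /** Reads the class file bytes through the parent and defines a second copy in this loader. */
        private Class<?> defineFromParentResource(String name) throws ClassNotFoundException {
            String path = name.replace('.', '/') + ".class";
            try (InputStream in = getParent().getResourceAsStream(path)) {
                if (in == null) {
                    return null;
                }
                ByteArrayOutputStream out = new ByteArrayOutputStream();
                byte[] buf = new byte[4096];
                int n;
                while ((n = in.read(buf)) != -1) {
                    out.write(buf, 0, n);
                }
                byte[] bytes = out.toByteArray();
                return defineClass(name, bytes, 0, bytes.length);
            } catch (IOException e) {
                throw new ClassNotFoundException(name, e);
            }
        }
    }

    /** Loads Session through the given loader, wrapping the checked exception. */
    public static Class<?> loadSession(ClassLoader loader) {
        try {
            return loader.loadClass(Session.class.getName());
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        ClassLoader parent = ClassLoaderMismatchDemo.class.getClassLoader();

        // Same loader on both sides: the interface check succeeds.
        Class<?> viaParent = loadSession(parent);
        System.out.println("parent-first assignable: " + Marker.class.isAssignableFrom(viaParent));

        // Session and its Marker interface come from the child loader, so they are
        // different Class objects from the parent's Marker and the check fails.
        Class<?> viaChild = loadSession(new ChildFirstLoader(parent));
        System.out.println("child-first assignable:  " + Marker.class.isAssignableFrom(viaChild));
    }
}
```

The parent-first workaround mentioned in the thread corresponds, as far as I know, to setting `classloader.resolve-order: parent-first` in flink-conf.yaml on Flink 1.4; the default there is `child-first`.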