Phew, thanks for letting us know! 😃 And yes, there were some problems with Avro and class loading but I was hoping that we got them all before Flink 1.4.0.
Best, Aljoscha > On 20. Dec 2017, at 10:54, Kien Truong <duckientru...@gmail.com> wrote: > > It turn out that our flink branch is out-of-date. Sorry for all the noise. :) > > Regards, > Kien > > Sent from TypeApp <http://www.typeapp.com/r?b=11479> > On Dec 20, 2017, at 16:42, Kien Truong <duckientru...@gmail.com > <mailto:duckientru...@gmail.com>> wrote: > Upon further investigation, we found out that the reason: > > * The cluster was started on YARN with the hadoop classpath, which includes > Avro. Therefore, Avro's SpecificRecord class was loaded using > sun.misc.Launcher$AppClassLoader > > > * Our LteSession class was submitted with the application jar, and loaded > with the child-first classloader > > * Flink check if LteSession is assignable to SpecificRecord, which fails. > > * Flink fall back to Reflection-based avro writer, which throws NPE on null > field. > > If we change the classloader to parent-first, everything is ok. Now the > question is why the default doesn't work for us. > > Best regards, > Kien > > Sent from TypeApp <http://www.typeapp.com/r?b=11479> > On Dec 20, 2017, at 14:09, Kien Truong < duckientru...@gmail.com > <mailto:duckientru...@gmail.com>> wrote: > Hi, > > After upgrading to Flink 1.4, we encounter this exception > > Caused by: java.lang.NullPointerException: in > com.viettel.big4g.avro.LteSession in long null of long in field tmsi of > com.viettel.big4g.avro.LteSession > at > org.apache.avro.reflect.ReflectDatumWriter.write(ReflectDatumWriter.java:161) > at > org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:62) > at > org.apache.flink.formats.avro.typeutils.AvroSerializer.serialize(AvroSerializer.java:132) > > at > org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.serialize(StreamElementSerializer.java:176) > > at > org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.serialize(StreamElementSerializer.java:48) > > at > org.apache.flink.runtime.plugable.SerializationDelegate.write(SerializationDelegate.java:54) > > at <http://org.apache.flink.runtime.io/>org.apache.flink.runtime.io > <http://org.apache.flink.runtime.io/>.network.api.serialization.SpanningRecordSerializer.addRecord(SpanningRecordSerializer.java:93) > > at <http://org.apache.flink.runtime.io/>org.apache.flink.runtime.io > <http://org.apache.flink.runtime.io/>.network.api.writer.RecordWriter.sendToTarget(RecordWriter.java:114) > > at <http://org.apache.flink.runtime.io/>org.apache.flink.runtime.io > <http://org.apache.flink.runtime.io/>.network.api.writer.RecordWriter.emit(RecordWriter.java:89) > > at > <http://org.apache.flink.streaming.runtime.io/>org.apache.flink.streaming.runtime.io > > <http://org.apache.flink.streaming.runtime.io/>.StreamRecordWriter.emit(StreamRecordWriter.java:84) > > at > <http://org.apache.flink.streaming.runtime.io/>org.apache.flink.streaming.runtime.io > > <http://org.apache.flink.streaming.runtime.io/>.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:102) > > > > It seems Flink attempts to use the reflection writer instead of the specific > writer for this schema. This is wrong, because our LteSession is an Avro > object, and should use the specific writer. > > Best regards, > Kien > > Sent from TypeApp <http://www.typeapp.com/r?b=11479>