There was a private member variable that was not serializable and was not marked transient. Thanks for the pointer.
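
For anyone who lands on this thread with the same exception, below is a rough plain-Java sketch of that failure mode and the fix, with no Flink dependency. All names in it (`TransientFieldSketch`, `Formatter`, `BrokenSchema`, `FixedSchema`, `trySerialize`) are hypothetical; the actual field in `Tuple2Serializerr` is not shown in this thread. It assumes the schema object is shipped with standard Java serialization, which is what the `ObjectOutputStream` frames in the stack trace suggest.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

public class TransientFieldSketch {

    /** Hypothetical helper type that does NOT implement Serializable. */
    interface Formatter {
        String format(int a, int b);
    }

    /** Broken variant: a non-transient field holds an anonymous, non-serializable class. */
    static class BrokenSchema implements Serializable {
        private static final long serialVersionUID = 1L;

        // The compiler names this anonymous class <enclosing class>$1.
        private final Formatter formatter = new Formatter() {
            @Override
            public String format(int a, int b) {
                return a + "," + b;
            }
        };

        byte[] serialize(int a, int b) {
            return formatter.format(a, b).getBytes(StandardCharsets.UTF_8);
        }
    }

    /** Fixed variant: the field is transient and is re-created lazily when first needed. */
    static class FixedSchema implements Serializable {
        private static final long serialVersionUID = 1L;

        // transient fields are skipped by Java serialization, so this is null after deserialization.
        private transient Formatter formatter;

        private Formatter formatter() {
            if (formatter == null) {
                formatter = (a, b) -> a + "," + b;
            }
            return formatter;
        }

        byte[] serialize(int a, int b) {
            return formatter().format(a, b).getBytes(StandardCharsets.UTF_8);
        }
    }

    /** Returns "ok" if Java serialization succeeds, otherwise a description of the failure. */
    static String trySerialize(Object o) {
        try {
            ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream());
            out.writeObject(o);
            return "ok";
        } catch (NotSerializableException e) {
            // The exception message is the name of the offending class.
            return "NotSerializableException: " + e.getMessage();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("broken: " + trySerialize(new BrokenSchema()));
        System.out.println("fixed:  " + trySerialize(new FixedSchema()));
    }
}
```

The `$1` suffix in the exception is the compiler-generated name of the first anonymous class declared inside the enclosing class, which is why the error can show up even when the schema class itself is a top-level class. Making the field's type `Serializable` would be another way out; `transient` with lazy initialization is just the option that needs no change to the helper type.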

On Thu, Feb 23, 2017 at 11:44 PM, Tzu-Li (Gordon) Tai <tzuli...@apache.org> wrote:

> Thanks for clarifying.
>
> From the looks of your exception:
>
> Caused by: java.io.NotSerializableException:
>>>>> com.sy.flink.test.Tuple2Serializerr$1
>>>>> at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
>>>>> at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
>
> com.sy.flink.test.Tuple2Serializerr$1: this states that an anonymous
> inner class in `Tuple2Serializerr` is not serializable.
>
> Could you check if that’s the case?
>
> On February 24, 2017 at 3:10:58 PM, Mohit Anchlia (mohitanch...@gmail.com) wrote:
>
> But it is not an inner class.
>
> On Thu, Feb 23, 2017 at 11:09 PM, Tzu-Li (Gordon) Tai <tzuli...@apache.org> wrote:
>
>> Since I don’t have your complete code, I’m guessing this is the problem:
>> Is your `Tuple2Serializer` an inner class? If yes, you should be able to
>> solve the problem by declaring `Tuple2Serializer` to be `static`.
>>
>> This is more of a Java problem -
>> It isn’t serializable if it isn’t static, because it will contain an
>> implicit reference to the enclosing outer class, and therefore serializing
>> it will result in serializing the outer class instance as well.
>>
>> On February 24, 2017 at 2:43:38 PM, Mohit Anchlia (mohitanch...@gmail.com) wrote:
>>
>> This is, at a high level, what I am doing:
>>
>> Serialize:
>>
>> String s = tuple.getPos(0) + "," + tuple.getPos(1);
>> return s.getBytes()
>>
>> Deserialize:
>>
>> String s = new String(message);
>> String [] sarr = s.split(",");
>> Tuple2<Integer, Integer> tuple = new Tuple2<>(Integer.valueOf(sarr[0]),
>> Integer.valueOf(sarr[1]));
>>
>> return tuple;
>>
>> On Thu, Feb 23, 2017 at 10:22 PM, Tzu-Li (Gordon) Tai <tzuli...@apache.org> wrote:
>>
>>> Hi Mohit,
>>>
>>> As 刘彪 pointed out in his reply, the problem is that your
>>> `Tuple2Serializer` contains fields that are not serializable, so
>>> `Tuple2Serializer` itself is not serializable.
>>> Could you perhaps share your `Tuple2Serializer` implementation with us
>>> so we can pinpoint the problem?
>>>
>>> A snippet of the class fields and constructor will do, so you don’t have
>>> to provide the whole `serialize` / `deserialize` implementation if you
>>> don’t want to.
>>>
>>> Cheers,
>>> Gordon
>>>
>>> On February 24, 2017 at 11:04:34 AM, Mohit Anchlia (mohitanch...@gmail.com) wrote:
>>>
>>> I am using a String inside to convert into bytes.
>>>
>>> On Thu, Feb 23, 2017 at 6:50 PM, 刘彪 <mmyy1...@gmail.com> wrote:
>>>
>>>> Hi Mohit,
>>>> Since you did not give the whole code of Tuple2Serializerr, I guess the
>>>> reason is that some fields of Tuple2Serializerr do not implement Serializable.
>>>>
>>>> 2017-02-24 9:07 GMT+08:00 Mohit Anchlia <mohitanch...@gmail.com>:
>>>>
>>>>> I wrote a key serialization class to write to Kafka; however, I am
>>>>> getting this error. Not sure why, as I've already implemented the
>>>>> interfaces.
>>>>>
>>>>> Caused by: java.io.NotSerializableException:
>>>>> com.sy.flink.test.Tuple2Serializerr$1
>>>>> at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
>>>>> at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
>>>>>
>>>>> And the class implements the following:
>>>>>
>>>>> public class Tuple2Serializerr implements
>>>>>     DeserializationSchema<Tuple2<Integer, Integer>>,
>>>>>     SerializationSchema<Tuple2<Integer, Integer>> {
>>>>>
>>>>> And called like this:
>>>>>
>>>>> FlinkKafkaProducer010<Tuple2<Integer, Integer>> myProducer = new
>>>>> FlinkKafkaProducer010<Tuple2<Integer, Integer>>(
>>>>>     "10.22.4.15:9092", // broker list
>>>>>     "my-topic", // target topic
>>>>>     new Tuple2Serializerr()); // serialization schema