The 010 consumer extends the 09 consumer, so I'd guess whatever code is reporting the error sees FlinkKafkaConsumer09, the superclass of your FlinkKafkaConsumer010.
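The check that's failing is plain Java serialization: Flink's ClosureCleaner writes the source function with ObjectOutputStream.writeObject, so everything reachable from it has to be Serializable. A minimal, stdlib-only sketch of that check (the helper and class names here are mine, not from the thread):

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;

public class SerializableCheck {

    // Throws NotSerializableException if anything reachable from obj
    // cannot be written with Java serialization.
    static void assertSerializable(Object obj) throws IOException {
        try (ObjectOutputStream out =
                new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(obj);
        }
    }

    public static void main(String[] args) throws IOException {
        assertSerializable("strings are serializable");   // passes

        try {
            // java.lang.Object does not implement Serializable, so this
            // fails the same way a non-serializable field does.
            assertSerializable(new Object());
            System.out.println("unexpectedly serializable");
        } catch (NotSerializableException expected) {
            System.out.println("caught NotSerializableException, as expected");
        }
    }
}
```

In a real unit test you'd call assertSerializable(new MyDeserializationSchema(schema)) so a non-serializable field fails the build instead of the job submission.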
I've seen this error a bunch, and it means MyDeserializationSchema isn't serializable, or, more likely, one of its fields isn't, or one of the fields of its fields: everything in the object graph has to be serializable. (The "Caused by" line in your trace names the culprit: org.apache.avro.Schema$RecordSchema, i.e. your Schema field.) Probably the easiest way to guard against this is a unit test asserting that MyDeserializationSchema is serializable, essentially a test that ObjectOutputStream.writeObject succeeds on an instance. That's a pretty useful test to have, because a change that breaks serializability then fails in your test phase instead of at the deploy/run stage.

On Fri, Mar 2, 2018 at 10:42 AM, Filipe Couto <filipe.cout...@gmail.com> wrote:

> Hello,
>
> I have a few topics that I want to read from Kafka, which consist mainly
> of a key-value pair: timestamp (key) and value (byte array).
>
> The byte array doesn't really have a class to deserialize into, since the
> Avro record we have comes from a "SELECT * FROM..." that selects several
> SQL tables, and each topic represents one of those tables.
>
> We're using a GenericRecord, and since we know the structure of the table
> via the name of the topic, we know the column names, like this:
> genericRecord.get("COLUMN_NAME").toString()
>
> Given this, we're now trying to read a Kafka topic using Flink, and we
> have this (the environment is the StreamExecutionEnvironment, and the
> properties hold the Kafka serializer/deserializer settings and the Kafka
> and Zookeeper IP addresses):
>
> class...
> DataStream<Object> messageStream = environment
>         .addSource(new FlinkKafkaConsumer010<>(baseTopic,
>                 new MyDeserializationSchema(schema), properties));
>
> messageStream.print();
>
> try {
>     environment.execute();
> } catch (Exception e) {
>     // TODO Auto-generated catch block
>     e.printStackTrace();
> }
>
> return false;
> }
> }
>
> class MyDeserializationSchema<T> implements DeserializationSchema<T> {
>
>     private static final Logger log =
>             LoggerFactory.getLogger(MyDeserializationSchema.class);
>
>     private final Class<T> avrotype =
>             (Class<T>) org.apache.avro.generic.GenericRecord.class;
>     private final Schema schema;
>
>     public MyDeserializationSchema(Schema schema) {
>         this.schema = schema;
>     }
>
>     @Override
>     public T deserialize(byte[] arg0) throws IOException {
>         log.info("Starting deserialization");
>         GenericRecord genericRecord;
>         Injection<GenericRecord, byte[]> recordInjection =
>                 GenericAvroCodecs.toBinary(schema);
>         log.info(recordInjection.toString());
>         genericRecord = recordInjection.invert(arg0).get();
>         log.info(genericRecord.toString());
>         return (T) genericRecord;
>     }
>
>     @Override
>     public boolean isEndOfStream(T nextElement) {
>         return false;
>     }
>
>     @Override
>     public TypeInformation<T> getProducedType() {
>         return TypeExtractor.getForClass(avrotype);
>     }
> }
>
> Executing this on our server generates the following:
>
> [2018-03-02 15:59:37,111] WARN Ignoring configured key DeSerializer
> (key.deserializer)
> (org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09)
>
> Exception in thread "main" org.apache.flink.api.common.InvalidProgramException:
> The implementation of the FlinkKafkaConsumer09 is not serializable. The
> object probably contains or references non serializable fields.
>     at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:100)
>     at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:1548)
>     at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1460)
>     at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1404)
>     at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1386)
>     at com.i2s.analytics.flink.executors.LKTreatyExecutor.execute(LKTreatyExecutor.java:153)
>     at com.i2s.analytics.flink.job.DependenciesConsumer.main(DependenciesConsumer.java:66)
> Caused by: java.io.NotSerializableException: org.apache.avro.Schema$RecordSchema
>     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
>     at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
>     at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
>     at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
>     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
>     at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
>     at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
>     at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
>     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
>     at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
>     at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
>     at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
>     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
>     at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
>     at org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:315)
>     at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:81)
>     ... 6 more
>
> I can't understand why the logs refer to a FlinkKafkaConsumer09 when we're
> using the FlinkKafkaConsumer010 version. And also, how can we deserialize
> to a GenericRecord so we can access the record fields like we do when we
> read a Kafka topic without Flink?
>
> Thanks in advance for any help that is given to us.

--
Gordon Weakliem | Sr. Software Engineer, sovrn
O 303.493.5490 | Boulder | NYC | London
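On the Caused by: java.io.NotSerializableException: org.apache.avro.Schema$RecordSchema in the trace: Avro's Schema is the non-serializable field. The usual workaround is to not serialize the Schema at all: store its JSON string (schema.toString()) in the DeserializationSchema, keep the parsed object in a transient field, and reparse lazily on the worker with new Schema.Parser().parse(json). Below is a self-contained sketch of that pattern; the class and field names are illustrative, and a string stand-in marks where the real code would call the Avro parser:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Sketch: hold the schema in a serializable form (its JSON string) and
// rebuild the non-serializable parsed object lazily after deserialization.
public class LazySchemaHolder implements Serializable {

    private final String schemaJson;   // travels with the closure
    private transient Object parsed;   // skipped by serialization, rebuilt on use

    public LazySchemaHolder(String schemaJson) {
        this.schemaJson = schemaJson;
    }

    public Object getParsed() {
        if (parsed == null) {
            // Real code: parsed = new Schema.Parser().parse(schemaJson);
            parsed = "parsed:" + schemaJson;
        }
        return parsed;
    }

    public static void main(String[] args) throws Exception {
        LazySchemaHolder holder = new LazySchemaHolder("{\"type\":\"string\"}");

        // Round-trip through Java serialization, as Flink's ClosureCleaner does.
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(holder);   // succeeds: only the String is written
        }
        try (ObjectInputStream in = new ObjectInputStream(
                new ByteArrayInputStream(bytes.toByteArray()))) {
            LazySchemaHolder copy = (LazySchemaHolder) in.readObject();
            System.out.println(copy.getParsed());  // prints parsed:{"type":"string"}
        }
    }
}
```

Applied to the quoted MyDeserializationSchema, that means replacing the Schema field with its JSON string and building the Injection from the reparsed schema inside deserialize(), or caching it in a transient field so it's created once per worker.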