Hello, I have a few topics that I want to read from Kafka. Each consists mainly of a key-value pair: a timestamp (key) and a value (byte array).
The byte array doesn't really have a class to deserialize into, since the Avro record we receive comes from a "SELECT * FROM ..." that spans several SQL tables, and each topic carries one of those tables. We're using a GenericRecord, and since the topic name tells us which table (and therefore which column names) we're dealing with, we can access fields by name:

```java
genericRecord.get("COLUMN_NAME").toString()
```

Given this, we're now trying to read a Kafka topic using Flink, and we have the code below. The environment is the StreamExecutionEnvironment, and the properties hold the Kafka serializer/deserializer settings plus the Kafka and ZooKeeper addresses (trimmed to the relevant part of the enclosing class):

```java
DataStream<Object> messageStream = environment
        .addSource(new FlinkKafkaConsumer010<>(baseTopic, new MyDeserializationSchema(schema), properties));

messageStream.print();

try {
    environment.execute();
} catch (Exception e) {
    e.printStackTrace();
}
return false;
```

And the deserialization schema:

```java
class MyDeserializationSchema<T> implements DeserializationSchema<T> {

    private static final Logger log = LoggerFactory.getLogger(MyDeserializationSchema.class);

    @SuppressWarnings("unchecked")
    private final Class<T> avrotype = (Class<T>) org.apache.avro.generic.GenericRecord.class;

    // Avro schema of the table behind the topic being consumed.
    private final Schema schema;

    public MyDeserializationSchema(Schema schema) {
        this.schema = schema;
    }

    @Override
    public T deserialize(byte[] arg0) throws IOException {
        log.info("Starting deserialization");
        // Bijection gives us a byte[] <-> GenericRecord codec for the schema.
        Injection<GenericRecord, byte[]> recordInjection = GenericAvroCodecs.toBinary(schema);
        log.info(recordInjection.toString());
        GenericRecord genericRecord = recordInjection.invert(arg0).get();
        log.info(genericRecord.toString());
        return (T) genericRecord;
    }

    @Override
    public boolean isEndOfStream(T nextElement) {
        return false;
    }

    @Override
    public TypeInformation<T> getProducedType() {
        return TypeExtractor.getForClass(avrotype);
    }
}
```

Executing this on our server generates the following:
```
[2018-03-02 15:59:37,111] WARN Ignoring configured key DeSerializer (key.deserializer) (org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09)
Exception in thread "main" org.apache.flink.api.common.InvalidProgramException: The implementation of the FlinkKafkaConsumer09 is not serializable. The object probably contains or references non serializable fields.
    at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:100)
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:1548)
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1460)
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1404)
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1386)
    at com.i2s.analytics.flink.executors.LKTreatyExecutor.execute(LKTreatyExecutor.java:153)
    at com.i2s.analytics.flink.job.DependenciesConsumer.main(DependenciesConsumer.java:66)
Caused by: java.io.NotSerializableException: org.apache.avro.Schema$RecordSchema
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
    at org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:315)
    at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:81)
    ... 6 more
```

I can't understand why the logs refer to a FlinkKafkaConsumer09 when we're using the FlinkKafkaConsumer010 version. And how can we deserialize to a GenericRecord so that we can access the record fields the same way we do when reading a Kafka topic without Flink?

Thanks in advance for any help.
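For reference, this is roughly how we consume the topic today without Flink (a minimal sketch; the bootstrap address, group id, topic name, and schema literal are illustrative placeholders, not our exact setup):

```java
import java.util.Collections;
import java.util.Properties;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import com.twitter.bijection.Injection;
import com.twitter.bijection.avro.GenericAvroCodecs;

public class PlainKafkaReader {

    // Placeholder schema: in reality this is the Avro schema of the table behind the topic.
    private static final String SCHEMA_JSON =
            "{\"type\":\"record\",\"name\":\"Row\",\"fields\":"
          + "[{\"name\":\"COLUMN_NAME\",\"type\":\"string\"}]}";

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");   // illustrative address
        props.put("group.id", "plain-reader");              // illustrative group id
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.ByteArrayDeserializer");

        Schema schema = new Schema.Parser().parse(SCHEMA_JSON);
        // Same Bijection codec we use in the Flink deserialization schema.
        Injection<GenericRecord, byte[]> recordInjection = GenericAvroCodecs.toBinary(schema);

        try (KafkaConsumer<String, byte[]> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("SOME_TABLE_TOPIC")); // illustrative topic
            while (true) {
                ConsumerRecords<String, byte[]> records = consumer.poll(100);
                for (ConsumerRecord<String, byte[]> record : records) {
                    GenericRecord genericRecord = recordInjection.invert(record.value()).get();
                    // The topic name tells us the table, so we know the column names:
                    System.out.println(genericRecord.get("COLUMN_NAME").toString());
                }
            }
        }
    }
}
```

In the real job the topic name drives both the schema we parse and the column names we pull out of the GenericRecord; this is the field access we'd like to keep once the stream goes through Flink.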