Hi,

I am trying to read Avro (RecordSchema) data from Kafka using the flink-kafka connector, but I have a problem:
At program startup I get this exception:

Caused by: java.io.NotSerializableException: org.apache.avro.Schema$RecordSchema
	at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1183)
	at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
	at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
	at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
	at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)

I know RecordSchema is not serializable, so the exception itself is expected. But how can I add a serializer for RecordSchema?

My Flink initialization:

    LocalStreamEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
    env.addSource(new KafkaSource("localhost:2181", "neverwinter", new MyDeserializer())).print();

The deserializer:

    public class MyDeserializer implements DeserializationSchema<String>, SerializationSchema<String, byte[]> {

        private static final long serialVersionUID = -8314881700393464119L;

        // Static, so it is not part of the serialized state of this object.
        private static final EncoderFactory avroEncoderFactory = EncoderFactory.get();

        // Instance field: serialized together with the deserializer when the job is set up.
        // In the Avro version used here, Schema is not Serializable (see the trace above).
        private Schema _schema;

        public MyDeserializer() {
            System.out.println("Creating MyDeserializer");
            Schema.Parser parser = new Schema.Parser();
            try {
                // Load the Avro schema from the classpath.
                InputStream is = getClass().getResourceAsStream("/avro_schema.json");
                if (is != null) {
                    _schema = parser.parse(is);
                } else {
                    System.out.println("Unable to load schema file!");
                }
            } catch (IOException e) {
                e.printStackTrace();
                throw new RuntimeException(e);
            }
        }

        public TypeInformation<String> getProducedType() {
            return TypeExtractor.getForClass(String.class);
        }

        // Decodes one Avro binary message into a GenericRecord and returns it as a String.
        public String deserialize(byte[] message) {
            String data = null;
            try {
                DatumReader<GenericRecord> reader = new GenericDatumReader<GenericRecord>(_schema);
                Decoder decoder = DecoderFactory.get().binaryDecoder(message, null);
                GenericRecord result = reader.read(null, decoder);
                AvroKafkaData ad = new AvroKafkaData((Integer) result.get("id"), (Integer) result.get("random"), String.valueOf(result.get("data")));
                data = ad.toString();
                // Print after assigning, otherwise this always prints "null".
                System.out.println("Read kafka data: " + data);
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
            return data;
        }

        public boolean isEndOfStream(String nextElement) {
            return false;
        }

        // Parses a JSON-encoded record with the schema and re-encodes it as Avro binary.
        public byte[] serialize(String element) {
            System.out.println("Serializing element = " + element);
            byte[] data = null;
            try {
                GenericDatumWriter<GenericRecord> writer = new GenericDatumWriter<GenericRecord>(_schema);
                ByteArrayOutputStream stream = new ByteArrayOutputStream();
                DatumReader<GenericRecord> reader = new GenericDatumReader<GenericRecord>(_schema);
                Decoder decoder = DecoderFactory.get().jsonDecoder(_schema, element);
                GenericRecord r = reader.read(null, decoder);
                BinaryEncoder binaryEncoder = avroEncoderFactory.binaryEncoder(stream, null);
                writer.write(r, binaryEncoder);
                binaryEncoder.flush();
                IOUtils.closeStream(stream);
                data = stream.toByteArray();
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
            return data;
        }
    }

Unfortunately, as far as I can see, only the constructor of MyDeserializer is called. Could somebody suggest something?

Thanks,
Ferenc
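The trace shows plain Java serialization (java.io.ObjectOutputStream) walking the fields of the object handed to Flink, so a non-static, non-transient field whose type is not Serializable fails while the job is being set up, before deserialize() is ever called. That would explain why only the constructor runs. The JDK-only sketch below reproduces the failure and then shows one common workaround, assuming the schema can be kept as its JSON text: store that String and re-parse it lazily into a transient field. FakeSchema, EagerHolder, LazyHolder and roundTrip are hypothetical names. FakeSchema only stands in for org.apache.avro.Schema; in real code FakeSchema.parse(json) would become new Schema.Parser().parse(json).

```java
import java.io.*;

public class LazySchemaDemo {

    /** Hypothetical stand-in for org.apache.avro.Schema: deliberately NOT Serializable. */
    static final class FakeSchema {
        private final String json;
        private FakeSchema(String json) { this.json = json; }
        // Plays the role of new Schema.Parser().parse(json) in real Avro code.
        static FakeSchema parse(String json) { return new FakeSchema(json); }
        @Override public String toString() { return json; }
    }

    /** Mirrors the original MyDeserializer: the parsed schema sits in a regular field. */
    static final class EagerHolder implements Serializable {
        private static final long serialVersionUID = 1L;
        private final FakeSchema schema;
        EagerHolder(String json) { this.schema = FakeSchema.parse(json); }
    }

    /** Ships only the schema text (String is Serializable) and re-parses it on first use. */
    static final class LazyHolder implements Serializable {
        private static final long serialVersionUID = 1L;
        private final String schemaJson;       // written by Java serialization
        private transient FakeSchema schema;   // skipped by Java serialization

        LazyHolder(String schemaJson) { this.schemaJson = schemaJson; }

        FakeSchema schema() {
            if (schema == null) {              // null after construction and after deserialization
                schema = FakeSchema.parse(schemaJson);
            }
            return schema;
        }
    }

    /** Serializes and deserializes an object with ObjectOutputStream/ObjectInputStream. */
    @SuppressWarnings("unchecked")
    static <T> T roundTrip(T obj) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(obj);
            }
            try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (T) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        String json = "{\"type\":\"record\",\"name\":\"Example\",\"fields\":[]}";
        try {
            roundTrip(new EagerHolder(json));
            System.out.println("unexpected: eager holder serialized");
        } catch (RuntimeException e) {
            // The cause is a java.io.NotSerializableException naming FakeSchema.
            System.out.println("eager holder failed: " + e.getCause());
        }
        LazyHolder copy = roundTrip(new LazyHolder(json));
        System.out.println("lazy holder schema: " + copy.schema());
    }
}
```

Applied to MyDeserializer, this would roughly mean keeping the schema JSON in a String field (for example _schema.toString() after parsing /avro_schema.json in the constructor), declaring _schema transient, and parsing it on first use inside deserialize() and serialize().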
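If you would rather keep a parsed schema field, another standard Java option is to give the class its own private writeObject/readObject hooks that write the schema's text form and parse it again when the object is read back. This is again a JDK-only sketch: FakeSchema is a hypothetical stand-in for org.apache.avro.Schema (whose toString() returns the schema JSON), and SchemaHolder and roundTrip are illustrative names, not Flink or Avro API.

```java
import java.io.*;
import java.util.Objects;

public class CustomSchemaSerialization {

    /** Hypothetical stand-in for org.apache.avro.Schema: deliberately NOT Serializable. */
    static final class FakeSchema {
        private final String json;
        private FakeSchema(String json) { this.json = json; }
        static FakeSchema parse(String json) { return new FakeSchema(json); }
        @Override public String toString() { return json; }
    }

    /** Keeps a parsed schema, but controls how it is written and read. */
    static final class SchemaHolder implements Serializable {
        private static final long serialVersionUID = 1L;
        private transient FakeSchema schema;   // excluded from default serialization

        SchemaHolder(FakeSchema schema) { this.schema = Objects.requireNonNull(schema); }

        FakeSchema schema() { return schema; }

        // Invoked by ObjectOutputStream: default fields first, then the schema as text.
        private void writeObject(ObjectOutputStream out) throws IOException {
            out.defaultWriteObject();
            out.writeObject(schema.toString());
        }

        // Invoked by ObjectInputStream: read in the same order and parse the text again.
        private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
            in.defaultReadObject();
            schema = FakeSchema.parse((String) in.readObject());
        }
    }

    /** Serializes and deserializes an object with ObjectOutputStream/ObjectInputStream. */
    @SuppressWarnings("unchecked")
    static <T> T roundTrip(T obj) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(obj);
            }
            try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (T) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        String json = "{\"type\":\"record\",\"name\":\"Example\",\"fields\":[]}";
        SchemaHolder copy = roundTrip(new SchemaHolder(FakeSchema.parse(json)));
        System.out.println("restored schema: " + copy.schema());
    }
}
```

Compared with lazy parsing, this restores the schema eagerly on every deserialized copy, at the cost of two extra methods that must stay in sync with each other.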