Hi,
I am trying to read Avro (RecordSchema) data from Kafka with the flink-kafka
connector, but I have run into a problem.
At program startup I get this exception:

Caused by: java.io.NotSerializableException: org.apache.avro.Schema$RecordSchema
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1183)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)

I know RecordSchema is not serializable, so the exception itself makes sense,
but how can I add a serializer for RecordSchema?
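The failure can be reproduced in plain Java, without Flink or Avro. In this sketch `ParsedSchema` is a hypothetical stand-in for org.apache.avro.Schema (deliberately not Serializable), and the two holder classes are hypothetical stand-ins for MyDeserializer with and without a `transient` schema field:

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

public class ReproduceNotSerializable {

    // Hypothetical stand-in for org.apache.avro.Schema: NOT Serializable.
    static final class ParsedSchema {
        final String json;
        ParsedSchema(String json) { this.json = json; }
    }

    // Same shape as MyDeserializer: Serializable, but holds a non-serializable field.
    static final class EagerHolder implements Serializable {
        private static final long serialVersionUID = 1L;
        private final ParsedSchema schema;
        EagerHolder(String json) { this.schema = new ParsedSchema(json); }
    }

    // Identical, except the field is transient, so Java serialization skips it.
    static final class TransientHolder implements Serializable {
        private static final long serialVersionUID = 1L;
        private transient ParsedSchema schema;
        TransientHolder(String json) { this.schema = new ParsedSchema(json); }
    }

    // Returns true if Java serialization rejects the object with NotSerializableException.
    static boolean failsToSerialize(Object o) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(o);
            return false;
        } catch (NotSerializableException e) {
            return true;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("eager fails:     " + failsToSerialize(new EagerHolder("{}")));     // true
        System.out.println("transient fails: " + failsToSerialize(new TransientHolder("{}"))); // false
    }
}
```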
My Flink initialization:

    LocalStreamEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
    env.addSource(new KafkaSource("localhost:2181", "neverwinter",
            new MyDeserializer())).print();
    env.execute();

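As far as I understand, `new MyDeserializer()` runs only once, on the client; Flink then Java-serializes the source function (including the deserializer) and the task works on a deserialized copy. The plain-Java part of that can be checked with a serialize/deserialize round trip: deserializing a Serializable object does not call its constructor, so anything set up in the constructor has to survive serialization itself. `Counted` is a hypothetical stand-in for MyDeserializer:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class ConstructorNotCalledOnDeserialize {

    static int constructorCalls = 0;

    // Hypothetical stand-in for a user function such as MyDeserializer.
    static final class Counted implements Serializable {
        private static final long serialVersionUID = 1L;
        final String topic;

        Counted(String topic) {
            this.topic = topic;
            constructorCalls++; // counts every constructor invocation
        }
    }

    // Serialize and deserialize, roughly what happens when a function is shipped to a task.
    @SuppressWarnings("unchecked")
    static <T extends Serializable> T roundTrip(T value) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(value);
            }
            try (ObjectInputStream in =
                         new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (T) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        Counted original = new Counted("neverwinter");
        Counted copy = roundTrip(original);
        System.out.println(copy.topic);        // neverwinter
        System.out.println(constructorCalls);  // 1: the copy was built without the constructor
        System.out.println(copy != original);  // true
    }
}
```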
The deserializer:

    public class MyDeserializer implements DeserializationSchema<String>,
            SerializationSchema<String, byte[]> {

        private static final long serialVersionUID = -8314881700393464119L;

        // Static fields are not part of Java serialization, so this one is harmless.
        private static final EncoderFactory avroEncoderFactory = EncoderFactory.get();

        // org.apache.avro.Schema is not Serializable: this instance field is
        // what triggers the NotSerializableException above.
        private Schema _schema;

        public MyDeserializer() {
            System.out.println("Creating MyDeserializer");
            Schema.Parser parser = new Schema.Parser();
            try {
                InputStream is = getClass().getResourceAsStream("/avro_schema.json");
                if (is != null) {
                    _schema = parser.parse(is);
                } else {
                    System.out.println("Unable to load schema file!");
                }
            } catch (IOException e) {
                e.printStackTrace();
                throw new RuntimeException(e);
            }
        }

        public TypeInformation<String> getProducedType() {
            return TypeExtractor.getForClass(String.class);
        }

        // Decode one Avro-binary Kafka message into a GenericRecord and render it as a String.
        public String deserialize(byte[] message) {
            try {
                DatumReader<GenericRecord> reader = new GenericDatumReader<GenericRecord>(_schema);
                Decoder decoder = DecoderFactory.get().binaryDecoder(message, null);
                GenericRecord result = reader.read(null, decoder);
                AvroKafkaData ad = new AvroKafkaData(
                        (Integer) result.get("id"),
                        (Integer) result.get("random"),
                        String.valueOf(result.get("data")));
                String data = ad.toString();
                System.out.println("Read kafka data: " + data);
                return data;
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
        }

        public boolean isEndOfStream(String nextElement) {
            return false;
        }

        // Parse a JSON-encoded record and re-encode it as Avro binary.
        public byte[] serialize(String element) {
            System.out.println("Serializing element = " + element);
            try {
                DatumReader<GenericRecord> reader = new GenericDatumReader<GenericRecord>(_schema);
                Decoder decoder = DecoderFactory.get().jsonDecoder(_schema, element);
                GenericRecord r = reader.read(null, decoder);

                GenericDatumWriter<GenericRecord> writer = new GenericDatumWriter<GenericRecord>(_schema);
                ByteArrayOutputStream stream = new ByteArrayOutputStream();
                BinaryEncoder binaryEncoder = avroEncoderFactory.binaryEncoder(stream, null);
                writer.write(r, binaryEncoder);
                binaryEncoder.flush();
                IOUtils.closeStream(stream);
                return stream.toByteArray();
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
        }
    }

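One direction I am considering (a sketch, not tested with Flink): instead of a custom serializer for RecordSchema, keep only the schema's JSON text in the object (a String is serializable; Avro's Schema.toString() returns the schema as JSON) and parse it lazily into a transient field the first time deserialize() or serialize() needs it. Below it is in plain Java; `ParsedSchema`, `getSchema()` and the JSON text are hypothetical stand-ins, and in MyDeserializer the getter would call `new Schema.Parser().parse(schemaJson)` instead:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class LazySchemaHolder implements Serializable {
    private static final long serialVersionUID = 1L;

    // Hypothetical stand-in for org.apache.avro.Schema (not Serializable).
    static final class ParsedSchema {
        final String json;
        ParsedSchema(String json) { this.json = json; }
    }

    private final String schemaJson;        // plain String: travels with the object
    private transient ParsedSchema schema;  // skipped by Java serialization, null in copies

    public LazySchemaHolder(String schemaJson) {
        this.schemaJson = schemaJson;
    }

    // Parse on first use, in whichever JVM this copy lives in.
    ParsedSchema getSchema() {
        if (schema == null) {
            schema = new ParsedSchema(schemaJson); // real code: new Schema.Parser().parse(schemaJson)
        }
        return schema;
    }

    boolean isParsed() {
        return schema != null;
    }

    // Serialize and deserialize, roughly what happens when the function is shipped to a task.
    static LazySchemaHolder roundTrip(LazySchemaHolder holder) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(holder);
            }
            try (ObjectInputStream in =
                         new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (LazySchemaHolder) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        LazySchemaHolder original = new LazySchemaHolder("{\"type\":\"record\"}");
        original.getSchema();                        // parsed on the "client"
        LazySchemaHolder copy = roundTrip(original); // no NotSerializableException
        System.out.println(copy.isParsed());         // false
        System.out.println(copy.getSchema().json);   // {"type":"record"}
        System.out.println(copy.isParsed());         // true
    }
}
```

The parse cost is paid once per deserialized copy rather than once per message, and no serializer for the Schema class itself is needed.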
Unfortunately, as far as I can see, only the constructor of MyDeserializer is
ever called. Could somebody suggest something?
Thanks,
Ferenc