Hi, yes, the Avro Schema is not serializable. Can you make the "_schema" field transient and lazily initialize it the first time serialize()/deserialize() is called? That way the schema is parsed on the cluster, so it never needs to be shipped over the network with the function.
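Roughly like this (untested sketch; it reuses the "_schema" field and the "/avro_schema.json" resource from your MyDeserializer and assumes the schema file is on the classpath of the workers):

    private transient Schema _schema;

    private Schema getSchema() {
        if (_schema == null) {
            // parse the schema lazily on the worker instead of shipping it with the function
            try (InputStream is = getClass().getResourceAsStream("/avro_schema.json")) {
                if (is == null) {
                    throw new IllegalStateException("Unable to load /avro_schema.json from the classpath");
                }
                _schema = new Schema.Parser().parse(is);
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
        }
        return _schema;
    }

Then use getSchema() instead of _schema inside deserialize() and serialize(), and drop the parsing from the constructor.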
I think Flink's own serialization stack should also be able to handle Avro types with Kafka. I'm trying to get the required tooling into Flink 0.10-SNAPSHOT.

Let me know if you need more help.

Best,
Robert

On Sat, Aug 29, 2015 at 11:38 AM, Ferenc Turi <turifs...@gmail.com> wrote:
> Hi,
>
> I tried to read Avro (RecordSchema) data from Kafka using the flink-kafka
> connector, but I have problems.
>
> The exception at program startup says:
>
> Caused by: java.io.NotSerializableException:
> org.apache.avro.Schema$RecordSchema
>     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1183)
>     at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
>     at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
>     at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
>     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
>
> I know RecordSchema is not serializable, so that is expected, but how can I
> add a serializer for RecordSchema?
>
> My Flink initialization:
>
> LocalStreamEnvironment env =
>         StreamExecutionEnvironment.createLocalEnvironment();
> env.addSource(new KafkaSource<String>("localhost:2181", "neverwinter",
>         new MyDeserializer())).print();
>
> The deserializer:
>
> public class MyDeserializer implements DeserializationSchema<String>,
>         SerializationSchema<String, byte[]> {
>
>     private static final long serialVersionUID = -8314881700393464119L;
>     private static final EncoderFactory avroEncoderFactory = EncoderFactory.get();
>     private Schema _schema;
>
>     public MyDeserializer() {
>         System.out.println("Creating MyDeserializer");
>         Schema.Parser parser = new Schema.Parser();
>         try {
>             InputStream is = getClass().getResourceAsStream("/avro_schema.json");
>             if (is != null) {
>                 _schema = parser.parse(is);
>             } else {
>                 System.out.println("Unable to load schema file!");
>             }
>         } catch (IOException e) {
>             e.printStackTrace();
>             throw new RuntimeException(e);
>         }
>     }
>
>     public TypeInformation<String> getProducedType() {
>         return TypeExtractor.getForClass(String.class);
>     }
>
>     public String deserialize(byte[] message) {
>         String data = null;
>         try {
>             DatumReader<GenericRecord> reader =
>                     new GenericDatumReader<GenericRecord>(_schema);
>             Decoder decoder = DecoderFactory.get().binaryDecoder(message, null);
>             GenericRecord result = reader.read(null, decoder);
>             AvroKafkaData ad = new AvroKafkaData((Integer) result.get("id"),
>                     (Integer) result.get("random"), String.valueOf(result.get("data")));
>             data = ad.toString();
>             System.out.println("Read kafka data: " + data);
>         } catch (IOException e) {
>             throw new RuntimeException(e);
>         }
>         return data;
>     }
>
>     public boolean isEndOfStream(String nextElement) {
>         return false;
>     }
>
>     public byte[] serialize(String element) {
>         System.out.println("Serializing element = " + element);
>         byte[] data = null;
>         try {
>             GenericDatumWriter<GenericRecord> writer =
>                     new GenericDatumWriter<GenericRecord>(_schema);
>             ByteArrayOutputStream stream = new ByteArrayOutputStream();
>
>             DatumReader<GenericRecord> reader =
>                     new GenericDatumReader<GenericRecord>(_schema);
>             Decoder decoder = DecoderFactory.get().jsonDecoder(_schema, element);
>             GenericRecord r = reader.read(null, decoder);
>
>             BinaryEncoder binaryEncoder = avroEncoderFactory.binaryEncoder(stream, null);
>             writer.write(r, binaryEncoder);
>             binaryEncoder.flush();
>             IOUtils.closeStream(stream);
>
>             data = stream.toByteArray();
>         } catch (IOException e) {
>             throw new RuntimeException(e);
>         }
>         return data;
>     }
> }
>
> Unfortunately, as far as I can see, only the constructor of MyDeserializer
> is called.
>
> Could somebody suggest something?
>
> Thanks,
>
> Ferenc
>
>
> --
> Kind Regards,
>
> Ferenc