Yes, the Confluent SerDes support nested Avro records. Under the covers they use the standard Avro classes (DatumReader and DatumWriter) to carry out those operations, so as long as you're sending valid Avro data to be produced or consumed, the Confluent SerDes will handle it just fine.
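For illustration, here is a minimal sketch of producing a nested record through the Confluent serializer. The "Order"/"Customer" schema, the topic name, and the localhost URLs are made up for this example; only the serializer class and the schema.registry.url property come from Confluent's platform.

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

// Hypothetical nested schema: an "Order" record containing a nested "Customer" record.
String schemaJson =
    "{\"type\":\"record\",\"name\":\"Order\",\"fields\":["
  + " {\"name\":\"id\",\"type\":\"long\"},"
  + " {\"name\":\"customer\",\"type\":{\"type\":\"record\",\"name\":\"Customer\","
  + "   \"fields\":[{\"name\":\"name\",\"type\":\"string\"}]}}]}";
Schema schema = new Schema.Parser().parse(schemaJson);

// Build the nested record, then the outer record.
GenericRecord customer = new GenericData.Record(schema.getField("customer").schema());
customer.put("name", "alice");
GenericRecord order = new GenericData.Record(schema);
order.put("id", 42L);
order.put("customer", customer);

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
// Confluent's Avro serializer registers the schema (including the nested record) in the registry.
props.put("value.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer");
props.put("schema.registry.url", "http://localhost:8081");

Producer<String, GenericRecord> producer = new KafkaProducer<>(props);
producer.send(new ProducerRecord<>("orders", "order-42", order));
producer.close();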
________________________________
From: Kidong Lee <mykid...@gmail.com>
Sent: Thursday, August 17, 2017 11:56:16 PM
To: users@kafka.apache.org
Subject: Re: Avro With Kafka

You can send Avro records to Kafka and consume them without the schema registry. In my approach, the Avro schema file (.avsc) must be on the classpath on both the producer and the consumer side.

On the producer side, first write an Avro value serializer and set the key.serializer and value.serializer properties in the Kafka producer configuration. For instance, the following class is the Avro serializer for the value:

import domain.Event;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DatumWriter;
import org.apache.avro.io.Encoder;
import org.apache.avro.io.EncoderFactory;
import org.apache.kafka.common.serialization.Serializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.Map;

/**
 * Created by mykidong on 2016-05-17.
 */
public class KafkaAvroEventSerializer implements Serializer<Event> {

    private static Logger log = LoggerFactory.getLogger(KafkaAvroEventSerializer.class);

    private Schema schema;

    @Override
    public void configure(Map<String, ?> map, boolean isKey) {
        // Get the Avro .avsc schema path from the Kafka configuration.
        String avroSchemaPath = (String) map.get("event.avro.schema.path");

        Schema.Parser parser = new Schema.Parser();
        try {
            // Construct the Avro schema instance from the classpath.
            schema = parser.parse(getClass().getResourceAsStream(avroSchemaPath));
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    @Override
    public byte[] serialize(String topic, Event event) {
        try {
            GenericRecord datum = new GenericData.Record(schema);
            datum.put("eventType", event.getEventType());
            datum.put("value", event.getValue());

            ByteArrayOutputStream out = new ByteArrayOutputStream();
            DatumWriter<GenericRecord> writer = new GenericDatumWriter<GenericRecord>(schema);
            Encoder encoder = EncoderFactory.get().binaryEncoder(out, null);
            writer.write(datum, encoder);
            encoder.flush();

            byte[] avroBytes = out.toByteArray();
            out.close();

            return avroBytes;
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    @Override
    public void close() {
    }
}

The Kafka producer then sends Event, which should be replaced with your own message type:

Properties kafkaProp = new Properties();
..
kafkaProp.put("key.serializer", "org.apache.kafka.common.serialization.IntegerSerializer");
// The value Avro serializer written above.
kafkaProp.put("value.serializer", "your.package.KafkaAvroEventSerializer");
..
producer = new KafkaProducer<>(kafkaProp);
producer.send(new ProducerRecord<Integer, Event>(eventType, event));

On the consumer side, the Avro schema instances should be cached, because every message consumed from Kafka must be deserialized, and that costs some latency.
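For reference, a minimal sketch of what the Event class and the matching .avsc might look like. The field names "eventType" and "value" are taken from the serializer above; the String field types, the namespace, and the schema path are assumptions for illustration.

// A hypothetical event.avsc on the classpath, matching the two fields the serializer writes
// (the "value" field is assumed to be a string here):
// {
//   "type": "record",
//   "name": "Event",
//   "namespace": "domain",
//   "fields": [
//     {"name": "eventType", "type": "string"},
//     {"name": "value", "type": "string"}
//   ]
// }

// A minimal domain.Event POJO the serializer reads from:
public class Event {
    private String eventType;
    private String value;

    public Event(String eventType, String value) {
        this.eventType = eventType;
        this.value = value;
    }

    public String getEventType() { return eventType; }
    public String getValue() { return value; }
}

// The schema path is handed to the serializer through the producer configuration,
// since configure() reads the "event.avro.schema.path" key (the path below is made up):
// kafkaProp.put("event.avro.schema.path", "/avro/event.avsc");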
The Avro schema instances can be constructed from the classpath and mapped by event type key like this:

import api.dao.AvroSchemaRegistryDao;
import org.apache.avro.Schema;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.InitializingBean;

import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

public class MapAvroSchemaRegistryDao implements AvroSchemaRegistryDao, InitializingBean {

    private static Logger log = LoggerFactory.getLogger(MapAvroSchemaRegistryDao.class);

    private final Object LOCK = new Object();

    private ConcurrentMap<String, Schema> schemaMap = new ConcurrentHashMap<>();

    private Properties eventTypeProps;

    public void setEventTypeProps(Properties eventTypeProps) {
        this.eventTypeProps = eventTypeProps;
    }

    @Override
    public void afterPropertiesSet() throws Exception {
        for (String eventType : eventTypeProps.stringPropertyNames()) {
            String schemaPath = eventTypeProps.getProperty(eventType);

            Schema.Parser parser = new Schema.Parser();
            try {
                Schema schema = parser.parse(getClass().getResourceAsStream(schemaPath));
                schemaMap.put(eventType, schema);
                log.info("loaded avro schema " + eventType);
            } catch (Exception e) {
                log.error("failed to load avro schema " + eventType);
                throw new RuntimeException(e);
            }
        }
    }

    @Override
    public Schema getSchema(String eventType) {
        return schemaMap.get(eventType);
    }

    @Override
    public void update(String eventType, String jsonSchema) {
        synchronized (LOCK) {
            Schema.Parser parser = new Schema.Parser();
            try {
                Schema schema = parser.parse(jsonSchema);
                schemaMap.put(eventType, schema);
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        }
    }
}

The Avro deserialize service implementation:

import ieiot.api.dao.AvroSchemaRegistryDao;
import ieiot.api.service.AvroDeserializeService;
import ieiot.domain.avro.DeserializedEvent;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DatumReader;
import org.apache.avro.io.Decoder;
import org.apache.avro.io.DecoderFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.nio.ByteBuffer;
import java.util.List;

/**
 * Created by mykidong on 2016-05-19.
 */
public class AvroDeserializeServiceImpl implements AvroDeserializeService {

    private static Logger log = LoggerFactory.getLogger(AvroDeserializeServiceImpl.class);

    // The MapAvroSchemaRegistryDao above is injected here.
    private AvroSchemaRegistryDao avroSchemaRegistryDao;

    public void setAvroSchemaRegistryDao(AvroSchemaRegistryDao avroSchemaRegistryDao) {
        this.avroSchemaRegistryDao = avroSchemaRegistryDao;
    }

    /* Deserialize a Kafka message. */
    @Override
    public GenericRecord deserializeAvro(String eventType, byte[] avroBytes) {
        // Get the Avro schema instance from the MapAvroSchemaRegistryDao.
        Schema schema = this.avroSchemaRegistryDao.getSchema(eventType);

        DatumReader<GenericRecord> reader = new GenericDatumReader<GenericRecord>(schema);
        Decoder decoder = DecoderFactory.get().binaryDecoder(avroBytes, null);
        try {
            GenericRecord genericRecord = reader.read(null, decoder);
            return genericRecord;
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }
}
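In case it helps, here is a sketch of how these two classes might be wired together without a Spring context. The event-type keys and schema paths are made up; only the setters and afterPropertiesSet() come from the classes above.

import java.util.Properties;

// Map event types to classpath schema paths (keys and paths here are made up).
Properties eventTypeProps = new Properties();
eventTypeProps.setProperty("item-view-event", "/avro/item-view-event.avsc");
eventTypeProps.setProperty("cart-event", "/avro/cart-event.avsc");

MapAvroSchemaRegistryDao schemaRegistryDao = new MapAvroSchemaRegistryDao();
schemaRegistryDao.setEventTypeProps(eventTypeProps);
// Spring would normally call this after property injection; when wiring by hand, call it
// directly (it declares "throws Exception", so handle or propagate that).
schemaRegistryDao.afterPropertiesSet();

AvroDeserializeServiceImpl avroDeserializeService = new AvroDeserializeServiceImpl();
avroDeserializeService.setAvroSchemaRegistryDao(schemaRegistryDao);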
props.put("key.deserializer", "org.apache.kafka.common.serialization.IntegerDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer"); ... KafkaConsumer<Integer, byte[]> consumer = new KafkaConsumer<>(props); consumer.subscribe(topics); while(true) { ConsumerRecords<Integer, byte[]> records = consumer.poll(this.timeout); for (ConsumerRecord<Integer, byte[]> record : records) { // deserialize avro bytes to GenericRecord, but you can get your own message type with modifying deserialize service implementation. GenericRecord genericRecord = avroDeserializeService.deserializeAvro(topic, value); ... } } In my case, Avro bytes consumed from kafka are deserialized to GenericRecord, but you can get your own message type with modifying deserialize service implementation. I had also questions about if schema registry from confluent should be used or not. I have written and used classpath schema registry like the above or the schema registry using consul. Good luck. Cheers, - Kidong. 2017-08-17 3:51 GMT+09:00 Nishanth S <nishanth.2...@gmail.com>: > Hello, > We are investigating on ingesting avro records to kafka using avro kafka > serializer. Our schemas are nested and are of type record .Does the current > avro kafka serializer support avro record type ?. > > If not is there a way to ingest records and consume using a consumer > without using avro kafka serializer.Is the avro serializer component part > of apache kafka or confuent?. > I did not see a way to use this serializer without using schema registry. > > Thanks, > Nishanth >