Hi Ewen,

Sorry but I didn't understand much of that.

I currently have an implementation of the Converter interface that uses Avro's
BinaryEncoder/Decoder, SpecificDatumReader/Writer.

The main mismatch I faced is that I need to use org.apache.avro.Schema for 
serialization whereas the Converter interface requires an 
org.apache.kafka.connect.data.Schema schema.

In the absence of a transformer to interconvert between these Schema 
representations (are any available?) I have, for now, gone for the slightly 
fragile approach of inferring the schema from the topic name (we currently have 
a topic per event type).  This means I ignore the schema parameter in 
fromConnectData and return a null schema in toConnectData.
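
The topic-name approach described above can be sketched roughly as follows. This is a minimal illustration, not the actual converter: the class name TopicSchemaLookup and its methods are hypothetical, and the type parameter S stands in for org.apache.avro.Schema so that the sketch compiles without Avro on the classpath.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical helper: maps a topic name to the schema used on that topic.
// S stands in for org.apache.avro.Schema so the sketch compiles without Avro.
final class TopicSchemaLookup<S> {
    private final Map<String, S> schemasByTopic = new ConcurrentHashMap<>();

    // Register the schema for one topic (one topic per event type).
    void register(String topic, S schema) {
        schemasByTopic.put(topic, schema);
    }

    // Fail loudly on an unknown topic rather than guessing a schema.
    S schemaFor(String topic) {
        S schema = schemasByTopic.get(topic);
        if (schema == null) {
            throw new IllegalArgumentException("No schema registered for topic: " + topic);
        }
        return schema;
    }
}
```

Throwing on an unregistered topic makes the fragility explicit: a renamed or newly added topic fails immediately instead of being encoded with the wrong schema.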

With this I can create a simple Kafka consumer that correctly reads these 
binary Avro encoded events generated by my Kafka Connect source, once I've set 
the Kafka value.deserializer property to my serializer class which implements 
Deserializer<SpecificRecord>, which in turn (re)uses my Kafka Connect converter 
class internally.

However, I've noticed something odd: the fromConnectData invocations come in 
two forms:

1. schema = null, record = null
2. schema = Schema{BYTES}, record = a JSON structure

Schema{BYTES} is, I presume, because I specify Schema.BYTES_SCHEMA as the 4th 
argument to the SourceRecord constructor.

Any idea why form 1 occurs?
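
A possible cause (an assumption, not confirmed anywhere in this thread) is that the same converter class is configured for both key.converter and value.converter while the SourceRecords carry no key, so the key conversion arrives with a null schema and a null value. Whatever the cause, the converter can guard against form 1. The sketch below is only an illustration: NullSafeEncoder is a hypothetical name, the schema parameter is typed as Object so the code compiles without Kafka Connect, and the UTF-8 encoding is a placeholder for the real Avro encoding.

```java
import java.nio.charset.StandardCharsets;

// Stand-in for a converter's fromConnectData(topic, schema, value); the schema
// argument is typed as Object so the sketch compiles without Kafka Connect.
final class NullSafeEncoder {
    // Returns null for a null value (form 1) instead of attempting to encode it.
    static byte[] fromConnectData(String topic, Object schema, Object value) {
        if (value == null) {
            return null; // nothing to encode, e.g. a record with no key
        }
        // Placeholder encoding; the real converter would Avro-encode here.
        return value.toString().getBytes(StandardCharsets.UTF_8);
    }
}
```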

Thanks again,
David

-----Original Message-----
From: Ewen Cheslack-Postava [mailto:e...@confluent.io] 
Sent: 07 November 2016 04:35
To: dev@kafka.apache.org
Subject: Re: Kafka Connect key.converter and value.converter properties for 
Avro encoding

You won't be accepting/returning SpecificRecords directly when working with 
Connect's API. Connect intentionally uses an interface that is different from 
Kafka serializers because we deal with structured data that the connectors need 
to be able to understand. We define a generic runtime representation for data 
(under org.apache.kafka.connect.data) and Converters are responsible for taking 
the data all the way through any byte[] -> serialization-specific format (e.g. 
SpecificRecord) -> Connect Data API.
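
As a rough illustration of that two-step path, the sketch below composes a byte[] -> intermediate step with an intermediate -> runtime-data step. It is not the Confluent implementation: TwoStepDecoder is a hypothetical name, and the type parameters A and C stand in for the serialization-specific type (for example a SpecificRecord) and the Connect Data API value, so that nothing beyond the JDK is needed to compile it.

```java
import java.util.function.Function;

// Hypothetical composition of the two steps: byte[] -> intermediate -> runtime data.
// A stands in for the serialization-specific type (e.g. an Avro record) and
// C for the Connect Data API value; neither library is needed to compile this.
final class TwoStepDecoder<A, C> {
    private final Function<byte[], A> deserialize; // step 1: byte[] -> A
    private final Function<A, C> toRuntimeData;    // step 2: A -> C

    TwoStepDecoder(Function<byte[], A> deserialize, Function<A, C> toRuntimeData) {
        this.deserialize = deserialize;
        this.toRuntimeData = toRuntimeData;
    }

    // Runs both steps in order; a null payload is passed through as null.
    C decode(byte[] bytes) {
        if (bytes == null) {
            return null;
        }
        return toRuntimeData.apply(deserialize.apply(bytes));
    }
}
```

Keeping the two steps as separate functions means either one can be swapped out on its own, for instance replacing only the byte[] step while keeping the second step unchanged.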

Even though your approach to handling Avro isn't exactly the same, I'd still 
suggest taking a look at our implementation. You'll be able to see how we 
separate this into those two steps, utilizing our normal Avro(De)Serializer to 
do byte[] <-> Avro conversions and then a separate class to do Avro <-> Connect 
Data API conversions. You could probably reuse the Avro <-> Connect Data API 
directly and only use the small bit of code you included for doing the byte[] 
<-> Avro conversion.

re: configure(), yes, it's safe for it to be a noop as long as your Converter 
really doesn't require *any* configuration. But I would guess it at least needs 
to know the SpecificRecord class or schema you are trying to (de)serialize.
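
A configure() that picks up the record class might look roughly like the sketch below. It is only an illustration under stated assumptions: RecordClassConfig and the property name "record.class" are hypothetical, and the method merely mirrors the shape of Converter.configure(Map<String, ?>, boolean) without depending on Kafka Connect.

```java
import java.util.Map;

// Hypothetical configuration handling for a converter that must know which
// record class to (de)serialize. The property name is an assumption.
final class RecordClassConfig {
    static final String RECORD_CLASS_CONFIG = "record.class"; // hypothetical key

    private Class<?> recordClass;
    private boolean isKey;

    // Mirrors the shape of Converter.configure(Map<String, ?>, boolean).
    void configure(Map<String, ?> configs, boolean isKey) {
        this.isKey = isKey;
        Object name = configs.get(RECORD_CLASS_CONFIG);
        if (name == null) {
            throw new IllegalArgumentException("Missing required config: " + RECORD_CLASS_CONFIG);
        }
        try {
            recordClass = Class.forName(name.toString());
        } catch (ClassNotFoundException e) {
            throw new IllegalArgumentException("Unknown record class: " + name, e);
        }
    }

    Class<?> recordClass() { return recordClass; }
    boolean isKey() { return isKey; }
}
```

Failing in configure() when the property is missing or wrong surfaces the problem at startup rather than on the first record.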

-Ewen

On Thu, Nov 3, 2016 at 7:25 AM, <david.frank...@bt.com> wrote:

> Thanks to Gwen and Tommy Becker for their helpful replies.
>
> Currently, the environment I need to work with doesn't use the Schema 
> Registry; hopefully one day it will but for now that's not an option.
> Events are written to Kafka without the schema embedded and each side 
> of the interface assumes a given schema, with the consequent risks accepted.
>
> To serialize a SpecificRecord for the
> org.apache.kafka.connect.storage.Converter interface (in the absence of
> access to the Confluent implementation classes) I was thinking of
> something along these lines to Avro-encode a SpecificRecord:
>
>     // Schema here is org.apache.avro.Schema.
>     private byte[] toAvro(Schema schema, SpecificRecord record) throws IOException {
>         SpecificDatumWriter<SpecificRecord> writer = new SpecificDatumWriter<>(schema);
>         ByteArrayOutputStream baos = new ByteArrayOutputStream();
>         // Passing null asks the factory for a new encoder instead of reusing one.
>         BinaryEncoder binaryEncoder = EncoderFactory.get().binaryEncoder(baos, null);
>         writer.write(record, binaryEncoder);
>         // The encoder is buffered: flush before reading the bytes.
>         binaryEncoder.flush();
>         return baos.toByteArray();
>     }
>
> To work with Kafka Connect I need to comply with the
> org.apache.kafka.connect.storage.Converter interface. The Converter
> interface defines the following methods:
>
>     void configure(Map<String, ?> configs, boolean isKey);
>     byte[] fromConnectData(String topic, Schema schema, Object value);
>     SchemaAndValue toConnectData(String topic, byte[] value);
>
> Is it safe to provide a no-op implementation for configure()?
>
> The toConnectData() method will presumably be achieved via a 
> corresponding SpecificDatumReader.
>
> Does this look like a reasonable approach?
>
> Many thanks if you've read this far!
>
> Regards,
> David
>
>
> -----Original Message-----
> From: Gwen Shapira [mailto:g...@confluent.io]
> Sent: 02 November 2016 21:18
> To: dev@kafka.apache.org
> Subject: Re: Kafka Connect key.converter and value.converter 
> properties for Avro encoding
>
> Both the Confluent Avro Converter and the Confluent Avro Serializer 
> use the Schema Registry. The reason is, as Tommy Becker mentioned 
> below, to avoid storing the entire schema in each record (which the 
> JSON serializer in Apache Kafka does). It has a few other benefits, such 
> as schema validation.
>
> If you are interested in trying this approach, you will want to use 
> the Converter, since it was written specifically to integrate with Connect.
> If you prefer another approach, without the Schema Registry, you can 
> write your own Converter - that's why we made them pluggable. Feel 
> free to copy ours and modify it as fits your Avro approach.
>
> Gwen
>
> On Wed, Nov 2, 2016 at 2:48 AM, <david.frank...@bt.com> wrote:
>
> > I am using Kafka Connect in source mode i.e. using it to send events 
> > to Kafka topics.
> >
> > With the key.converter and value.converter properties set to 
> > org.apache.kafka.connect.storage.StringConverter I can attach a 
> > consumer to the topics and see the events in a readable form.  This 
> > is helpful and reassuring but it is not the desired representation 
> > for my downstream consumers - these require the events to be Avro encoded.
> >
> > It seems that to write the events to Kafka Avro encoded, these 
> > properties need to be set to 
> > io.confluent.kafka.serializers.KafkaAvroSerializer.  Is this correct?
> >
> > I am not using the Confluent platform, merely the standard Kafka 10 
> > download, and have been unable to find out how to get at these from 
> > a Maven repository jar.
> > http://docs.confluent.io/3.0.0/app-development.html#java
> > suggests that these are available via:
> >
> >      <dependency>
> >          <groupId>io.confluent</groupId>
> >          <artifactId>kafka-avro-serializer</artifactId>
> >          <version>3.0.0</version>
> >      </dependency>
> >
> > But it doesn't appear to be true.  The class exists in
> > https://raw.githubusercontent.com/confluentinc/schema-registry/master/avro-converter/src/main/java/io/confluent/connect/avro/AvroConverter.java
> > but this seems to use the Schema Registry, which is something I'd 
> > rather avoid.
> >
> > I'd be grateful for any pointers on the simplest way of getting Avro 
> > encoded events written to Kafka from a Kafka Connect source 
> > connector/task.
> >
> > Also in the task which creates SourceRecords, I'm choosing 
> > Schema.BYTES_SCHEMA for the 4th arg in the constructor.  But I'm not 
> > clear what this achieves - some light shed on that would also be helpful.
> >
> > Many thanks,
> > David
> >
>
>
>
> --
> *Gwen Shapira*
> Product Manager | Confluent
> 650.450.2760 | @gwenshap
> Follow us: Twitter <https://twitter.com/ConfluentInc> | blog 
> <http://www.confluent.io/blog>
>



--
Thanks,
Ewen
