thanks for that explanation.

if i use json instead of avro, should i use the json serialization that
serializes both schema and data, so that the schema travels with the data
from source to sink? i.e. set key.converter.schemas.enable=true and
value.converter.schemas.enable=true?

is it a correct assumption that kafka-connect wouldn't work if i chose the
"raw" json serialization that discards the schema?

On Sun, Jul 9, 2017 at 1:10 PM, Stephen Durfey <sjdur...@gmail.com> wrote:

> I'll try to answer this for you. I'm going to assume you are using the
> pre-packaged kafka connect distro from confluent.
>
> org.apache.kafka.connect.data.Schema is an abstraction of the type
> definition for the data being passed around. How that schema gets defined
> generally falls to the connector being used. The source connector can
> provide the schema definition and make it available so that the sink
> connector can reconstruct the data downstream. How that is done is up to
> the connector developer (since, as you mention, kafka itself only cares
> about bytes). I'll use a specific example to highlight some of the pieces
> that play into it.
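>
> (Side note: the snippet below is my own sketch rather than code from any
> particular connector. A source task might build the schema and record
> roughly like this, with the field names, topic, and partition/offset maps
> made up for illustration.)
>
>     import java.util.Collections;
>     import org.apache.kafka.connect.data.Schema;
>     import org.apache.kafka.connect.data.SchemaBuilder;
>     import org.apache.kafka.connect.data.Struct;
>     import org.apache.kafka.connect.source.SourceRecord;
>
>     public class SourceRecordSketch {
>         public static void main(String[] args) {
>             // schema describing one record produced by the source connector
>             Schema valueSchema = SchemaBuilder.struct().name("example.Row")
>                     .field("id", Schema.INT64_SCHEMA)
>                     .field("name", Schema.OPTIONAL_STRING_SCHEMA)
>                     .build();
>
>             // the value carries a reference to its schema
>             Struct value = new Struct(valueSchema)
>                     .put("id", 42L)
>                     .put("name", "jane");
>
>             // schema and value are handed to the framework together; the
>             // worker's configured converter later decides how they become bytes
>             SourceRecord record = new SourceRecord(
>                     Collections.singletonMap("table", "rows"),  // source partition
>                     Collections.singletonMap("offset", 0L),     // source offset
>                     "rows-topic", valueSchema, value);
>             System.out.println(record);
>         }
>     }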
>
> For instance, the confluent JDBC source connector uses table information
> and dynamically generates the o.a.k.c.d.Schema from that. That definition
> becomes part of the SourceRecord. When the worker goes to serialize that
> payload to send to kafka, it uses a converter class [1]. The specific class
> is defined by 'key.converter' and 'value.converter' in the worker
> configuration. The worker calls those specific classes when it needs to
> serialize [2]. This is where the developer can insert logic to inform
> downstream consumers of the schema of the data written to kafka. In the
> pre-packaged distro, it uses the AvroConverter class (also provided by
> confluent) [3]. This class uses custom serializers and deserializers [4] to
> interact with the schema registry. The schema is turned into an Avro Schema
> and registered with the schema registry. The schema registry in turn
> returns an id that can be used to retrieve the schema later. That id is
> serialized at the front of the bytes written to kafka. Downstream
> consumers can then use the custom deserializer to get back to the
> original message generated by the source connector.
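>
> Just to make the converter hand-off concrete, here is a rough,
> self-contained sketch of what happens around serialization (I'm using
> JsonConverter as the stand-in only because it needs no schema registry;
> the packaged distro would use AvroConverter configured with
> schema.registry.url instead):
>
>     import java.util.Collections;
>     import org.apache.kafka.connect.data.Schema;
>     import org.apache.kafka.connect.data.SchemaAndValue;
>     import org.apache.kafka.connect.data.SchemaBuilder;
>     import org.apache.kafka.connect.data.Struct;
>     import org.apache.kafka.connect.json.JsonConverter;
>     import org.apache.kafka.connect.storage.Converter;
>
>     public class ConverterRoundTrip {
>         public static void main(String[] args) {
>             // stand-in for the worker's configured value.converter
>             Converter converter = new JsonConverter();
>             converter.configure(Collections.singletonMap("schemas.enable", "true"), false);
>
>             Schema schema = SchemaBuilder.struct().name("example.Row")
>                     .field("id", Schema.INT64_SCHEMA).build();
>             Struct value = new Struct(schema).put("id", 1L);
>
>             // roughly what the source worker does before producing to kafka
>             byte[] bytes = converter.fromConnectData("some-topic", schema, value);
>
>             // roughly what the sink side does to recover schema and value
>             // from the raw bytes
>             SchemaAndValue restored = converter.toConnectData("some-topic", bytes);
>             System.out.println(restored.schema());
>             System.out.println(restored.value());
>         }
>     }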
>
> I hope this helps.
>
>
> [1] https://github.com/apache/kafka/blob/trunk/connect/api/src/main/java/org/apache/kafka/connect/storage/Converter.java
>
> [2] https://github.com/apache/kafka/blob/41e676d29587042994a72baa5000a8861a075c8c/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java#L182-L183
>
> [3] https://github.com/confluentinc/schema-registry/blob/master/avro-converter/src/main/java/io/confluent/connect/avro/AvroConverter.java
>
> [4] https://github.com/confluentinc/schema-registry/tree/master/avro-serializer/src/main/java/io/confluent/kafka/serializers
>
> On Sat, Jul 8, 2017 at 8:55 PM, Koert Kuipers <ko...@tresata.com> wrote:
>
> > i see kafka connect invented its own runtime data type system in
> > org.apache.kafka.connect.data
> >
> > however i struggle to understand how this is used. the payload in kafka
> > is bytes. kafka does not carry any "schema" metadata. so how does connect
> > know what the schema of a ConnectRecord is?
> >
> > if i write json data then perhaps i can see how a schema can be inferred
> > from the data. is this what is happening? does this mean the schema
> > inference gets done for every json blob (which seems expensive)?
> >
> > thanks! koert
> >
>
