I'll try to answer this for you. I'm going to assume you are using the
pre-packaged Kafka Connect distro from Confluent.

org.apache.kafka.connect.data.Schema is an abstraction of the type
definition for the data being passed around. How that schema is defined
generally falls to the connector being used. The source connector can
provide the schema definition alongside the data, so the sink connector can
use it directly instead of having to infer it from the raw bytes. How that
is done is up to the connector developer (since, as you mention, Kafka
itself only cares about bytes). I'll use a specific example to highlight
some of the pieces that play into it.
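
But first, to make the abstraction concrete: this is roughly what building
a schema and a matching record looks like with the Connect data API (the
class and field names here are made up for illustration, not taken from any
real connector):

    import org.apache.kafka.connect.data.Schema;
    import org.apache.kafka.connect.data.SchemaBuilder;
    import org.apache.kafka.connect.data.Struct;

    public class SchemaSketch {
        public static void main(String[] args) {
            // A connector describes the shape of its records with a Schema...
            Schema valueSchema = SchemaBuilder.struct()
                    .name("com.example.User")   // made-up name, illustration only
                    .field("id", Schema.INT64_SCHEMA)
                    .field("email", Schema.OPTIONAL_STRING_SCHEMA)
                    .build();

            // ...and pairs it with the actual values in a Struct. The schema and
            // the Struct travel together inside the SourceRecord.
            Struct value = new Struct(valueSchema)
                    .put("id", 42L)
                    .put("email", "someone@example.com");

            System.out.println(value);
        }
    }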

For instance, the Confluent JDBC source connector uses the table metadata
and dynamically generates the o.a.k.c.d.Schema from it. That definition
becomes part of the SourceRecord. When the worker goes to serialize that
payload to send to Kafka, it uses a converter class [1]. The specific class
is set by 'key.converter' and 'value.converter' in the worker
configuration. The worker calls those classes whenever it needs to
serialize [2]. This is where the developer can insert logic to inform
downstream consumers of the schema of the data written to Kafka. The
pre-packaged distro uses the AvroConverter class (also provided by
Confluent) [3]. That class uses custom serializers and deserializers [4] to
interact with the schema registry. The Connect schema is translated into an
Avro schema and registered with the schema registry, which in turn returns
an id that can be used to retrieve the schema later. That id is serialized
at the front of the bytes written to Kafka. Downstream consumers can use
the custom deserializer to get back to the original message generated by
the source connector.
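
If you want to see the converter contract in action without running a
schema registry, the JsonConverter that ships with Connect is an easy
stand-in (it implements the same Converter interface the AvroConverter
does). A rough sketch, with a made-up topic name:

    import java.util.Collections;
    import org.apache.kafka.connect.data.Schema;
    import org.apache.kafka.connect.data.SchemaAndValue;
    import org.apache.kafka.connect.json.JsonConverter;

    public class ConverterSketch {
        public static void main(String[] args) {
            JsonConverter converter = new JsonConverter();
            // With 'schemas.enable' the JSON converter embeds the schema next to
            // the payload; the AvroConverter instead registers the schema and
            // embeds only its id.
            converter.configure(Collections.singletonMap("schemas.enable", "true"), false);

            // Worker side: Connect schema + value -> bytes written to the topic.
            byte[] bytes = converter.fromConnectData("my-topic", Schema.STRING_SCHEMA, "hello");

            // Sink/consumer side: bytes -> schema + value again.
            SchemaAndValue roundTrip = converter.toConnectData("my-topic", bytes);
            System.out.println(roundTrip.schema() + " / " + roundTrip.value());
        }
    }

With the AvroConverter the wire format is (if I remember right) a magic
byte, then the 4-byte schema registry id, then the Avro-encoded payload,
and the converter is selected via 'key.converter' / 'value.converter' plus
a schema.registry.url setting in the worker configuration.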

I hope this helps.


[1]
https://github.com/apache/kafka/blob/trunk/connect/api/src/main/java/org/apache/kafka/connect/storage/Converter.java

[2]
https://github.com/apache/kafka/blob/41e676d29587042994a72baa5000a8861a075c8c/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java#L182-L183

[3]
https://github.com/confluentinc/schema-registry/blob/master/avro-converter/src/main/java/io/confluent/connect/avro/AvroConverter.java

[4]
https://github.com/confluentinc/schema-registry/tree/master/avro-serializer/src/main/java/io/confluent/kafka/serializers

On Sat, Jul 8, 2017 at 8:55 PM, Koert Kuipers <ko...@tresata.com> wrote:

> i see kafka connect invented its own runtime data type system in
> org.apache.kafka.connect.data
>
> however i struggle to understand how this is used. the payload in kafka is
> bytes. kafka does not carry any "schema" metadata. so how does connect know
> what the schema is of a ConnectRecord?
>
> if i write json data then perhaps i can see how a schema can be inferred
> from the data. is this what is happening? does this means the schema
> inference gets done for every json blob (which seems expensive)?
>
> thanks! koert
>
