I'll try to answer this for you. I'm going to assume you are using the pre-packaged Kafka Connect distribution from Confluent.
org.apache.kafka.connect.data.Schema is an abstraction of the type definition for the data being passed around. How that definition is produced generally falls on the connector being used: the source connector can provide the schema information, and a sink connector downstream can then work with what the source connector provided. How that is done is up to the connector developer (since, as you mention, Kafka itself only cares about bytes).

I'll use a specific example to highlight the pieces that play into it. The Confluent JDBC source connector uses the table metadata and dynamically generates the o.a.k.c.d.Schema from it. That definition becomes part of each SourceRecord the connector emits (see the first sketch at the end of this mail).

When the worker goes to serialize that payload to send to Kafka, it uses a converter class [1]. The specific class is defined by 'key.converter' and 'value.converter' in the worker configuration, and the worker calls it whenever it needs to serialize [2]. This is where the developer can insert logic to inform downstream consumers of the schema of the data written to Kafka (second sketch).

In the pre-packaged distro, that converter is the AvroConverter class, also provided by Confluent [3]. It uses custom serializers and deserializers [4] to interact with the schema registry: the Connect schema is turned into an Avro schema and registered with the schema registry, which in turn returns an id to use to retrieve the schema at a later time. That id is serialized at the front of the bytes written to Kafka (third sketch). Downstream users can then use the custom deserializer to get back to the original message generated by the source connector.

I hope this helps.

[1] https://github.com/apache/kafka/blob/trunk/connect/api/src/main/java/org/apache/kafka/connect/storage/Converter.java
[2] https://github.com/apache/kafka/blob/41e676d29587042994a72baa5000a8861a075c8c/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java#L182-L183
[3] https://github.com/confluentinc/schema-registry/blob/master/avro-converter/src/main/java/io/confluent/connect/avro/AvroConverter.java
[4] https://github.com/confluentinc/schema-registry/tree/master/avro-serializer/src/main/java/io/confluent/kafka/serializers

On Sat, Jul 8, 2017 at 8:55 PM, Koert Kuipers <ko...@tresata.com> wrote:
> i see kafka connect invented its own runtime data type system in
> org.apache.kafka.connect.data
>
> however i struggle to understand how this is used. the payload in kafka is
> bytes. kafka does not carry any "schema" metadata. so how does connect know
> what the schema is of a ConnectRecord?
>
> if i write json data then perhaps i can see how a schema can be inferred
> from the data. is this what is happening? does this means the schema
> inference gets done for every json blob (which seems expensive)?
>
> thanks! koert
>
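P.S. A few sketches to make this concrete. First, roughly what a source task produces. This is hand-rolled against the Connect API, not code taken from the JDBC connector, and the table, field, and topic names are made up:

import java.util.Collections;
import java.util.Map;

import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;

public class SchemaSketch {
    public static SourceRecord buildRecord() {
        // A Connect schema built by hand, the way a source connector might
        // derive one from table metadata (imaginary "users" table).
        Schema valueSchema = SchemaBuilder.struct()
                .name("users")
                .field("id", Schema.INT64_SCHEMA)
                .field("name", Schema.OPTIONAL_STRING_SCHEMA)
                .build();

        // The value is a Struct that carries its schema along with the data.
        Struct value = new Struct(valueSchema)
                .put("id", 42L)
                .put("name", "koert");

        // Partition/offset maps are connector-specific bookkeeping.
        Map<String, ?> partition = Collections.singletonMap("table", "users");
        Map<String, ?> offset = Collections.singletonMap("id", 42L);

        // Nothing has been serialized yet; the schema travels with the
        // record inside the Connect runtime until a converter is invoked.
        return new SourceRecord(partition, offset, "users-topic", valueSchema, value);
    }
}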
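Second, roughly what the worker then does with that record through the configured converter. I'm using the JsonConverter that ships with Kafka so the sketch runs without a registry; the AvroConverter from [3] plugs into the same Converter interface [1], it just also needs a schema.registry.url pointing at a running registry:

import java.util.Collections;

import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaAndValue;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.json.JsonConverter;

public class ConverterSketch {
    public static void main(String[] args) {
        // The worker instantiates whatever class 'value.converter' names and
        // calls configure() on it before any records flow.
        JsonConverter converter = new JsonConverter();
        converter.configure(Collections.singletonMap("schemas.enable", "true"), false /* isKey */);

        Schema schema = SchemaBuilder.struct().name("users")
                .field("id", Schema.INT64_SCHEMA)
                .build();
        Struct value = new Struct(schema).put("id", 42L);

        // Roughly what the worker does right before producing to Kafka [2]:
        // schema + value in, plain bytes out.
        byte[] payload = converter.fromConnectData("users-topic", schema, value);

        // The sink side runs the inverse and gets schema and value back together.
        SchemaAndValue roundTrip = converter.toConnectData("users-topic", payload);
        System.out.println(roundTrip.schema().fields() + " -> " + roundTrip.value());
    }
}

With schemas.enable=true the JsonConverter embeds the schema in every message, which is the "expensive" case you were worried about; the AvroConverter avoids that by registering the schema once and writing only an id per message.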
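Third, the "id in front of the bytes" part. As far as I know the Confluent Avro serializer writes a single magic byte (0) followed by a 4-byte big-endian schema registry id, then the Avro-encoded payload, so pulling the id back out of a raw Kafka value looks something like this:

import java.nio.ByteBuffer;

public class WireFormatSketch {
    // Extracts the schema registry id from a value written by the Confluent
    // Avro serializer: 1 magic byte, 4-byte big-endian id, then Avro bytes.
    public static int schemaId(byte[] kafkaValue) {
        ByteBuffer buffer = ByteBuffer.wrap(kafkaValue);
        if (buffer.get() != 0) {
            throw new IllegalArgumentException("not in the registry wire format");
        }
        return buffer.getInt(); // look this id up in the schema registry
    }
}

The deserializers in [4] do this id lookup (and the schema fetch from the registry) for you, which is how a downstream consumer gets back to a typed record without inferring anything from the payload itself.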