Ah, sorry, I have never used the JsonConverter, so didn't know that was actually a thing. Looking at the code it looks like the converter can handle json with or without the schema [1]. Take a look at the json envelope code to get an idea of how the schema is passed along with the message (also in the json converter code linked below). Setting those configs will enable to schema to travel along with the data. Just make sure those configs are set on both workers, if your sink and source tasks are in different jvms.
[1] https://github.com/apache/kafka/blob/trunk/connect/json/src/main/java/org/apache/kafka/connect/json/JsonConverter.java#L299-L321 On Mon, Jul 10, 2017 at 9:06 AM, Koert Kuipers <ko...@tresata.com> wrote: > thanks for that explanation. > > i use json instead of avro should i use the json serialization that > serializes both schema and data, so that the schema travels with the data > from source to sink? so set key.converter.schemas.enable=true and > value.converter.schemas.enable=true? > > is it a correct assumption that kafka-connect wouldn't work if i chose the > "raw" json serialization that discards the schema? > > On Sun, Jul 9, 2017 at 1:10 PM, Stephen Durfey <sjdur...@gmail.com> wrote: > > > I'll try to answer this for you. I'm going to assume you are using the > > pre-packaged kafka connect distro from confluent. > > > > org.apache.kafka.connect.data.Schema is an abstraction of the type > > definition for the data being passed around. How that is defined > > generally falls onto the connector being used. The source connector can > > provide the schema definition information and make it available for the > > sink connector to infer from provided information by the source > connector. > > How that is done is up to the connector developer (since, as you mention > > kafka only cares about bytes). I'll use a specific example to highlight > > some of the pieces that play into it. > > > > For instance, the confluent JDBC source connector uses table information > > and dynamically generates the o.a.k.c.d.Schema from that. That definition > > becomes part of the SourceRecord. When the worker goes to serialize that > > payload to send to kafka, it uses a converter class [1]. The specific > class > > is defined by 'key.converter' and 'value.converter' for the worker > > definition. The worker calls those specific classes when it needs to > > serialize [2]. This is where the developer can insert logic to inform > > downstream consumers of the schema of the data written to kafka. In the > > pre-packaged distro, it uses the AvroConverter class (also provided by > > confluent) [3]. This class uses custom serializers and deserializers [4] > to > > interact with the schema registry. The schema is turned into an Avro > Schema > > and registered with the schema registry. The schema registry in > > turn returns an id to use to retrieve the schema at a later time. The id > > is serialized in the front of the bytes being written to kafka. > Downstream > > uses can use the custom deserializer to get back to the original message > > generated by the source connector. > > > > I hope this helps. > > > > > > [1] > > https://github.com/apache/kafka/blob/trunk/connect/api/ > > src/main/java/org/apache/kafka/connect/storage/Converter.java > > > > [2] > > https://github.com/apache/kafka/blob/41e676d29587042994a72baa5000a8 > > 861a075c8c/connect/runtime/src/main/java/org/apache/ > kafka/connect/runtime/ > > WorkerSourceTask.java#L182-L183 > > > > [3] > > https://github.com/confluentinc/schema-registry/ > > blob/master/avro-converter/src/main/java/io/confluent/ > > connect/avro/AvroConverter.java > > > > [4] > > https://github.com/confluentinc/schema-registry/ > > tree/master/avro-serializer/src/main/java/io/confluent/kafka/serializers > > > > On Sat, Jul 8, 2017 at 8:55 PM, Koert Kuipers <ko...@tresata.com> wrote: > > > > > i see kafka connect invented its own runtime data type system in > > > org.apache.kafka.connect.data > > > > > > however i struggle to understand how this is used. the payload in kafka > > is > > > bytes. kafka does not carry any "schema" metadata. so how does connect > > know > > > what the schema is of a ConnectRecord? > > > > > > if i write json data then perhaps i can see how a schema can be > inferred > > > from the data. is this what is happening? does this means the schema > > > inference gets done for every json blob (which seems expensive)? > > > > > > thanks! koert > > > > > >