just out of curiosity, why does kafka streams not use this runtime data api defined in kafka connect?
On Wed, Jul 26, 2017 at 3:10 AM, Ewen Cheslack-Postava <e...@confluent.io> wrote:

> Stephen's explanation is great and accurate :)
>
> One of the design goals for Kafka Connect was to not rely on any specific
> serialization format, since that is really orthogonal to getting/sending
> data from/to other systems. We define the generic *runtime* data API,
> which is what you'll find in the Kafka Connect Java data API. It is
> intentional that connectors/tasks only interact with these Java objects;
> the steps to convert them to/from the byte[] stored in Kafka are handled
> independently by a plugin that the *user* can choose, so they can use
> whatever serialization format they like with any connector.
>
> Koert is correct that Kafka itself sticks to opaque byte[] data and has
> no understanding of the structure or the data itself. Connect and Streams
> are both meant to build on top of this raw, low-level functionality and
> handle some higher-level functionality.
>
> -Ewen
>
> On Mon, Jul 10, 2017 at 8:18 AM, Stephen Durfey <sjdur...@gmail.com> wrote:
>
> > Ah, sorry, I have never used the JsonConverter, so I didn't know that
> > was actually a thing. Looking at the code, it looks like the converter
> > can handle JSON with or without the schema [1]. Take a look at the JSON
> > envelope code to get an idea of how the schema is passed along with the
> > message (also in the JsonConverter code linked below). Setting those
> > configs will enable the schema to travel along with the data. Just make
> > sure those configs are set on both workers if your sink and source
> > tasks are in different JVMs.
> >
> > [1] https://github.com/apache/kafka/blob/trunk/connect/json/src/main/java/org/apache/kafka/connect/json/JsonConverter.java#L299-L321
> >
> > On Mon, Jul 10, 2017 at 9:06 AM, Koert Kuipers <ko...@tresata.com> wrote:
> >
> > > thanks for that explanation.
> > >
> > > i use json instead of avro. should i use the json serialization that
> > > serializes both schema and data, so that the schema travels with the
> > > data from source to sink? so set key.converter.schemas.enable=true
> > > and value.converter.schemas.enable=true?
> > >
> > > is it a correct assumption that kafka-connect wouldn't work if i
> > > chose the "raw" json serialization that discards the schema?
> > >
> > > On Sun, Jul 9, 2017 at 1:10 PM, Stephen Durfey <sjdur...@gmail.com> wrote:
> > >
> > > > I'll try to answer this for you. I'm going to assume you are using
> > > > the pre-packaged Kafka Connect distro from Confluent.
> > > >
> > > > org.apache.kafka.connect.data.Schema is an abstraction of the type
> > > > definition for the data being passed around. How that is defined
> > > > generally falls onto the connector being used: the source connector
> > > > can provide the schema definition and make it available for the
> > > > sink connector to use. How that is done is up to the connector
> > > > developer (since, as you mention, Kafka only cares about bytes).
> > > > I'll use a specific example to highlight some of the pieces that
> > > > play into it.
> > > >
> > > > For instance, the Confluent JDBC source connector uses table
> > > > information and dynamically generates the o.a.k.c.d.Schema from
> > > > it. That definition becomes part of the SourceRecord. When the
> > > > worker goes to serialize that payload to send to Kafka, it uses a
> > > > converter class [1].
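> > > >
> > > > (The converter contract in [1] is small. Paraphrased and simplified
> > > > here, not copied verbatim -- see the link for the real definition;
> > > > Schema and SchemaAndValue come from org.apache.kafka.connect.data,
> > > > Map from java.util -- it looks roughly like this:)
> > > >
> > > > public interface Converter {
> > > >     // isKey tells the converter whether it is handling the record
> > > >     // key or the record value
> > > >     void configure(Map<String, ?> configs, boolean isKey);
> > > >
> > > >     // Connect runtime data (Schema + value) -> raw byte[] for Kafka
> > > >     byte[] fromConnectData(String topic, Schema schema, Object value);
> > > >
> > > >     // raw byte[] from Kafka -> Connect runtime data
> > > >     SchemaAndValue toConnectData(String topic, byte[] value);
> > > > }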
> > > > The specific class is defined by 'key.converter' and
> > > > 'value.converter' in the worker definition. The worker calls those
> > > > specific classes when it needs to serialize [2]. This is where the
> > > > developer can insert logic to inform downstream consumers of the
> > > > schema of the data written to Kafka. The pre-packaged distro uses
> > > > the AvroConverter class (also provided by Confluent) [3]. This
> > > > class uses custom serializers and deserializers [4] to interact
> > > > with the schema registry. The schema is turned into an Avro Schema
> > > > and registered with the schema registry, which in turn returns an
> > > > id that can be used to retrieve the schema at a later time. That id
> > > > is serialized at the front of the bytes written to Kafka.
> > > > Downstream consumers can use the custom deserializer to get back to
> > > > the original message generated by the source connector.
> > > >
> > > > I hope this helps.
> > > >
> > > > [1] https://github.com/apache/kafka/blob/trunk/connect/api/src/main/java/org/apache/kafka/connect/storage/Converter.java
> > > >
> > > > [2] https://github.com/apache/kafka/blob/41e676d29587042994a72baa5000a8861a075c8c/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java#L182-L183
> > > >
> > > > [3] https://github.com/confluentinc/schema-registry/blob/master/avro-converter/src/main/java/io/confluent/connect/avro/AvroConverter.java
> > > >
> > > > [4] https://github.com/confluentinc/schema-registry/tree/master/avro-serializer/src/main/java/io/confluent/kafka/serializers
> > > >
> > > > On Sat, Jul 8, 2017 at 8:55 PM, Koert Kuipers <ko...@tresata.com> wrote:
> > > >
> > > > > i see kafka connect invented its own runtime data type system in
> > > > > org.apache.kafka.connect.data
> > > > >
> > > > > however i struggle to understand how this is used. the payload in
> > > > > kafka is bytes. kafka does not carry any "schema" metadata. so
> > > > > how does connect know what the schema is of a ConnectRecord?
> > > > >
> > > > > if i write json data then perhaps i can see how a schema can be
> > > > > inferred from the data. is this what is happening? does this mean
> > > > > the schema inference gets done for every json blob (which seems
> > > > > expensive)?
> > > > >
> > > > > thanks! koert
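
Putting the thread together: the schema is not inferred from the bytes for
every message. A source connector declares the schema in code and hands it to
the runtime alongside the value, and the configured converter then decides how
(or whether) that schema is encoded into the byte[] written to Kafka. A
minimal sketch of the source side, assuming made-up topic, table, and field
names:

    import java.util.Collections;
    import java.util.Map;

    import org.apache.kafka.connect.data.Schema;
    import org.apache.kafka.connect.data.SchemaBuilder;
    import org.apache.kafka.connect.data.Struct;
    import org.apache.kafka.connect.source.SourceRecord;

    public class SchemaSketch {
        public static SourceRecord example() {
            // The connector declares the type definition explicitly.
            Schema valueSchema = SchemaBuilder.struct().name("customer")
                    .field("id", Schema.INT64_SCHEMA)
                    .field("name", Schema.OPTIONAL_STRING_SCHEMA)
                    .build();

            // The value is built against that schema.
            Struct value = new Struct(valueSchema)
                    .put("id", 42L)
                    .put("name", "alice");

            // Schema and value travel together in the SourceRecord; the
            // worker's key.converter/value.converter turn this pair into
            // the byte[] actually written to Kafka.
            Map<String, String> sourcePartition =
                    Collections.singletonMap("table", "customers");
            Map<String, Long> sourceOffset =
                    Collections.singletonMap("offset", 0L);
            return new SourceRecord(sourcePartition, sourceOffset,
                    "customers", valueSchema, value);
        }
    }

With the JsonConverter and value.converter.schemas.enable=true, the resulting
message is an envelope of roughly the form {"schema": {...}, "payload":
{"id": 42, "name": "alice"}}, which is how the schema "travels" from source to
sink even though Kafka itself sees only bytes. With schemas.enable=false only
the payload is written, so a sink that depends on the schema would not work
with the "raw" json serialization.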