It actually is possible to do so if you adapt the Connect Converter API to Streams. There are a few good reasons why we shouldn't require everyone to just use the same schema:

1. Efficiency

Connect favors a little bit of inefficiency (translating byte[] -> serialization runtime format -> Connect runtime format, or vice versa) to get more flexibility for serialization formats and reuse of code. This is a good choice for Connect, but for Streams apps it may or may not be worth the overhead of doing an additional transformation.

2. API familiarity

Most users of Connect wouldn't even have to be aware of the Connect data API. However, they are likely already familiar with the APIs used for their serialization format (whether that's POJOs for Jackson, convenient SpecificRecords in Avro, or something else). Streams has the flexibility to deal with whatever serialization format you want (just as Connect can, simply in different ways). This lets people work with (and specifically *code against*) a format and APIs they are already comfortable with.

3. Impedance mismatch

One drawback to keep in mind when considering an API that can safely be converted into a bunch of different formats is that there can be some data or precision loss. Maybe Connect doesn't have a way to express UUIDs (something that came up recently in JIRAs) or maybe it doesn't have a type that corresponds to a type in your serialization library. In that case, not passing through a generic translation layer is a *good* thing, since it preserves as much information as possible.

If you like the generic Connect data API and it satisfies your needs, then I'd highly encourage you to write a simple Kafka Streams serde that uses Converters! For all intents and purposes, it is no different than any other serialization library/format!

-Ewen

On Wed, Jul 26, 2017 at 6:11 AM, Koert Kuipers <ko...@tresata.com> wrote:

> just out of curiosity, why does kafka streams not use this runtime data api
> defined in kafka connect?
>
> On Wed, Jul 26, 2017 at 3:10 AM, Ewen Cheslack-Postava <e...@confluent.io>
> wrote:
>
> > Stephen's explanation is great and accurate :)
> >
> > One of the design goals for Kafka Connect was to not rely on any specific
> > serialization format since that is really orthogonal to getting/sending
> > data from/to other systems. We define the generic *runtime* data API,
> > which is what you'll find in the Kafka Connect Java data API. It is
> > intentional that connectors/tasks only interact with these Java objects
> > and the steps to convert this to/from the byte[] stored in Kafka are
> > handled independently by a plugin that the *user* can choose to get
> > different serialization formats so they can use whatever serialization
> > format they like with any connector.
> >
> > Koert is correct that Kafka itself sticks to opaque byte[] data and has
> > no understanding of the structure or data itself. Connect and Streams are
> > both meant to build on top of this raw, low-level functionality and
> > handle some higher-level functionality.
> >
> > -Ewen
> >
> > On Mon, Jul 10, 2017 at 8:18 AM, Stephen Durfey <sjdur...@gmail.com>
> > wrote:
> >
> > > Ah, sorry, I have never used the JsonConverter, so didn't know that was
> > > actually a thing. Looking at the code it looks like the converter can
> > > handle json with or without the schema [1]. Take a look at the json
> > > envelope code to get an idea of how the schema is passed along with the
> > > message (also in the json converter code linked below). Setting those
> > > configs will enable the schema to travel along with the data. Just make
> > > sure those configs are set on both workers, if your sink and source
> > > tasks are in different jvms.
> > >
> > > [1]
> > > https://github.com/apache/kafka/blob/trunk/connect/json/src/main/java/org/apache/kafka/connect/json/JsonConverter.java#L299-L321
> > >
> > > On Mon, Jul 10, 2017 at 9:06 AM, Koert Kuipers <ko...@tresata.com>
> > > wrote:
> > >
> > > > thanks for that explanation.
> > > >
> > > > i use json instead of avro. should i use the json serialization that
> > > > serializes both schema and data, so that the schema travels with the
> > > > data from source to sink? so set key.converter.schemas.enable=true and
> > > > value.converter.schemas.enable=true?
> > > >
> > > > is it a correct assumption that kafka-connect wouldn't work if i chose
> > > > the "raw" json serialization that discards the schema?
> > > >
> > > > On Sun, Jul 9, 2017 at 1:10 PM, Stephen Durfey <sjdur...@gmail.com>
> > > > wrote:
> > > >
> > > > > I'll try to answer this for you. I'm going to assume you are using
> > > > > the pre-packaged kafka connect distro from confluent.
> > > > >
> > > > > org.apache.kafka.connect.data.Schema is an abstraction of the type
> > > > > definition for the data being passed around. How that is defined
> > > > > generally falls onto the connector being used. The source connector
> > > > > can provide the schema definition information and make it available
> > > > > for the sink connector to infer from the information provided by the
> > > > > source connector. How that is done is up to the connector developer
> > > > > (since, as you mention, kafka only cares about bytes). I'll use a
> > > > > specific example to highlight some of the pieces that play into it.
> > > > >
> > > > > For instance, the confluent JDBC source connector uses table
> > > > > information and dynamically generates the o.a.k.c.d.Schema from
> > > > > that. That definition becomes part of the SourceRecord. When the
> > > > > worker goes to serialize that payload to send to kafka, it uses a
> > > > > converter class [1]. The specific class is defined by
> > > > > 'key.converter' and 'value.converter' for the worker definition. The
> > > > > worker calls those specific classes when it needs to serialize [2].
> > > > > This is where the developer can insert logic to inform downstream
> > > > > consumers of the schema of the data written to kafka. In the
> > > > > pre-packaged distro, it uses the AvroConverter class (also provided
> > > > > by confluent) [3]. This class uses custom serializers and
> > > > > deserializers [4] to interact with the schema registry. The schema
> > > > > is turned into an Avro Schema and registered with the schema
> > > > > registry. The schema registry in turn returns an id to use to
> > > > > retrieve the schema at a later time. The id is serialized in the
> > > > > front of the bytes being written to kafka. Downstream users can use
> > > > > the custom deserializer to get back to the original message
> > > > > generated by the source connector.
> > > > >
> > > > > I hope this helps.
> > > > >
> > > > > [1]
> > > > > https://github.com/apache/kafka/blob/trunk/connect/api/src/main/java/org/apache/kafka/connect/storage/Converter.java
> > > > >
> > > > > [2]
> > > > > https://github.com/apache/kafka/blob/41e676d29587042994a72baa5000a8861a075c8c/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java#L182-L183
> > > > >
> > > > > [3]
> > > > > https://github.com/confluentinc/schema-registry/blob/master/avro-converter/src/main/java/io/confluent/connect/avro/AvroConverter.java
> > > > >
> > > > > [4]
> > > > > https://github.com/confluentinc/schema-registry/tree/master/avro-serializer/src/main/java/io/confluent/kafka/serializers
> > > > >
> > > > > On Sat, Jul 8, 2017 at 8:55 PM, Koert Kuipers <ko...@tresata.com>
> > > > > wrote:
> > > > >
> > > > > > i see kafka connect invented its own runtime data type system in
> > > > > > org.apache.kafka.connect.data
> > > > > >
> > > > > > however i struggle to understand how this is used. the payload in
> > > > > > kafka is bytes. kafka does not carry any "schema" metadata. so how
> > > > > > does connect know what the schema is of a ConnectRecord?
> > > > > >
> > > > > > if i write json data then perhaps i can see how a schema can be
> > > > > > inferred from the data. is this what is happening? does this mean
> > > > > > the schema inference gets done for every json blob (which seems
> > > > > > expensive)?
> > > > > >
> > > > > > thanks! koert
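
For readers who want to see what "the id is serialized in the front of the bytes" from Stephen's explanation could look like in practice, here is a minimal sketch in plain Java. It is not the AvroConverter code. It assumes a layout of one magic byte (0) followed by a 4-byte big-endian schema id and then the serialized payload; the thread itself does not spell out that layout, and the class and method names (SchemaIdFraming, frame, schemaId, payload) are hypothetical, chosen only for illustration.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

/**
 * Illustrative only: prefixes a serialized payload with a schema id so a
 * downstream reader can look the schema up before decoding the payload.
 * ASSUMED layout (not stated in the thread): [magic byte 0][4-byte id][payload].
 */
class SchemaIdFraming {
    static final byte MAGIC_BYTE = 0x0;
    static final int HEADER_SIZE = 1 + Integer.BYTES; // magic byte + int id

    /** Writes the header in front of the payload (ByteBuffer is big-endian by default). */
    static byte[] frame(int schemaId, byte[] payload) {
        return ByteBuffer.allocate(HEADER_SIZE + payload.length)
                .put(MAGIC_BYTE)
                .putInt(schemaId)
                .put(payload)
                .array();
    }

    /** Reads the schema id back out of a framed message. */
    static int schemaId(byte[] framed) {
        return checkedHeader(framed).getInt();
    }

    /** Returns the bytes that follow the header. */
    static byte[] payload(byte[] framed) {
        checkedHeader(framed);
        return Arrays.copyOfRange(framed, HEADER_SIZE, framed.length);
    }

    /** Validates length and magic byte; returns a buffer positioned at the id. */
    private static ByteBuffer checkedHeader(byte[] framed) {
        if (framed == null || framed.length < HEADER_SIZE) {
            throw new IllegalArgumentException("message too short for header");
        }
        ByteBuffer buf = ByteBuffer.wrap(framed);
        if (buf.get() != MAGIC_BYTE) {
            throw new IllegalArgumentException("unknown magic byte");
        }
        return buf;
    }

    public static void main(String[] args) {
        byte[] framed = frame(42, new byte[] {10, 20, 30});
        System.out.println(schemaId(framed));                 // prints 42
        System.out.println(Arrays.toString(payload(framed))); // prints [10, 20, 30]
    }
}
```

The point of the sketch is that Kafka still only sees opaque bytes: the schema itself never travels with the message, only a small id that the deserializer can resolve against the registry. That is also why the lookup does not amount to per-message schema inference.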