Stephen's explanation is great and accurate :)

One of the design goals for Kafka Connect was to not rely on any specific
serialization format, since that is really orthogonal to getting/sending
data from/to other systems. We define a generic *runtime* data API (the
classes in org.apache.kafka.connect.data). It is intentional that
connectors/tasks only interact with these Java objects; the steps to
convert them to/from the byte[] stored in Kafka are handled independently
by a converter plugin that the *user* chooses, so they can use whatever
serialization format they like with any connector.
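
To make that concrete, here is a rough sketch of the kind of objects a
connector deals with (the class and field names below are made up purely
for illustration):

import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;

public class RuntimeDataApiSketch {
    public static void main(String[] args) {
        // Schema and Struct are the generic runtime types connectors work
        // with; nothing here implies any particular serialization format.
        Schema userSchema = SchemaBuilder.struct().name("example.User")
                .field("id", Schema.INT64_SCHEMA)
                .field("name", Schema.OPTIONAL_STRING_SCHEMA)
                .build();

        Struct user = new Struct(userSchema)
                .put("id", 42L)
                .put("name", "alice");

        System.out.println(user);
    }
}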

Koert is correct that Kafka itself sticks to opaque byte[] data and has no
understanding of the structure of the data itself. Connect and Streams are
both meant to build on top of that raw, low-level layer and provide
higher-level functionality.

-Ewen

On Mon, Jul 10, 2017 at 8:18 AM, Stephen Durfey <sjdur...@gmail.com> wrote:

> Ah, sorry, I have never used the JsonConverter, so I didn't know that was
> actually a thing. Looking at the code, it looks like the converter can
> handle json with or without the schema [1]. Take a look at the json
> envelope code to get an idea of how the schema is passed along with the
> message (also in the json converter code linked below). Setting those
> configs will enable the schema to travel along with the data. Just make
> sure those configs are set on both workers, if your sink and source tasks
> are running in different JVMs.
>
> [1]
> https://github.com/apache/kafka/blob/trunk/connect/json/src/main/java/org/apache/kafka/connect/json/JsonConverter.java#L299-L321
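>
> To illustrate, here is a rough sketch of what that looks like with
> schemas.enable=true (the topic name and value are just placeholders, and
> the printed envelope is approximate):
>
> import java.util.Collections;
> import org.apache.kafka.connect.data.Schema;
> import org.apache.kafka.connect.data.SchemaAndValue;
> import org.apache.kafka.connect.json.JsonConverter;
>
> public class JsonEnvelopeSketch {
>     public static void main(String[] args) {
>         JsonConverter converter = new JsonConverter();
>         // schemas.enable=true wraps each message in a
>         // {"schema": ..., "payload": ...} envelope so the schema travels
>         // with the data.
>         converter.configure(Collections.singletonMap("schemas.enable", "true"), false);
>
>         byte[] bytes = converter.fromConnectData("some-topic", Schema.STRING_SCHEMA, "hello");
>         System.out.println(new String(bytes));
>         // roughly: {"schema":{"type":"string","optional":false},"payload":"hello"}
>
>         SchemaAndValue back = converter.toConnectData("some-topic", bytes);
>         System.out.println(back.schema() + " / " + back.value());
>     }
> }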
>
> On Mon, Jul 10, 2017 at 9:06 AM, Koert Kuipers <ko...@tresata.com> wrote:
>
> > thanks for that explanation.
> >
> > if i use json instead of avro, should i use the json serialization that
> > serializes both schema and data, so that the schema travels with the data
> > from source to sink? i.e. set key.converter.schemas.enable=true and
> > value.converter.schemas.enable=true?
> >
> > is it a correct assumption that kafka-connect wouldn't work if i chose the
> > "raw" json serialization that discards the schema?
> >
> > On Sun, Jul 9, 2017 at 1:10 PM, Stephen Durfey <sjdur...@gmail.com> wrote:
> >
> > > I'll try to answer this for you. I'm going to assume you are using the
> > > pre-packaged kafka connect distro from confluent.
> > >
> > > org.apache.kafka.connect.data.Schema is an abstraction of the type
> > > definition for the data being passed around. How that is defined
> > > generally falls to the connector being used. The source connector can
> > > provide the schema definition and make it available for the sink
> > > connector to use. How that is done is up to the connector developer
> > > (since, as you mention, kafka only cares about bytes). I'll use a
> > > specific example to highlight some of the pieces that play into it.
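> > >
> > > (Purely as illustration, providing the schema from a source task looks
> > > roughly like this; the table, topic and field names are made up:)
> > >
> > > import java.util.Collections;
> > > import org.apache.kafka.connect.data.Schema;
> > > import org.apache.kafka.connect.data.SchemaBuilder;
> > > import org.apache.kafka.connect.data.Struct;
> > > import org.apache.kafka.connect.source.SourceRecord;
> > >
> > > public class SourceSchemaSketch {
> > >     public static SourceRecord example() {
> > >         // The schema rides along inside the SourceRecord; the converter
> > >         // decides later how (or whether) it is serialized with the payload.
> > >         Schema rowSchema = SchemaBuilder.struct().name("example.Row")
> > >                 .field("id", Schema.INT64_SCHEMA)
> > >                 .field("name", Schema.OPTIONAL_STRING_SCHEMA)
> > >                 .build();
> > >         Struct row = new Struct(rowSchema).put("id", 1L).put("name", "example");
> > >
> > >         return new SourceRecord(
> > >                 Collections.singletonMap("table", "example_table"), // source partition
> > >                 Collections.singletonMap("id", 1L),                 // source offset
> > >                 "example-topic",
> > >                 rowSchema,
> > >                 row);
> > >     }
> > > }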
> > >
> > > For instance, the confluent JDBC source connector uses table information
> > > and dynamically generates the o.a.k.c.d.Schema from that. That definition
> > > becomes part of the SourceRecord. When the worker goes to serialize that
> > > payload to send to kafka, it uses a converter class [1]. The specific
> > > class is defined by 'key.converter' and 'value.converter' in the worker
> > > configuration. The worker calls those specific classes when it needs to
> > > serialize [2]. This is where the developer can insert logic to inform
> > > downstream consumers of the schema of the data written to kafka. The
> > > pre-packaged distro uses the AvroConverter class (also provided by
> > > confluent) [3]. This class uses custom serializers and deserializers [4]
> > > to interact with the schema registry. The schema is turned into an Avro
> > > Schema and registered with the schema registry. The schema registry in
> > > turn returns an id that can be used to retrieve the schema at a later
> > > time. The id is serialized at the front of the bytes written to kafka.
> > > Downstream consumers can use the custom deserializer to get back to the
> > > original message generated by the source connector.
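> > >
> > > To tie it together, the worker-side hand-off is roughly the following (a
> > > simplified sketch, not the actual WorkerSourceTask code; error handling
> > > and key conversion are omitted):
> > >
> > > import org.apache.kafka.connect.source.SourceRecord;
> > > import org.apache.kafka.connect.storage.Converter;
> > >
> > > public class WorkerConversionSketch {
> > >     // Simplified view of what the worker does with the configured value
> > >     // converter before handing the bytes to the kafka producer. With the
> > >     // AvroConverter, this call is where the schema gets registered and
> > >     // where the returned bytes get prefixed with the schema registry id.
> > >     public static byte[] serializeValue(Converter valueConverter, SourceRecord record) {
> > >         return valueConverter.fromConnectData(record.topic(), record.valueSchema(), record.value());
> > >     }
> > > }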
> > >
> > > I hope this helps.
> > >
> > >
> > > [1]
> > > https://github.com/apache/kafka/blob/trunk/connect/api/src/main/java/org/apache/kafka/connect/storage/Converter.java
> > >
> > > [2]
> > > https://github.com/apache/kafka/blob/41e676d29587042994a72baa5000a8861a075c8c/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java#L182-L183
> > >
> > > [3]
> > > https://github.com/confluentinc/schema-registry/blob/master/avro-converter/src/main/java/io/confluent/connect/avro/AvroConverter.java
> > >
> > > [4]
> > > https://github.com/confluentinc/schema-registry/tree/master/avro-serializer/src/main/java/io/confluent/kafka/serializers
> > >
> > > On Sat, Jul 8, 2017 at 8:55 PM, Koert Kuipers <ko...@tresata.com> wrote:
> > >
> > > > i see kafka connect invented its own runtime data type system in
> > > > org.apache.kafka.connect.data
> > > >
> > > > however i struggle to understand how this is used. the payload in
> > > > kafka is bytes. kafka does not carry any "schema" metadata. so how
> > > > does connect know what the schema of a ConnectRecord is?
> > > >
> > > > if i write json data then perhaps i can see how a schema can be
> > > > inferred from the data. is this what is happening? does this mean the
> > > > schema inference gets done for every json blob (which seems expensive)?
> > > >
> > > > thanks! koert
> > > >
> > >
> >
>
