Let me add my view on #2 in less delicate terms than Guozhang did :)

When you're trying to run Kafka as a service, having to care about the
format of the message sucks. I have plenty of users who are just fine
using the Avro standard and play nice. Then I have a bunch of users who
don't want to use Avro and want to do something else (JSON, some plain
text, whatever). Then I have a bunch of users who use Avro but don't
properly register their schemas. Then I have a bunch of users who do
whatever they want and don't tell us.

What this means is that I can't have standard tooling, like auditing, that
works on the entire system. I either have to whitelist or blacklist
topics, and then I run into problems when someone adds something new
either way. It would be preferable if I could monitor and maintain the
health of the system without having to worry about the message format.

-Todd


On 10/15/14, 10:50 AM, "Guozhang Wang" <wangg...@gmail.com> wrote:

>Thanks Joe,
>
>I think we now have a few open questions to discuss around this topic:
>
>1. Shall we make core Kafka properties first-class fields in the message
>header, or represent them as tags?
>
>The pro of the first approach is a more compact format and hence less
>message header overhead; the con is that any future message header change
>needs a protocol bump and possibly multi-versioned handling on the server
>side.
>
>Vice versa for the second approach.
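To make the size trade-off in question 1 concrete, here is a minimal Python sketch under assumed layouts. The field set (an 8-byte timestamp, a 1-byte codec, a 1-byte control flag) and the tag encoding (a 1-byte count, then a 1-byte id and a 2-byte length per tag) are hypothetical, not an actual Kafka wire format.

```python
import struct
from typing import Dict

# Hypothetical first-class layout: every message pays for all three fields.
# Assumed fields: timestamp (int64), codec (int8), control flag (int8).
FIXED = struct.Struct(">qbb")

# Hypothetical tag ids for the same three fields.
TAG_TIMESTAMP, TAG_CODEC, TAG_CONTROL = 0, 1, 2


def encode_fixed(timestamp_ms: int, codec: int, control: int) -> bytes:
    """Encode the header with first-class fields (always 10 bytes here)."""
    return FIXED.pack(timestamp_ms, codec, control)


def encode_tagged(tags: Dict[int, bytes]) -> bytes:
    """Encode only the tags present: 1-byte count, then id/length/value per tag."""
    out = bytearray(struct.pack(">B", len(tags)))
    for tag_id, value in sorted(tags.items()):
        out += struct.pack(">BH", tag_id, len(value))  # 3 bytes of per-tag overhead
        out += value
    return bytes(out)


if __name__ == "__main__":
    ts = struct.pack(">q", 1413000000000)
    print(len(encode_fixed(1413000000000, 0, 0)))  # 10
    print(len(encode_tagged({TAG_TIMESTAMP: ts})))  # 1 + 3 + 8 = 12
    print(len(encode_tagged({})))  # 1
```

Under these assumed sizes, carrying all three fields as tags costs 20 bytes versus 10 for the fixed layout, so tags only save space when most optional fields are absent.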
>
>2. Shall we leave app properties in the message content and enforce
>schema-based topics, or make them extensible tags?
>
>The pro of the first approach is, again, saving message header overhead for
>app properties; the con is that it enforces a schema on the message content
>so that it can be partially de-serialized to read only the app header. At
>LinkedIn we enforce Avro schemas for auditing purposes, and as a result the
>Kafka team has to manage the schema registration process / schema
>repository as well.
>
>3. Which properties should be core Kafka properties and which should be app
>properties? For example, should properties that only MirrorMaker (MM) cares
>about be app properties or Kafka properties?
>
>Guozhang
>
>On Tue, Oct 14, 2014 at 5:10 AM, Joe Stein <joe.st...@stealth.ly> wrote:
>
>> I think we could add schemaId(binary) to the MessageAndMetaData.
>>
>> With the schemaId you can implement different downstream software
>> patterns on the messages reliably. I wrote up more thoughts on this at
>> https://cwiki.apache.org/confluence/display/KAFKA/Schema+based+topics
>> It should strive to encompass all implementation needs for producer,
>> broker, and consumer hooks.
>>
>> So if the application and tagged fields are important, you can package
>> them into a specific Kafka topic plug-in and assign it to topic(s). The
>> Kafka server should be able to validate your expected formats (like
>> encoders/decoders, but in the broker by topic regardless of producer)
>> for the topics that have it enabled. We should have these maintained in
>> the project under contrib.
>>
>> =- Joestein
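Joe's per-topic plug-in idea can be sketched as a broker-side registry that maps topics to validators, with topics that have no plug-in left as plain bytes-in/bytes-out. This is a minimal Python sketch; `TopicValidatorRegistry`, `Validator`, and `json_object_validator` are hypothetical names, not part of Kafka or its contrib directory.

```python
import json
from typing import Callable, Dict

# Hypothetical plug-in signature: inspect the raw payload, return True if it conforms.
Validator = Callable[[bytes], bool]


class TopicValidatorRegistry:
    """Hypothetical broker-side registry of per-topic format plug-ins."""

    def __init__(self) -> None:
        self._by_topic: Dict[str, Validator] = {}

    def assign(self, topic: str, validator: Validator) -> None:
        """Enable a format plug-in for one topic."""
        self._by_topic[topic] = validator

    def accept(self, topic: str, payload: bytes) -> bool:
        """Validate against the topic's plug-in; topics without one accept any bytes."""
        validator = self._by_topic.get(topic)
        return True if validator is None else validator(payload)


def json_object_validator(payload: bytes) -> bool:
    """Example plug-in: accept only payloads that decode to a JSON object."""
    try:
        return isinstance(json.loads(payload), dict)
    except ValueError:  # covers both JSONDecodeError and UnicodeDecodeError
        return False


if __name__ == "__main__":
    registry = TopicValidatorRegistry()
    registry.assign("events", json_object_validator)
    print(registry.accept("events", b'{"user": 42}'))  # True
    print(registry.accept("events", b"plain text"))  # False
    print(registry.accept("raw-logs", b"\xff\x00"))  # True: no plug-in assigned
```

Keying the check by topic rather than by producer mirrors the "in broker by topic regardless of producer" wording; a schemaId-aware check could fit the same `Validator` shape.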
>>
>> On Mon, Oct 13, 2014 at 11:02 PM, Guozhang Wang <wangg...@gmail.com> wrote:
>>
>> > Hi Jay,
>> >
>> > Thanks for the comments. Replied inline.
>> >
>> > Guozhang
>> >
>> > On Mon, Oct 13, 2014 at 11:11 AM, Jay Kreps <jay.kr...@gmail.com> wrote:
>> >
>> > > I need to take more time to think about this. Here are a few
>> > > off-the-cuff remarks:
>> > >
>> > > - To date we have tried really, really hard to keep the data model
>> > > for messages simple since, after all, you can always add whatever
>> > > you like inside the message body.
>> > >
>> > > - For system tags, why not just make these fields first-class fields
>> > > in the message? The purpose of a system tag is presumably that...
>> > > Why have a bunch of key-value pairs versus first-class fields?
>> > >
>> >
>> > Yes, we can alternatively make system tags first-class fields in the
>> > message header to make the format / processing logic simpler.
>> >
>> > The main reasons I put them as system tags are: 1) when I think about
>> > these possible system tags, some of them apply to all types of messages
>> > (e.g. timestamps), but some of them may apply only to a specific type
>> > of message (compressed, control message), and for those not all of
>> > them are necessarily required all the time, hence making them compact
>> > tags may save us some space when not all of them are present; 2) with
>> > tags we do not need to bump up the protocol version every time we make
>> > a change to it, which includes keeping the logic to handle all versions
>> > on the broker until the old ones are officially discarded; instead,
>> > the broker can just ignore a tag if its id is not recognizable because
>> > the client is on a newer version, or use some default value / throw an
>> > exception if a required tag is missing because the client is on an
>> > older version.
>> >
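The compatibility argument for tags can be illustrated with a small type-length-value parser: unknown ids from newer clients are skipped using their length prefix, and required tags missing from older clients get a default or cause a rejection. This Python sketch assumes a hypothetical encoding (1-byte count, 1-byte id, 2-byte length) and hypothetical tag ids; it is not Kafka's actual format.

```python
import struct
from typing import Dict, Optional

# Hypothetical tag ids understood by this (older) broker.
TAG_TIMESTAMP = 0
TAG_CODEC = 1
KNOWN_TAGS = {TAG_TIMESTAMP, TAG_CODEC}

# Required tags: a default value, or None to reject the message when absent.
REQUIRED: Dict[int, Optional[bytes]] = {
    TAG_TIMESTAMP: None,  # no sensible default: reject
    TAG_CODEC: b"\x00",   # hypothetical: 0 means uncompressed
}


def encode_tags(tags: Dict[int, bytes]) -> bytes:
    out = bytearray(struct.pack(">B", len(tags)))
    for tag_id, value in sorted(tags.items()):
        out += struct.pack(">BH", tag_id, len(value)) + value
    return bytes(out)


def decode_tags(buf: bytes) -> Dict[int, bytes]:
    (count,) = struct.unpack_from(">B", buf, 0)
    offset = 1
    tags: Dict[int, bytes] = {}
    for _ in range(count):
        tag_id, length = struct.unpack_from(">BH", buf, offset)
        offset += 3
        if tag_id in KNOWN_TAGS:
            tags[tag_id] = buf[offset:offset + length]
        # Unknown ids (e.g. from a newer client) are skipped via their length.
        offset += length
    for tag_id, default in REQUIRED.items():
        if tag_id not in tags:
            if default is None:
                raise ValueError(f"required tag {tag_id} missing")
            tags[tag_id] = default  # older client: fall back to the default
    return tags


if __name__ == "__main__":
    ts = struct.pack(">q", 1413000000000)
    newer = encode_tags({TAG_TIMESTAMP: ts, 7: b"future-field"})
    print(decode_tags(newer))
```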
>> >
>> > >
>> > > - You don't necessarily need application-level tags explicitly
>> > > represented in the message format for efficiency. The application
>> > > can define its own header (e.g. its message could be a
>> > > size-delimited header followed by a size-delimited body). But
>> > > actually, if you use Avro, I don't think you even need this. Avro has
>> > > the ability to deserialize just the "header" fields in your message.
>> > > Avro has a notion of reader and writer schemas. The writer schema is
>> > > whatever the message was written with. If the reader schema is just
>> > > the header, Avro will skip any fields it doesn't need and deserialize
>> > > only the fields it does need. This is actually a much more usable and
>> > > flexible way to define a header since you get all the types Avro
>> > > allows instead of just bytes.
>> >
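Jay's first suggestion, an application-defined size-delimited header followed by a size-delimited body, can be sketched with plain length prefixes so that tools read the header without parsing the body. The Avro reader-schema projection he prefers would need an Avro library; this stdlib-only Python sketch uses hypothetical function names and an assumed 4-byte big-endian length prefix.

```python
import struct

LEN = struct.Struct(">I")  # assumed 4-byte big-endian length prefix


def frame(header: bytes, body: bytes) -> bytes:
    """Pack [header length][header][body length][body] into one message value."""
    return LEN.pack(len(header)) + header + LEN.pack(len(body)) + body


def read_header(message: bytes) -> bytes:
    """Return only the header; the body bytes are not parsed."""
    (header_len,) = LEN.unpack_from(message, 0)
    return message[LEN.size:LEN.size + header_len]


def read_body(message: bytes) -> bytes:
    """Skip past the header and return the body."""
    (header_len,) = LEN.unpack_from(message, 0)
    offset = LEN.size + header_len
    (body_len,) = LEN.unpack_from(message, offset)
    start = offset + LEN.size
    return message[start:start + body_len]


if __name__ == "__main__":
    msg = frame(b"app=checkout;host=web-1", b"opaque payload")
    print(read_header(msg))  # b'app=checkout;host=web-1'
```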
>> > I agree that we can use a reader schema to read out just the header
>> > without de-serializing the full message, and for compressed messages
>> > we could probably also add an Avro (or other) header to the compressed
>> > wrapper message, but that would force these applications (MM, auditor,
>> > clients) to be schema-aware, which would usually require the people
>> > who manage this data pipeline to also manage the schemas, whereas
>> > ideally Kafka itself should just consider bytes-in and bytes-out (and
>> > maybe a little bit more, like timestamps). The purpose here is to
>> > avoid introducing an extra dependency while at the same time allowing
>> > applications to avoid fully de-serializing / de-compressing the
>> > message in order to do some simple processing based on metadata only.
>> >
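Letting tools such as MM or an auditor work from metadata alone can be sketched as a compressed wrapper whose small header stays uncompressed. In this Python sketch the header fields (inner message count and a timestamp) and the function names are hypothetical; `peek` reads the header without calling `zlib.decompress`.

```python
import struct
import zlib
from typing import List, Tuple

# Hypothetical uncompressed wrapper header: inner message count, timestamp (ms).
HEADER = struct.Struct(">Iq")
LEN = struct.Struct(">I")


def wrap(messages: List[bytes], timestamp_ms: int) -> bytes:
    """Length-prefix the inner messages, compress them, prepend the plain header."""
    inner = b"".join(LEN.pack(len(m)) + m for m in messages)
    return HEADER.pack(len(messages), timestamp_ms) + zlib.compress(inner)


def peek(wrapped: bytes) -> Tuple[int, int]:
    """Metadata-only read: no decompression, no knowledge of the payload format."""
    return HEADER.unpack_from(wrapped, 0)


def unwrap(wrapped: bytes) -> List[bytes]:
    """Full read for consumers that need the actual messages."""
    inner = zlib.decompress(wrapped[HEADER.size:])
    messages, offset = [], 0
    while offset < len(inner):
        (n,) = LEN.unpack_from(inner, offset)
        offset += LEN.size
        messages.append(inner[offset:offset + n])
        offset += n
    return messages


if __name__ == "__main__":
    batch = wrap([b"a", b"bc", b"def"], 1413000000000)
    count, ts = peek(batch)  # e.g. an auditor counting messages per topic
    print(count, ts)  # 3 1413000000000
```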
>> >
>> > >
>> > > - We will need to think carefully about what to do with timestamps
>> > > if we end up including them. There are actually several timestamps:
>> > >   - The time the producer created the message
>> > >   - The time the leader received the message
>> > >   - The time the current broker received the message
>> > > The producer timestamps won't necessarily be increasing at all. The
>> > > leader timestamp will be mostly increasing except when the clock
>> > > changes or leadership moves. This somewhat complicates the use of
>> > > these timestamps, though. From the point of view of the producer, the
>> > > only time that matters is the time the message was created. However,
>> > > since the producer sets this, it can be arbitrarily bad (remember all
>> > > the NTP issues and 1970 timestamps we would get). Say the heuristic
>> > > was to use the timestamp of the first message in a file for
>> > > retention; the problem would be that the timestamps for the segments
>> > > need not even be sequential, and a single bad producer could send
>> > > data with a time in the distant past or future, causing data to be
>> > > deleted or retained forever. Using the broker timestamp at write time
>> > > is better, though obviously that would be overwritten when data is
>> > > mirrored between clusters (the mirror would then have a different
>> > > time, and if the mirroring ever stopped that gap could be large). One
>> > > approach would be to use the client timestamp but have the broker
>> > > overwrite it if it is too bad (e.g. off by more than a minute, say).
>> > >
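The last approach, keeping the client timestamp unless it is too far off, can be sketched as follows. The one-minute bound is the example from the text; the function name and the choice to compare against the broker's clock at receive time are assumptions.

```python
import time
from typing import Optional

MAX_SKEW_MS = 60_000  # "off by more than a minute, say"


def effective_timestamp(client_ts_ms: int, broker_now_ms: Optional[int] = None,
                        max_skew_ms: int = MAX_SKEW_MS) -> int:
    """Keep the producer's timestamp unless it deviates too far from the broker clock."""
    if broker_now_ms is None:
        broker_now_ms = int(time.time() * 1000)
    if abs(client_ts_ms - broker_now_ms) > max_skew_ms:
        return broker_now_ms  # e.g. a 1970 timestamp from a bad NTP setup
    return client_ts_ms


if __name__ == "__main__":
    now = 1413000000000
    print(effective_timestamp(now - 5_000, now))  # 1412999995000: within a minute, kept
    print(effective_timestamp(0, now))  # 1413000000000: epoch timestamp overwritten
```

In this sketch, a mirror that applied the same check against its own clock would restamp any message replicated with more than a minute of lag, so the bound interacts with mirroring delay.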
>> >
>> > We would need the reception timestamp (i.e. the third one) for log
>> > cleaning. As for the first and second ones, I originally put them as
>> > app tags since they are likely to be used not by the brokers
>> > themselves but by other applications (e.g. the auditor).
>> >
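Using the broker's reception timestamp for log cleaning might look like the following time-based retention check. This Python sketch is hypothetical: `Segment`, its fields, and the rule of using each segment's newest reception time (rather than the first message's time) while deleting only from the head of the log are illustrative choices, not Kafka's implementation.

```python
from dataclasses import dataclass
from typing import List


@dataclass
class Segment:
    base_offset: int
    receive_ts_ms: List[int]  # hypothetical: broker reception time per message


def segments_to_delete(segments: List[Segment], now_ms: int,
                       retention_ms: int) -> List[int]:
    """Return base offsets of expired segments, deleting only from the log head."""
    expired: List[int] = []
    for seg in segments:  # segments are assumed to be in offset order
        if not seg.receive_ts_ms:
            break
        newest = max(seg.receive_ts_ms)
        if now_ms - newest <= retention_ms:
            break  # keep this segment and everything after it
        expired.append(seg.base_offset)
    return expired


if __name__ == "__main__":
    log = [Segment(0, [1_000, 2_000]), Segment(100, [3_000, 9_000]), Segment(200, [9_500])]
    print(segments_to_delete(log, now_ms=10_000, retention_ms=5_000))  # [0]
```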
>> >
>> > >
>> > > -Jay
>> > >
>> > > On Fri, Oct 10, 2014 at 11:21 PM, Joel Koshy <jjkosh...@gmail.com> wrote:
>> > >
>> > > > Thanks Guozhang! This is an excellent write-up, and the approach
>> > > > nicely consolidates a number of long-standing issues. It would be
>> > > > great if everyone could review this carefully and give feedback.
>> > > >
>> > > > Also, with regard to discussion: in the past we have used a mix of
>> > > > wiki comments and the mailing list. Personally, I think it is better
>> > > > to discuss on the mailing list (for more visibility) and just post a
>> > > > bold link to the (archived) mailing list thread on the wiki.
>> > > >
>> > > > Joel
>> > > >
>> > > > On Fri, Oct 10, 2014 at 05:33:52PM -0700, Guozhang Wang wrote:
>> > > > > Hello all,
>> > > > >
>> > > > > I put down some thoughts on enhancing our current message
>> > > > > metadata format to solve a bunch of existing issues:
>> > > > >
>> > > > > https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Enriched+Message+Metadata
>> > > > >
>> > > > > This wiki page is for kicking off some discussion about the
>> > > > > feasibility of adding more info to the message header, and, if
>> > > > > feasible, how we would add it.
>> > > > >
>> > > > > -- Guozhang
>> > > >
>> > > >
>> > >
>> >
>> >
>> >
>> > --
>> > -- Guozhang
>> >
>>
>
>
>
>-- 
>-- Guozhang
