I think the tags are a useful concept to have in that they do for applications what the additional metadata does for brokers, i.e., avoiding decompression and recompression of an entire message-set. I agree that we should not place any "core" fields (i.e., those used internally by Kafka) in tags; those should be first-class fields in the message header. E.g., if we intend to support built-in end-to-end audit in Kafka, then fields for auditing (server, timestamps, etc.) should be first-class fields in the message header. However, tags are useful for application-level features that can avoid a full decompression.
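To make the decompression-avoidance point concrete, here is a minimal sketch in Python. The layout (two-byte tag ids and lengths placed ahead of a zlib-compressed payload) is invented purely for illustration and is not the actual Kafka wire format:

```python
import struct
import zlib

def encode_message(tags, payload):
    # Hypothetical layout (NOT the real Kafka wire format):
    # [int16 tag count], then per tag [int16 id][int16 length][value bytes],
    # followed by the zlib-compressed payload.
    buf = struct.pack(">h", len(tags))
    for tag_id, value in tags.items():
        buf += struct.pack(">hh", tag_id, len(value)) + value
    return buf + zlib.compress(payload)

def read_tags(message):
    # Walk only the tag section; the compressed payload bytes are never
    # touched, so no decompression/recompression is needed to read metadata.
    count = struct.unpack_from(">h", message, 0)[0]
    pos = 2
    tags = {}
    for _ in range(count):
        tag_id, length = struct.unpack_from(">hh", message, pos)
        pos += 4
        tags[tag_id] = message[pos:pos + length]
        pos += length
    return tags

msg = encode_message({1: b"audit-tier-1", 2: b"producer-host-07"},
                     b"large compressed body")
print(read_tags(msg))  # {1: b'audit-tier-1', 2: b'producer-host-07'}
```

An auditor or mirror-maker could call `read_tags` on each message and never pay the cost of inflating the body.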
Although Avro has the ability to deserialize just select fields (say, a header), we would then limit the optimization to Avro-like formats. Also, that will remain an application-specific thing and not an intrinsic part of the wire protocol, i.e., brokers will continue to have to decompress and recompress messages to assign offsets.

Joel

On Wed, Oct 15, 2014 at 09:04:55PM +0000, Todd Palino wrote:
> Let me add my view on #2 in less delicate terms than Guozhang did :)
>
> When you're trying to run Kafka as a service, having to care about the
> format of the message sucks. I have plenty of users who are just fine
> using the Avro standard and play nice. Then I have a bunch of users who
> don't want to use Avro and want to do something else (json, some plain
> text, whatever). Then I have a bunch of users who use Avro but don't
> properly register their schemas. Then I have a bunch of users who do
> whatever they want and don't tell us.
>
> What this means is that I can't have standard tooling, like auditing,
> that works on the entire system. I either have to whitelist or blacklist
> topics, and then I run into problems when someone adds something new
> either way. It would be preferable if I could monitor and maintain the
> health of the system without having to worry about the message format.
>
> -Todd
>
>
> On 10/15/14, 10:50 AM, "Guozhang Wang" <wangg...@gmail.com> wrote:
>
> >Thanks Joe,
> >
> >I think we now have a few open questions to discuss around this topic:
> >
> >1. Shall we make core Kafka properties first-class fields in the
> >message header or put them in tags?
> >
> >The pros of the first approach are a more compact format and hence less
> >message header overhead; the cons are that any future message header
> >change needs a protocol bump and possibly multi-versioned handling on
> >the server side.
> >
> >Vice versa for the second approach.
> >
> >2.
> >Shall we leave app properties in the message content and enforce
> >schema-based topics, or make them extensible tags?
> >
> >The pros of the first approach are again saving message header overhead
> >for app properties; the cons are that it enforces schema usage so that
> >the message content can be partially de-serialized to read just the app
> >header. At LinkedIn we enforce Avro schemas for auditing purposes, and
> >as a result the Kafka team has to manage the schema registration
> >process / schema repository as well.
> >
> >3. Which properties should be core Kafka properties and which should be
> >app properties? For example, shall we make properties that only MM
> >cares about app properties or Kafka properties?
> >
> >Guozhang
> >
> >On Tue, Oct 14, 2014 at 5:10 AM, Joe Stein <joe.st...@stealth.ly> wrote:
> >
> >> I think we could add schemaId(binary) to the MessageAndMetaData.
> >>
> >> With the schemaId you can implement different downstream software
> >> patterns on the messages reliably. I wrote up more thoughts on this
> >> use case:
> >> https://cwiki.apache.org/confluence/display/KAFKA/Schema+based+topics
> >> It should strive to encompass all implementation needs for producer,
> >> broker, and consumer hooks.
> >>
> >> So if the application and tagged fields are important you can package
> >> that into a specific Kafka topic plug-in and assign it to topic(s).
> >> The Kafka server should be able to validate your expected formats
> >> (like encoders/decoders, but in the broker, by topic, regardless of
> >> producer) for the topics that have it enabled. We should have these
> >> maintained in the project under contrib.
> >>
> >> - Joestein
> >>
> >> On Mon, Oct 13, 2014 at 11:02 PM, Guozhang Wang <wangg...@gmail.com>
> >> wrote:
> >>
> >> > Hi Jay,
> >> >
> >> > Thanks for the comments. Replied inline.
> >> >
> >> > Guozhang
> >> >
> >> > On Mon, Oct 13, 2014 at 11:11 AM, Jay Kreps <jay.kr...@gmail.com>
> >> > wrote:
> >> >
> >> > > I need to take more time to think about this.
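A rough sketch of Joe's schemaId idea, assuming a hypothetical one-byte schema-id prefix and an invented per-topic registry (the real proposal would presumably use a larger binary id); the broker can validate a topic's expected format without deserializing the body:

```python
# Invented registry: schema ids allowed per topic (ids are hypothetical).
TOPIC_SCHEMAS = {"page-views": {b"\x01", b"\x02"}}

def validate(topic, message):
    # Broker-side check: reject messages whose schema-id prefix is not
    # registered for the topic, without parsing the opaque body.
    schema_id, body = message[:1], message[1:]
    if schema_id not in TOPIC_SCHEMAS.get(topic, set()):
        raise ValueError("unregistered schema id for topic %s" % topic)
    return schema_id, body

print(validate("page-views", b"\x01...opaque avro bytes..."))
```

A per-topic plug-in as Joe describes would supply `TOPIC_SCHEMAS` (and possibly richer checks) for only the topics that enable it.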
> >> > > Here are a few off-the-cuff remarks:
> >> > >
> >> > > - To date we have tried really, really hard to keep the data model
> >> > > for messages simple since, after all, you can always add whatever
> >> > > you like inside the message body.
> >> > >
> >> > > - For system tags, why not just make these fields first-class
> >> > > fields in the message? Why have a bunch of key-value pairs versus
> >> > > first-class fields?
> >> >
> >> > Yes, we can alternatively make system tags first-class fields in the
> >> > message header to make the format / processing logic simpler.
> >> >
> >> > The main reasons I put them as system tags are: 1) when I think
> >> > about these possible system tags, some of them apply to all types of
> >> > messages (e.g. timestamps), but some of them may apply only to a
> >> > specific type of message (compressed, control message), and for
> >> > those not all of them are necessarily required all the time, hence
> >> > making them compact tags may save us some space when not all of them
> >> > are present; 2) with tags we do not need to bump up the protocol
> >> > version every time we make a change, which includes keeping the
> >> > logic to handle all versions on the broker until the old ones are
> >> > officially discarded; instead, the broker can just ignore a tag if
> >> > its id is not recognizable (since the client is on a newer version),
> >> > or use some default value / throw an exception if a required tag is
> >> > missing (since the client is on an older version).
> >> >
> >> >
> >> > >
> >> > > - You don't necessarily need application-level tags explicitly
> >> > > represented in the message format for efficiency. The application
> >> > > can define its own header (e.g. the message could be a
> >> > > size-delimited header followed by a size-delimited body).
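The "ignore unrecognized tag ids" behavior Guozhang describes could look something like this sketch; the tag ids and names are invented for illustration:

```python
# Hypothetical tag ids known to this broker version.
KNOWN_TAGS = {1: "timestamp", 2: "host"}
REQUIRED_TAGS = {1}

def process_tags(tags):
    # A newer client may send ids this broker does not know: just skip
    # them, with no protocol version bump needed. An older client may
    # omit a required tag: apply a default or raise.
    missing = REQUIRED_TAGS - tags.keys()
    if missing:
        raise ValueError("missing required tags: %s" % sorted(missing))
    return {KNOWN_TAGS[i]: v for i, v in tags.items() if i in KNOWN_TAGS}

# Tag id 99 comes from a hypothetical newer client and is silently ignored.
print(process_tags({1: b"1413397495", 99: b"future-field"}))
# {'timestamp': b'1413397495'}
```

This is the forward/backward-compatibility argument in miniature: the set of known ids can grow without the parse loop or the protocol version changing.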
> >> > > But actually if you use Avro you don't even need this, I don't
> >> > > think. Avro has the ability to just deserialize the "header"
> >> > > fields in your message. Avro has a notion of reader and writer
> >> > > schemas. The writer schema is whatever the message was written
> >> > > with. If the reader schema is just the header, Avro will skip any
> >> > > fields it doesn't need and just deserialize the fields it does
> >> > > need. This is actually a much more usable and flexible way to
> >> > > define a header, since you get all the types Avro allows instead
> >> > > of just bytes.
> >> >
> >> > I agree that we can use a reader schema to just read out the header
> >> > without de-serializing the full message, and probably for compressed
> >> > messages we can add an Avro / etc. header for the compressed wrapper
> >> > message also, but that would force these applications (MM, auditor,
> >> > clients) to be schema-aware, which would usually require the people
> >> > who manage this data pipeline to also manage the schemas, whereas
> >> > ideally Kafka itself should just consider bytes-in and bytes-out
> >> > (and maybe a little bit more, like timestamps). The purpose here is
> >> > to not introduce an extra dependency while at the same time allowing
> >> > applications to avoid fully de-serializing / de-compressing the
> >> > message in order to do some simple processing based on metadata
> >> > only.
> >> >
> >> >
> >> > >
> >> > > - We will need to think carefully about what to do with timestamps
> >> > > if we end up including them. There are actually several
> >> > > timestamps:
> >> > > - The time the producer created the message
> >> > > - The time the leader received the message
> >> > > - The time the current broker received the message
> >> > > The producer timestamps won't be at all increasing.
> >> > > The leader timestamp will be mostly increasing, except when the
> >> > > clock changes or leadership moves. This somewhat complicates the
> >> > > use of these timestamps, though. From the point of view of the
> >> > > producer, the only time that matters is the time the message was
> >> > > created. However, since the producer sets this it can be
> >> > > arbitrarily bad (remember all the ntp issues and 1970 timestamps
> >> > > we would get). Say that the heuristic was to use the timestamp of
> >> > > the first message in a file for retention; the problem would be
> >> > > that the timestamps for the segments need not even be sequential,
> >> > > and a single bad producer could send data with time in the distant
> >> > > past or future, causing data to be deleted or retained forever.
> >> > > Using the broker timestamp at write time is better, though
> >> > > obviously that would be overwritten when data is mirrored between
> >> > > clusters (the mirror would then have a different time--and if the
> >> > > mirroring ever stopped that gap could be large). One approach
> >> > > would be to use the client timestamp but have the broker overwrite
> >> > > it if it is too bad (e.g. off by more than a minute, say).
> >> >
> >> > We would need the reception timestamp (i.e. the third one) for log
> >> > cleaning, and as for the first / second ones, I originally put them
> >> > as app tags since they are likely to be used not by the brokers
> >> > themselves (e.g. by the auditor, etc.).
> >> >
> >> >
> >> > >
> >> > > -Jay
> >> > >
> >> > > On Fri, Oct 10, 2014 at 11:21 PM, Joel Koshy
> >> > > <jjkosh...@gmail.com> wrote:
> >> > >
> >> > > > Thanks Guozhang! This is an excellent write-up and the approach
> >> > > > nicely consolidates a number of long-standing issues. It would
> >> > > > be great if everyone can review this carefully and give
> >> > > > feedback.
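Jay's "overwrite it if it is too bad" heuristic could be sketched as follows; the one-minute threshold and the function names are assumptions for illustration, not part of any concrete proposal:

```python
import time

MAX_SKEW_SECONDS = 60  # assumed tolerance: "off by more than a minute"

def effective_timestamp(client_ts, broker_now=None):
    # Keep the producer-supplied timestamp unless it is implausibly far
    # from the broker's clock, in which case fall back to the broker's
    # receive time (guards against NTP drift and 1970-style timestamps).
    if broker_now is None:
        broker_now = time.time()
    if abs(client_ts - broker_now) > MAX_SKEW_SECONDS:
        return broker_now
    return client_ts

print(effective_timestamp(1413397490, broker_now=1413397495))  # 1413397490
print(effective_timestamp(0, broker_now=1413397495))           # 1413397495
```

With this, a single misconfigured producer can no longer push a segment's retention timestamp into the distant past or future.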
> >> > > >
> >> > > > Also, wrt discussion, in the past we have used a mix of wiki
> >> > > > comments and the mailing list. Personally, I think it is better
> >> > > > to discuss on the mailing list (for more visibility) and just
> >> > > > post a bold link to the (archived) mailing list thread on the
> >> > > > wiki.
> >> > > >
> >> > > > Joel
> >> > > >
> >> > > > On Fri, Oct 10, 2014 at 05:33:52PM -0700, Guozhang Wang wrote:
> >> > > > > Hello all,
> >> > > > >
> >> > > > > I put some thoughts on enhancing our current message metadata
> >> > > > > format to solve a bunch of existing issues:
> >> > > > >
> >> > > > > https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Enriched+Message+Metadata
> >> > > > >
> >> > > > > This wiki page is for kicking off some discussions about the
> >> > > > > feasibility of adding more info into the message header, and
> >> > > > > if possible how we would add them.
> >> > > > >
> >> > > > > -- Guozhang
> >> > > >
> >> > >
> >> >
> >> > --
> >> > -- Guozhang
> >>
> >
> >--
> >-- Guozhang