Let me add my view on #2 in less delicate terms than Guozhang did :) When you're trying to run Kafka as a service, having to care about the format of the message sucks. I have plenty of users who are just fine using the Avro standard and playing nice. Then I have a bunch of users who don't want to use Avro and want to do something else (json, some plain text, whatever). Then I have a bunch of users who use Avro but don't properly register their schemas. Then I have a bunch of users who do whatever they want and don't tell us.
What this means is that I can't have standard tooling, like auditing, that works on the entire system. I either have to whitelist or blacklist topics, and then I run into problems when someone adds something new either way. It would be preferable if I could monitor and maintain the health of the system without having to worry about the message format.

-Todd

On 10/15/14, 10:50 AM, "Guozhang Wang" <wangg...@gmail.com> wrote:

>Thanks Joe,
>
>I think we now have a few open questions to discuss around this topic:
>
>1. Shall we make core Kafka properties first-class fields in the message
>header, or put them in as tags?
>
>The pros of the first approach are a more compact format and hence less
>message header overhead; the cons are that any future message header
>change needs a protocol bump and possibly multi-versioned handling on
>the server side.
>
>Vice versa for the second approach.
>
>2. Shall we leave app properties in the message content and enforce
>schema-based topics, or make them extensible tags?
>
>The pros of the first approach are, again, saving message header
>overhead for app properties; the cons are that it enforces schema usage
>just so the message content can be partially de-serialized for the app
>header. At LinkedIn we enforce Avro schemas for auditing purposes, and
>as a result the Kafka team has to manage the schema registration
>process / schema repository as well.
>
>3. Which properties should be core Kafka properties and which should be
>app properties? For example, shall we make the properties that only MM
>cares about app properties or Kafka properties?
>
>Guozhang
>
>On Tue, Oct 14, 2014 at 5:10 AM, Joe Stein <joe.st...@stealth.ly> wrote:
>
>> I think we could add schemaId(binary) to the MessageAndMetadata.
>>
>> With the schemaId you can implement different downstream software
>> patterns on the messages reliably. I wrote up more thoughts on this at
>> https://cwiki.apache.org/confluence/display/KAFKA/Schema+based+topics;
>> it should strive to encompass all implementation needs for producer,
>> broker, and consumer hooks.
>>
>> So if the application and tagged fields are important, you can package
>> that into a specific Kafka topic plug-in and assign it to topic(s).
>> The Kafka server should be able to validate your expected formats
>> (like encoders/decoders, but in the broker, per topic, regardless of
>> producer) for the topics that have it enabled. We should have these
>> maintained in the project under contrib.
>>
>> - Joestein
>>
>> On Mon, Oct 13, 2014 at 11:02 PM, Guozhang Wang <wangg...@gmail.com>
>> wrote:
>>
>> > Hi Jay,
>> >
>> > Thanks for the comments. Replied inline.
>> >
>> > Guozhang
>> >
>> > On Mon, Oct 13, 2014 at 11:11 AM, Jay Kreps <jay.kr...@gmail.com>
>> > wrote:
>> >
>> > > I need to take more time to think about this. Here are a few
>> > > off-the-cuff remarks:
>> > >
>> > > - To date we have tried really, really hard to keep the data model
>> > > for messages simple, since after all you can always add whatever
>> > > you like inside the message body.
>> > >
>> > > - For system tags, why not just make these fields first-class
>> > > fields in the message? Why have a bunch of key-value pairs versus
>> > > first-class fields?
>> > >
>> >
>> > Yes, we can alternatively make system tags first-class fields in the
>> > message header to make the format / processing logic simpler.
>> >
>> > The main reasons I put them in as system tags are: 1) when I think
>> > about these possible system tags, some of them apply to all types of
>> > messages (e.g. timestamps), but some may apply only to a specific
>> > type of message (compressed, control message), and of those not all
>> > are necessarily required all the time, hence making them compact
>> > tags may save us some space when not all of them are present; 2)
>> > with tags we do not need to bump up the protocol version every time
>> > we make a change to the header, which would mean keeping the logic
>> > to handle all versions on the broker until the old ones are
>> > officially discarded; instead, the broker can just ignore a tag
>> > whose id it does not recognize because the client is on a newer
>> > version, or use some default value / throw an exception if a
>> > required tag is missing because the client is on an older version.
>> >
>> > > - You don't necessarily need application-level tags explicitly
>> > > represented in the message format for efficiency. The application
>> > > can define its own header (e.g. its message could be a
>> > > size-delimited header followed by a size-delimited body). But
>> > > actually if you use Avro you don't even need this, I don't think.
>> > > Avro has the ability to deserialize just the "header" fields in
>> > > your message. Avro has a notion of reader and writer schemas. The
>> > > writer schema is whatever the message was written with. If the
>> > > reader schema is just the header, Avro will skip any fields it
>> > > doesn't need and just deserialize the fields it does need. This is
>> > > actually a much more usable and flexible way to define a header,
>> > > since you get all the types Avro allows instead of just bytes.
>> > >
>> >
>> > I agree that we can use a reader schema to just read out the header
>> > without de-serializing the full message, and probably for compressed
>> > messages we can add an Avro / etc. header for the compressed wrapper
>> > message as well, but that would force these applications (MM, the
>> > auditor, clients) to be schema-aware, which would usually require
>> > the people who manage this data pipeline to also manage the schemas,
>> > whereas ideally Kafka itself should just consider bytes-in and
>> > bytes-out (and maybe a little bit more, like timestamps). The
>> > purpose here is to not introduce an extra dependency while at the
>> > same time allowing applications to avoid fully de-serializing /
>> > de-compressing the message in order to do some simple processing
>> > based on metadata only.
>> >
>> > > - We will need to think carefully about what to do with timestamps
>> > > if we end up including them. There are actually several
>> > > timestamps:
>> > >   - The time the producer created the message
>> > >   - The time the leader received the message
>> > >   - The time the current broker received the message
>> > > The producer timestamps won't be at all increasing. The leader
>> > > timestamp will be mostly increasing, except when the clock changes
>> > > or leadership moves. This somewhat complicates the use of these
>> > > timestamps, though. From the point of view of the producer the
>> > > only time that matters is the time the message was created.
>> > > However, since the producer sets this it can be arbitrarily bad
>> > > (remember all the NTP issues and 1970 timestamps we would get).
>> > > Say the heuristic was to use the timestamp of the first message in
>> > > a file for retention; the problem would be that the timestamps for
>> > > the segments need not even be sequential, and a single bad
>> > > producer could send data with a time in the distant past or
>> > > future, causing data to be deleted or retained forever. Using the
>> > > broker timestamp at write time is better, though obviously that
>> > > would be overwritten when data is mirrored between clusters (the
>> > > mirror would then have a different time--and if the mirroring ever
>> > > stopped, that gap could be large). One approach would be to use
>> > > the client timestamp but have the broker overwrite it if it is too
>> > > bad (e.g. off by more than a minute, say).
>> > >
>> >
>> > We would need the reception timestamp (i.e. the third one) for log
>> > cleaning; as for the first / second ones, I originally put them in
>> > as app tags since they are likely to be used not by the broker
>> > itself but by others (e.g. the auditor, etc.).
>> >
>> > >
>> > > -Jay
>> > >
>> > > On Fri, Oct 10, 2014 at 11:21 PM, Joel Koshy <jjkosh...@gmail.com>
>> > > wrote:
>> > >
>> > > > Thanks Guozhang! This is an excellent write-up and the approach
>> > > > nicely consolidates a number of long-standing issues. It would
>> > > > be great if everyone can review this carefully and give
>> > > > feedback.
>> > > >
>> > > > Also, wrt discussion, in the past we have used a mix of wiki
>> > > > comments and the mailing list. Personally, I think it is better
>> > > > to discuss on the mailing list (for more visibility) and just
>> > > > post a bold link to the (archived) mailing list thread on the
>> > > > wiki.
>> > > >
>> > > > Joel
>> > > >
>> > > > On Fri, Oct 10, 2014 at 05:33:52PM -0700, Guozhang Wang wrote:
>> > > > > Hello all,
>> > > > >
>> > > > > I put some thoughts on enhancing our current message metadata
>> > > > > format to solve a bunch of existing issues:
>> > > > >
>> > > > > https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Enriched+Message+Metadata
>> > > > >
>> > > > > This wiki page is for kicking off some discussions about the
>> > > > > feasibility of adding more info into the message header, and
>> > > > > if possible how we would add them.
>> > > > >
>> > > > > -- Guozhang
>> > > >
>> > >
>> >
>> > --
>> > -- Guozhang
>> >
>>
>
>--
>-- Guozhang
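
To make the tags-versus-first-class-fields trade-off discussed above concrete, here is a minimal sketch of how a broker might read a tagged header section. This is illustrative only, not the wire format from the wiki proposal: the [tagCount][tagId][length][value] layout, the tag ids, and the TaggedHeaderReader name are all made up for this example.

    import java.nio.ByteBuffer;
    import java.util.HashMap;
    import java.util.Map;

    // Hypothetical layout: an int16 tag count, then for each tag an
    // int16 id, an int32 value length, and the raw value bytes.
    // Varint encoding and checksums are omitted for brevity.
    public class TaggedHeaderReader {

        // Made-up tag ids, not taken from the actual proposal.
        static final int TAG_PRODUCER_TIMESTAMP = 1;
        static final int TAG_SCHEMA_ID = 2;

        public static Map<Integer, byte[]> readTags(ByteBuffer buf) {
            Map<Integer, byte[]> tags = new HashMap<>();
            int tagCount = buf.getShort();
            for (int i = 0; i < tagCount; i++) {
                int tagId = buf.getShort();
                byte[] value = new byte[buf.getInt()];
                buf.get(value);
                // The value bytes are consumed either way, so a broker
                // can silently skip a tag id it does not recognize --
                // adding a new tag needs no protocol version bump.
                if (tagId == TAG_PRODUCER_TIMESTAMP || tagId == TAG_SCHEMA_ID) {
                    tags.put(tagId, value);
                }
            }
            return tags;
        }
    }

First-class fields would instead be read at fixed positions, which is more compact and simpler to process but, as Guozhang notes, means every new field requires a protocol bump and multi-versioned handling on the broker.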
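
Jay's point about Avro reader schemas can be made concrete as well. The sketch below uses a made-up two-field record (an "audit_ts" header field plus a payload); passing both the writer and reader schemas to GenericDatumReader makes Avro skip the fields the reader schema omits instead of materializing them.

    import org.apache.avro.Schema;
    import org.apache.avro.generic.*;
    import org.apache.avro.io.*;

    import java.io.ByteArrayOutputStream;

    public class HeaderProjection {

        // Hypothetical schemas: the full record, and a "header only"
        // view of it containing just the audit timestamp.
        static final Schema WRITER = new Schema.Parser().parse(
            "{\"type\":\"record\",\"name\":\"Msg\",\"fields\":[" +
            "{\"name\":\"audit_ts\",\"type\":\"long\"}," +
            "{\"name\":\"payload\",\"type\":\"string\"}]}");
        static final Schema READER = new Schema.Parser().parse(
            "{\"type\":\"record\",\"name\":\"Msg\",\"fields\":[" +
            "{\"name\":\"audit_ts\",\"type\":\"long\"}]}");

        public static void main(String[] args) throws Exception {
            // Encode a full record with the writer schema.
            GenericRecord full = new GenericData.Record(WRITER);
            full.put("audit_ts", 1413394200000L);
            full.put("payload", "a large message body...");
            ByteArrayOutputStream out = new ByteArrayOutputStream();
            BinaryEncoder enc = EncoderFactory.get().binaryEncoder(out, null);
            new GenericDatumWriter<GenericRecord>(WRITER).write(full, enc);
            enc.flush();

            // Decode with the header-only reader schema: Avro's schema
            // resolution skips over the payload bytes rather than
            // deserializing them.
            DatumReader<GenericRecord> reader =
                new GenericDatumReader<>(WRITER, READER);
            BinaryDecoder dec =
                DecoderFactory.get().binaryDecoder(out.toByteArray(), null);
            GenericRecord header = reader.read(null, dec);
            System.out.println(header.get("audit_ts"));
        }
    }

The catch is exactly the one Guozhang names in his reply: every tool that wants to read the header this way (MM, the auditor, clients) has to be schema-aware.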
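
Finally, Jay's "overwrite the client timestamp if it is too bad" idea might look like the following sketch; the one-minute bound and the names here are illustrative, not part of any proposal.

    import java.util.concurrent.TimeUnit;

    // Sketch of the heuristic: trust the producer-supplied timestamp
    // unless it is too far from the broker's clock. The bound and the
    // names are made up for illustration.
    public class TimestampSanitizer {

        private static final long MAX_SKEW_MS = TimeUnit.MINUTES.toMillis(1);

        // Returns the producer timestamp when it is within MAX_SKEW_MS
        // of the broker's clock, and the broker's receive time otherwise,
        // so a single misconfigured producer (NTP drift, 1970 timestamps)
        // cannot cause segments to be deleted early or retained forever.
        public static long effectiveTimestamp(long producerTs, long brokerNowMs) {
            return Math.abs(brokerNowMs - producerTs) <= MAX_SKEW_MS
                    ? producerTs
                    : brokerNowMs;
        }
    }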