I think we could add a schemaId (binary) to MessageAndMetadata. With the schemaId you can reliably implement different downstream software patterns on the messages. I wrote up more thoughts on this use at https://cwiki.apache.org/confluence/display/KAFKA/Schema+based+topics; it should strive to encompass all implementation needs for producer, broker, and consumer hooks.

So if the application and tagged fields are important you can package that into a specific Kafka topic plug-in and assign it to topic(s). Kafka server should be able to validate your expected formats (like encoders/decoders but in broker by topic regardless of producer) to the topics that have it enabled. We should have these maintained in the project under contrib.

=- Joestein

On Mon, Oct 13, 2014 at 11:02 PM, Guozhang Wang <wangg...@gmail.com> wrote:

> Hi Jay,
>
> Thanks for the comments. Replied inline.
>
> Guozhang
>
> On Mon, Oct 13, 2014 at 11:11 AM, Jay Kreps <jay.kr...@gmail.com> wrote:
>
> > I need to take more time to think about this. Here are a few off-the-cuff remarks:
> >
> > - To date we have tried really, really hard to keep the data model for message simple since after all you can always add whatever you like inside the message body.
> >
> > - For system tags, why not just make these fields first class fields in message? The purpose of a system tag is presumably that Why have a bunch of key-value pairs versus first-class fields?
>
> Yes, we can alternatively make system tags as first class fields in the message header to make the format / processing logic simpler.
>
> The main reasons I put them as systems tags are 1) when I think about these possible system tags, some of them are for all types of messages (e.g. timestamps), but some of them may be for a specific type of message (compressed, control message) and for those not all of them are necessarily required all the time, hence making them as compact tags may save us some space when not all of them are available; 2) with tags we do not need to bump up the protocol version every time we make a change to it, which includes keeping the logic to handle all versions on the broker until the old ones are officially discarded; instead, the broker can just ignore a tag if its id is not recognizable since the client is on a newer version, or use some default value / throw exception if a required tag is missing since the client is on an older version.
>
> > - You don't necessarily need application-level tags explicitly represented in the message format for efficiency. The application can define their own header (e.g. their message could be a size delimited header followed by a size delimited body). But actually if you use Avro you don't even need this I don't think. Avro has the ability to just deserialize the "header" fields in your message. Avro has a notion of reader and writer schemas. The writer schema is whatever the message was written with. If the reader schema is just the header, avro will skip any fields it doesn't need and just deserialize the fields it does need. This is actually a much more usable and flexible way to define a header since you get all the types avro allows instead of just bytes.
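
Two ideas from the exchange above can be shown in one rough sketch: a size-delimited header followed by a size-delimited body, and a header made of compact tags where a reader skips any tag id it does not recognize. Everything here is an assumption made for illustration, not Kafka's wire format: the tag ids, the names in `KNOWN_TAGS`, the big-endian field widths, and the function names are all hypothetical.

```python
import struct
from typing import Dict

# Hypothetical tag ids known to this reader; a newer writer may send others.
KNOWN_TAGS = {1: "timestamp_ms", 2: "compression"}


def encode_message(tags: Dict[int, bytes], body: bytes) -> bytes:
    """Lay out a message as [header_len][header][body_len][body]."""
    header = bytearray()
    for tag_id, value in tags.items():
        # Each tag is (2-byte id, 4-byte value length, value bytes).
        header += struct.pack(">HI", tag_id, len(value)) + value
    return (
        struct.pack(">I", len(header)) + bytes(header)
        + struct.pack(">I", len(body)) + body
    )


def read_header(message: bytes) -> Dict[str, bytes]:
    """Read only the tagged header; the body is never touched."""
    (header_len,) = struct.unpack_from(">I", message, 0)
    pos, end = 4, 4 + header_len
    known: Dict[str, bytes] = {}
    while pos < end:
        tag_id, length = struct.unpack_from(">HI", message, pos)
        pos += 6  # size of the (id, length) prefix
        name = KNOWN_TAGS.get(tag_id)
        if name is not None:
            known[name] = message[pos:pos + length]
        # Unknown ids are skipped using their length, so a newer writer
        # does not break an older reader.
        pos += length
    return known


msg = encode_message(
    {1: struct.pack(">q", 1413200000000), 99: b"from-a-newer-client"},
    b"opaque application payload",
)
header = read_header(msg)
```

Because every tag carries its own length, skipping tag 99 needs no protocol version bump, which is the trade-off being weighed against first-class fields.
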
>
> I agree that we can use a reader schema to just read out the header without de-serializing the full message, and probably for compressed message we can add an Avro / etc header for the compressed wrapper message also, but that would enforce these applications (MM, auditor, clients) to be schema-aware, which would usually require the people who manage this data pipeline also manage the schemas, whereas ideally Kafka itself should just consider bytes-in and bytes-out (and maybe a little bit more, like timestamps). The purpose here is to not introduce an extra dependency while at the same time allow applications to not fully de-serialize / de-compress the message in order to do some simple processing based on metadata only.
>
> > - We will need to think carefully about what to do with timestamps if we end up including them. There are actually several timestamps
> >    - The time the producer created the message
> >    - The time the leader received the message
> >    - The time the current broker received the message
> > The producer timestamps won't be at all increasing. The leader timestamp will be mostly increasing except when the clock changes or leadership moves. This somewhat complicates the use of these timestamps, though. From the point of view of the producer the only time that matters is the time the message was created. However since the producer sets this it can be arbitrarily bad (remember all the ntp issues and 1970 timestamps we would get). Say that the heuristic was to use the timestamp of the first message in a file for retention, the problem would be that the timestamps for the segments need not even be sequential and a single bad producer could send data with time in the distant past or future causing data to be deleted or retained forever. Using the broker timestamp at write time is better, though obvious that would be overwritten when data is mirrored between clusters (the mirror would then have a different time--and if the mirroring ever stopped that gap could be large). One approach would be to use the client timestamp but have the broker overwrite it if it is too bad (e.g. off by more than a minute, say).
>
> We would need the reception timestamp (i.e. the third one) for log cleaning, and as for the first / second ones, I originally put them as app tags since they are likely to be used not by the brokers itself (e.g. auditor, etc).
>
> > -Jay
> >
> > On Fri, Oct 10, 2014 at 11:21 PM, Joel Koshy <jjkosh...@gmail.com> wrote:
> >
> > > Thanks Guozhang! This is an excellent write-up and the approach nicely consolidates a number of long-standing issues. It would be great if everyone can review this carefully and give feedback.
> > >
> > > Also, wrt discussion in the past we have used a mix of wiki comments and the mailing list. Personally, I think it is better to discuss on the mailing list (for more visibility) and just post a bold link to the (archived) mailing list thread on the wiki.
> > >
> > > Joel
> > >
> > > On Fri, Oct 10, 2014 at 05:33:52PM -0700, Guozhang Wang wrote:
> > > > Hello all,
> > > >
> > > > I put some thoughts on enhancing our current message metadata format to solve a bunch of existing issues:
> > > >
> > > > https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Enriched+Message+Metadata
> > > >
> > > > This wiki page is for kicking off some discussions about the feasibility of adding more info into the message header, and if possible how we would add them.
> > > >
> > > > -- Guozhang
>
> --
> -- Guozhang
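
The heuristic Jay floats in the quoted thread (keep the client timestamp, but have the broker overwrite it when it is off by more than about a minute) could look roughly like the following. This is only a sketch under assumptions: the function name, the millisecond units, and the exact 60-second threshold are hypothetical and not part of any Kafka API.

```python
# Hypothetical threshold: "off by more than a minute, say".
MAX_SKEW_MS = 60_000


def effective_timestamp(client_ts_ms: int, broker_now_ms: int,
                        max_skew_ms: int = MAX_SKEW_MS) -> int:
    """Return the timestamp the broker would store for a message.

    The producer-supplied timestamp is kept when it is close to the
    broker's clock; otherwise the broker's own time replaces it, so a
    single bad producer (for example one sending 1970 timestamps)
    cannot push data far into the past or future.
    """
    if abs(client_ts_ms - broker_now_ms) > max_skew_ms:
        return broker_now_ms
    return client_ts_ms


broker_now = 1413200000000
kept = effective_timestamp(broker_now - 5_000, broker_now)   # small skew
replaced = effective_timestamp(0, broker_now)                # 1970 timestamp
```

Note that this sketch does not address the mirroring case raised in the thread: a mirrored message that arrives late would have its original timestamp overwritten by the same rule.
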