Hi Jay, Thanks for the comments. Replied inline.
Guozhang On Mon, Oct 13, 2014 at 11:11 AM, Jay Kreps <[email protected]> wrote: > I need to take more time to think about this. Here are a few off-the-cuff > remarks: > > - To date we have tried really, really hard to keep the data model for > message simple since after all you can always add whatever you like inside > the message body. > > - For system tags, why not just make these fields first class fields in > message? The purpose of a system tag is presumably that Why have a bunch of > key-value pairs versus first-class fields? > Yes, we can alternatively make system tags as first class fields in the message header to make the format / processing logic simpler. The main reasons I put them as systems tags are 1) when I think about these possible system tags, some of them are for all types of messages (e.g. timestamps), but some of them may be for a specific type of message (compressed, control message) and for those not all of them are necessarily required all the time, hence making them as compact tags may save us some space when not all of them are available; 2) with tags we do not need to bump up the protocol version every time we make a change to it, which includes keeping the logic to handle all versions on the broker until the old ones are officially discarded; instead, the broker can just ignore a tag if its id is not recognizable since the client is on a newer version, or use some default value / throw exception if a required tag is missing since the client is on an older version. > > - You don't necessarily need application-level tags explicitly represented > in the message format for efficiency. The application can define their own > header (e.g. their message could be a size delimited header followed by a > size delimited body). But actually if you use Avro you don't even need this > I don't think. Avro has the ability to just deserialize the "header" fields > in your message. Avro has a notion of reader and writer schemas. The writer > schema is whatever the message was written with. If the reader schema is > just the header, avro will skip any fields it doesn't need and just > deserialize the fields it does need. This is actually a much more usable > and flexible way to define a header since you get all the types avro allows > instead of just bytes. > I agree that we can use a reader schema to just read out the header without de-serializing the full message, and probably for compressed message we can add an Avro / etc header for the compressed wrapper message also, but that would enforce these applications (MM, auditor, clients) to be schema-aware, which would usually require the people who manage this data pipeline also manage the schemas, whereas ideally Kafka itself should just consider bytes-in and bytes-out (and maybe a little bit more, like timestamps). The purpose here is to not introduce an extra dependency while at the same time allow applications to not fully de-serialize / de-compress the message in order to do some simple processing based on metadata only. > > - We will need to think carefully about what to do with timestamps if we > end up including them. There are actually several timestamps > - The time the producer created the message > - The time the leader received the message > - The time the current broker received the message > The producer timestamps won't be at all increasing. The leader timestamp > will be mostly increasing except when the clock changes or leadership > moves. This somewhat complicates the use of these timestamps, though. From > the point of view of the producer the only time that matters is the time > the message was created. However since the producer sets this it can be > arbitrarily bad (remember all the ntp issues and 1970 timestamps we would > get). Say that the heuristic was to use the timestamp of the first message > in a file for retention, the problem would be that the timestamps for the > segments need not even be sequential and a single bad producer could send > data with time in the distant past or future causing data to be deleted or > retained forever. Using the broker timestamp at write time is better, > though obvious that would be overwritten when data is mirrored between > clusters (the mirror would then have a different time--and if the mirroring > ever stopped that gap could be large). One approach would be to use the > client timestamp but have the broker overwrite it if it is too bad (e.g. > off by more than a minute, say). > We would need the reception timestamp (i.e. the third one) for log cleaning, and as for the first / second ones, I originally put them as app tags since they are likely to be used not by the brokers itself (e.g. auditor, etc). > > -Jay > > On Fri, Oct 10, 2014 at 11:21 PM, Joel Koshy <[email protected]> wrote: > > > Thanks Guozhang! This is an excellent write-up and the approach nicely > > consolidates a number of long-standing issues. It would be great if > > everyone can review this carefully and give feedback. > > > > Also, wrt discussion in the past we have used a mix of wiki comments > > and the mailing list. Personally, I think it is better to discuss on > > the mailing list (for more visibility) and just post a bold link to > > the (archived) mailing list thread on the wiki. > > > > Joel > > > > On Fri, Oct 10, 2014 at 05:33:52PM -0700, Guozhang Wang wrote: > > > Hello all, > > > > > > I put some thoughts on enhancing our current message metadata format to > > > solve a bunch of existing issues: > > > > > > > > > https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Enriched+Message+Metadata > > > > > > This wiki page is for kicking off some discussions about the > feasibility > > of > > > adding more info into the message header, and if possible how we would > > add > > > them. > > > > > > -- Guozhang > > > > > -- -- Guozhang
