A couple of notes on this:

1/2 - The message format should not be important to Kafka or to the common infrastructure (audit, mirror maker). As I have noted elsewhere, having to enforce a data contract just to provide a general messaging service is additional overhead that is not always needed. It either requires specific functionality from the consumers and producers (which makes them harder to implement), or it requires the broker to care about the message format, which it should not. This is even before we get to the operational problems of enforcing data formatting.
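To make that concrete, this is roughly what the general case looks like when the pipeline stays format-agnostic: the application does whatever encoding its own contract calls for, and the clients and brokers only ever see opaque bytes. A minimal sketch against the new Java producer API; the broker address, topic name, and payload here are made up:

    import java.util.Properties;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.Producer;
    import org.apache.kafka.clients.producer.ProducerRecord;

    public class OpaqueBytesProducer {
        public static void main(String[] args) throws Exception {
            Properties props = new Properties();
            props.put("bootstrap.servers", "broker1:9092");   // placeholder address
            props.put("key.serializer",
                "org.apache.kafka.common.serialization.ByteArraySerializer");
            props.put("value.serializer",
                "org.apache.kafka.common.serialization.ByteArraySerializer");

            // The payload is encoded by the application (Avro, JSON, whatever the
            // producer/consumer contract is); the client and broker just see bytes.
            byte[] payload = "example-event".getBytes("UTF-8");

            Producer<byte[], byte[]> producer = new KafkaProducer<byte[], byte[]>(props);
            try {
                producer.send(new ProducerRecord<byte[], byte[]>("service-log", payload));
            } finally {
                producer.close();
            }
        }
    }

Nothing in that path requires Kafka, the mirror maker, or an audit consumer to understand what is inside the byte array.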
3 - While this is interesting functionality, it is less general-purpose than the simple mirroring that is mostly done now. If you need a customized mirror maker and audit that require message parsing, go ahead and write them. But the general use case does not need them. Currently, producing a message requires it to be compressed twice (once by the producer and once by the broker), plus an additional 2 compressions for every additional cluster it needs to pass through. In my case, this usually means 6 compressions. Even one extra is an incredible waste of resources.

-Todd

> On Oct 19, 2014, at 7:42 PM, Jun Rao <jun...@gmail.com> wrote:
>
> Hi, Guozhang,
>
> Thanks for the writeup.
>
> A few high-level comments.
>
> 1. Associating (versioned) schemas to a topic can be a good thing overall. Yes, this could add a bit more management overhead in Kafka. However, it makes sure that the data format contract between a producer and a consumer is kept and managed in a central place, instead of in the application. The latter is probably easier to start with, but is likely to be brittle in the long run.
>
> 2. Auditing can be a general feature that's useful for many applications. Such a feature can be implemented by extending the low-level message format with a header. However, it can also be added as part of the schema management. For example, you can imagine a type of audited schema that adds additional auditing info to an existing schema automatically. Performance-wise, it probably doesn't make a big difference whether the auditing info is added in the message header or the schema header.
>
> 3. We talked about avoiding the overhead of decompressing in both the broker and the mirror maker. We probably need to think through how this works with auditing. In the more general case where you want to audit every message, one has to do the decompression to get the individual messages, independent of how the auditing info is stored. This means that if we want to audit the broker directly or the consumer in mirror maker, we have to pay the decompression cost anyway. Similarly, if we want to extend mirror maker to support some customized filtering/transformation logic, we also have to pay the decompression cost.
>
> Some low-level comments.
>
> 4. Broker offset reassignment (KAFKA-527): This probably can be done with just a format change on the compressed message set.
>
> 5. MirrorMaker refactoring: We probably can think through how general we want mirror maker to be. If we want it to be more general, we likely need to decompress every message just like in a normal consumer. There will definitely be overhead. However, as long as mirror maker is made scalable, we can overcome the overhead by just running more instances on more hardware resources. As for the proposed message format change, we probably need to think through it a bit more. The honor-ship flag seems a bit hacky to me.
>
> 6. Adding a timestamp in each message can be a useful thing. It (1) allows log segments to be rolled more accurately and (2) allows finding an offset for a particular timestamp more accurately. I am thinking that the timestamp in the message should probably be the time when the leader receives the message. Followers preserve the timestamp set by the leader. To avoid time going back during a leader change, the leader can probably set the timestamp to be the max of the current time and the timestamp of the last message, if present. That timestamp can potentially be added to the index file to answer offsetBeforeTimestamp queries more efficiently.
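As a quick illustration of the timestamp rule in point 6 above: stamping each message with the max of the leader's current clock and the previous message's timestamp keeps the timestamps in a partition from going backwards, even if the clock jumps back or a new leader's clock lags the old one's. This is only a sketch with made-up names, not actual broker code:

    // Sketch of the leader-side timestamp assignment described in point 6.
    public class LeaderTimestampAssigner {

        private long lastTimestamp = -1L;   // timestamp of the last appended message, if any

        // Timestamp to stamp on the next message the leader appends.
        public synchronized long nextTimestamp(long currentTimeMs) {
            // max(current time, previous message's timestamp) keeps the partition's
            // timestamps monotonic across slow clocks and leader changes.
            long ts = Math.max(currentTimeMs, lastTimestamp);
            lastTimestamp = ts;
            return ts;
        }
    }

Followers would simply preserve whatever value the leader wrote, and that same value could be copied into the index entries to serve the offsetBeforeTimestamp lookups mentioned above.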
> 7. Log compaction: It seems that you are suggesting an improvement to compact the active segment as well. This can be tricky, and we need to figure out the details of how to do it. This improvement seems to be orthogonal to the message format change, though.
>
> 8. Data inconsistency from unclean election: I am not sure if we need to add a controlled message to the log during a leadership change. The <leader generation, starting offset> map can be maintained in a separate checkpoint file. The follower just needs to get that map from the leader during startup.
>
> Thanks,
>
> Jun
>
>> On Fri, Oct 10, 2014 at 5:33 PM, Guozhang Wang <wangg...@gmail.com> wrote:
>>
>> Hello all,
>>
>> I put some thoughts on enhancing our current message metadata format to solve a bunch of existing issues:
>>
>> https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Enriched+Message+Metadata
>>
>> This wiki page is for kicking off some discussions about the feasibility of adding more info into the message header and, if possible, how we would add them.
>>
>> -- Guozhang
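Coming back to point 8 above, here is a rough sketch of what a <leader generation, starting offset> map could look like and how a follower might consult it to decide where to truncate after an unclean leader change. All of the names are hypothetical; this is only meant to show the shape of the idea, not how it would actually be implemented in the broker.

    import java.util.Map;
    import java.util.TreeMap;

    // Hypothetical sketch of the <leader generation, starting offset> map
    // from point 8; this is not an actual Kafka class.
    public class LeaderGenerationCheckpoint {

        // leader generation -> first offset written under that generation
        private final TreeMap<Integer, Long> startOffsets = new TreeMap<Integer, Long>();

        // Recorded (and checkpointed) each time leadership changes.
        public synchronized void onLeaderChange(int generation, long firstOffset) {
            startOffsets.put(generation, firstOffset);
        }

        // A follower that was last caught up under 'generation' truncates to the
        // start of the next generation the leader knows about, dropping any
        // messages it wrote that the new leader never had. If there is no newer
        // generation, its current log end offset is already safe.
        public synchronized long truncationOffsetFor(int generation, long followerLogEndOffset) {
            Map.Entry<Integer, Long> next = startOffsets.higherEntry(generation);
            if (next == null) {
                return followerLogEndOffset;
            }
            return Math.min(followerLogEndOffset, next.getValue());
        }
    }

Whether that is exactly the right truncation rule still needs to be worked out, but it shows why the map can live in a checkpoint file exchanged at startup rather than in the message format itself.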