Hi Jun,

Regarding 4) in your comment: after thinking about it for a while, I cannot come up with a way to do it along with log compaction without adding new fields to the current message set format. Do you have a better way that does not require protocol changes?
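
Just to make concrete what I mean by "new fields", the only shape I can come up with looks roughly like the sketch below. This is an illustration only: the class and field names are made up and it is not a concrete wire-format proposal.

import java.util.List;

// Illustration only: hypothetical names, not a concrete wire-format proposal.
public final class RelativeOffsetSketch {

    // Hypothetical new per-message field: an offset relative to the wrapper.
    public static final class InnerMessage {
        final int relativeOffset;
        final byte[] payload;

        public InnerMessage(int relativeOffset, byte[] payload) {
            this.relativeOffset = relativeOffset;
            this.payload = payload;
        }
    }

    // Hypothetical new wrapper field: the absolute offset of the first inner message.
    public static final class Wrapper {
        long baseAbsoluteOffset;
        final List<InnerMessage> inner; // compressed as a unit on the real wire

        public Wrapper(List<InnerMessage> inner) {
            this.inner = inner;
        }
    }

    // The broker assigns offsets by rewriting a single wrapper field instead of
    // de-/re-compressing the inner messages (the KAFKA-527 case).
    public static void assignOffsets(Wrapper w, long nextOffset) {
        w.baseAbsoluteOffset = nextOffset;
    }

    // Consumers recover absolute offsets from the relative ones. Since log
    // compaction may remove arbitrary inner messages, each surviving message has
    // to carry its own relative offset, which is exactly the per-message field
    // the current format does not have.
    public static long absoluteOffset(Wrapper w, InnerMessage m) {
        return w.baseAbsoluteOffset + m.relativeOffset;
    }
}

Both the per-message relative offset and the wrapper base offset would be new fields, hence the protocol change I was hoping we could avoid.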

Guozhang

On Mon, Oct 20, 2014 at 9:53 AM, Guozhang Wang <wangg...@gmail.com> wrote:

> I have updated the wiki page incorporating received comments. We can discuss some more details on:

> 1. How we want to do auditing: whether we want built-in auditing on brokers or even MMs, or an audit consumer that fetches all messages from just the brokers.

> 2. How we can avoid de-/re-compression on brokers and MMs with log compaction turned on.

> 3. How we can resolve data inconsistency resulting from unclean leader election with control messages.

> Guozhang

> On Sun, Oct 19, 2014 at 11:41 PM, Guozhang Wang <wangg...@gmail.com> wrote:

>> Thanks for the detailed comments Jun! Some replies inlined.

>> On Sun, Oct 19, 2014 at 7:42 PM, Jun Rao <jun...@gmail.com> wrote:

>>> Hi, Guozhang,

>>> Thanks for the writeup.

>>> A few high level comments.

>>> 1. Associating (versioned) schemas to a topic can be a good thing overall. Yes, this could add a bit more management overhead in Kafka. However, it makes sure that the data format contract between a producer and a consumer is kept and managed in a central place, instead of in the application. The latter is probably easier to start with, but is likely to be brittle in the long run.

>> I am actually not proposing that we not support associated versioned schemas for topics, but that we not let core Kafka functionalities like auditing depend on schemas. I think this alone can separate schema management from Kafka piping management (i.e. making sure every single message is delivered, within some latency, etc.). Adding auditing info into an existing schema would force Kafka to be aware of the schema systems (Avro, JSON, etc.).

>>> 2. Auditing can be a general feature that's useful for many applications. Such a feature can be implemented by extending the low level message format with a header. However, it can also be added as part of the schema management. For example, you can imagine a type of audited schema that adds additional auditing info to an existing schema automatically. Performance-wise, it probably doesn't make a big difference whether the auditing info is added in the message header or the schema header.

>> See replies above.

>>> 3. We talked about avoiding the overhead of decompressing in both the broker and the mirror maker. We probably need to think through how this works with auditing. In the more general case where you want to audit every message, one has to do the decompression to get the individual message, independent of how the auditing info is stored. This means that if we want to audit the broker directly or the consumer in mirror maker, we have to pay the decompression cost anyway. Similarly, if we want to extend mirror maker to support some customized filtering/transformation logic, we also have to pay the decompression cost.

>> I see your point. For that I would prefer an MM implementation that de-compresses / re-compresses ONLY if required, for example by auditing. I agree that we have not thought through whether we should enable auditing on MM, and if so how to do it; we could discuss that in a different thread.

>> Overall, this proposal is not just about tackling de-compression on MM, but about the feasibility of extending the Kafka message header with system properties / app properties.

>>> Some low level comments.

>>> 4. Broker offset reassignment (KAFKA-527): This probably can be done with just a format change on the compressed message set.

>> That is true. As I mentioned in the wiki, each of the problems may be resolvable separately, but I am thinking about a general way to address all of them.

>>> 5. MirrorMaker refactoring: We probably can think through how general we want mirror maker to be. If we want it to be more general, we likely need to decompress every message just like in a normal consumer. There will definitely be overhead. However, as long as mirror maker is made scalable, we can overcome the overhead by just running more instances on more hardware resources. As for the proposed message format change, we probably need to think through it a bit more. The honor-ship flag seems a bit hacky to me.

>> Replied as part of 3). Sure, we can discuss this more; I will update the wiki with the collected comments.

>>> 6. Adding a timestamp in each message can be a useful thing. It (1) allows log segments to be rolled more accurately; (2) allows finding an offset for a particular timestamp more accurately. I am thinking that the timestamp in the message should probably be the time when the leader receives the message. Followers preserve the timestamp set by the leader. To avoid time going back during leader change, the leader can probably set the timestamp to be the max of the current time and the timestamp of the last message, if present. That timestamp can potentially be added to the index file to answer offsetBeforeTimestamp queries more efficiently.

>> Agreed.

>>> 7. Log compaction: It seems that you are suggesting an improvement to compact the active segment as well. This can be tricky and we need to figure out the details of how to do this. This improvement seems to be orthogonal to the message format change though.

>> I think the improvement is more effective with the timestamps as in 6); we can discuss this more.

>>> 8. Data inconsistency from unclean election: I am not sure if we need to add a control message to the log during leadership change. The <leader generation, starting offset> map can be maintained in a separate checkpoint file. The follower just needs to get that map from the leader during startup.

>> What I was proposing is an alternative solution given this message header enhancement: with it we do not need additional logic for the leadership map and checkpoint file, just logic in the replica manager to handle this extra control message and remember the current leader epoch instead of a map.

>>> Thanks,
>>> Jun

>>> On Fri, Oct 10, 2014 at 5:33 PM, Guozhang Wang <wangg...@gmail.com> wrote:

>>> > Hello all,

>>> > I put some thoughts on enhancing our current message metadata format to solve a bunch of existing issues:

>>> > https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Enriched+Message+Metadata

>>> > This wiki page is for kicking off some discussions about the feasibility of adding more info into the message header, and if possible how we would add them.
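
P.S. On point 6) above, the timestamp rule Jun describes would look roughly like the following. This is a minimal sketch with hypothetical names, not code from the codebase.

// Minimal sketch of the leader-side timestamp rule from point 6); hypothetical names.
public final class LeaderTimestampSketch {

    // Timestamp of the last message appended by this leader, or -1 if none yet.
    private long lastMessageTimestampMs = -1L;

    // The leader stamps each message with max(current time, last message's timestamp)
    // so timestamps never go backwards across leader changes; followers simply
    // preserve the value set by the leader.
    public long assignTimestamp(long nowMs) {
        long ts = Math.max(nowMs, lastMessageTimestampMs);
        lastMessageTimestampMs = ts;
        return ts;
    }
}

Because the assigned timestamps are monotonic, they could also be kept in the index file so that offsetBeforeTimestamp queries become a lookup over the index rather than a scan of the log.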

--
-- Guozhang