Hi Jun,

Sorry for the delay on your comments on the wiki page as well as this
thread; I am quite swamped right now. I will get back to you as soon as I
find some time.
Guozhang

On Tue, Nov 11, 2014 at 6:26 PM, Jun Rao <jun...@gmail.com> wrote:

> Thinking about this a bit more. For adding the auditing support, I am not
> sure if we need to change the message format by adding the application
> tags. An alternative way to do that is to add it in the producer client.
> For example, for each message payload (it doesn't matter what the
> serialization mechanism is) that a producer receives, the producer can
> just add a header before the original payload. The header will contain
> all the fields needed for the purpose of auditing (e.g. timestamp, host,
> etc.). This way, we don't need to change the message format and the
> auditing info can be added independently of the serialization mechanism
> of the message. The header can use a different serialization mechanism
> for better efficiency. For example, if we use Avro to serialize the
> header, the encoded bytes won't include the field names in the header.
> This is potentially more efficient than representing those fields as
> application tags in the message, where the tags have to be explicitly
> stored in every message.
>
> To make it easier for the client to add and make use of this kind of
> auditing support, I was imagining that we can add a ProducerFactory in
> the new Java client. The ProducerFactory will create an instance of
> Producer based on a config property. By default, the current
> KafkaProducer will be returned. However, a user can plug in a different
> implementation of Producer that does auditing. For example, an
> implementation of AuditProducer.send() can take the original
> ProducerRecord, add the header to the value byte array and then forward
> the record to an underlying KafkaProducer. We can add a similar
> ConsumerFactory to the new consumer client. If a user plugs in an
> implementation of the AuditingConsumer, the consumer will then be audited
> automatically.
>
> Thanks,
>
> Jun
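For concreteness, a rough sketch of what such an AuditProducer wrapper
might look like (the class name, the binary header layout, and the wiring
below are made up for illustration; none of this is an existing Kafka
API):

import java.net.InetAddress;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.Future;

import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

/**
 * Hypothetical wrapper that prepends an audit header to every value
 * before handing the record to the underlying producer. The header layout
 * (version, timestamp, host) is invented for illustration; a real
 * implementation might serialize the header with Avro instead, as
 * discussed above.
 */
public class AuditProducer {

    private final Producer<byte[], byte[]> delegate;
    private final byte[] hostBytes;

    public AuditProducer(Producer<byte[], byte[]> delegate) throws Exception {
        this.delegate = delegate;
        this.hostBytes = InetAddress.getLocalHost().getHostName()
                                    .getBytes(StandardCharsets.UTF_8);
    }

    public Future<RecordMetadata> send(ProducerRecord<byte[], byte[]> record) {
        byte[] value = record.value();
        // header = version (1) + timestamp (8) + host length (4) + host
        ByteBuffer buf = ByteBuffer.allocate(
                1 + 8 + 4 + hostBytes.length
                + (value == null ? 0 : value.length));
        buf.put((byte) 0);                        // header version
        buf.putLong(System.currentTimeMillis());  // audit timestamp
        buf.putInt(hostBytes.length);
        buf.put(hostBytes);                       // producing host
        if (value != null)
            buf.put(value);                       // original payload, untouched
        return delegate.send(new ProducerRecord<byte[], byte[]>(
                record.topic(), record.partition(), record.key(), buf.array()));
    }
}

A matching consumer-side wrapper would do the inverse: strip the header,
record the audit fields, and hand the original payload to the application.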
> On Tue, Oct 21, 2014 at 4:06 PM, Guozhang Wang <wangg...@gmail.com> wrote:
>
> > Hi Jun,
> >
> > Regarding 4) in your comment: after thinking about it for a while, I
> > cannot come up with a way to do it along with log compaction without
> > adding new fields to the current message set format. Do you have a
> > better way that does not require protocol changes?
> >
> > Guozhang
> >
> > On Mon, Oct 20, 2014 at 9:53 AM, Guozhang Wang <wangg...@gmail.com> wrote:
> >
> > > I have updated the wiki page incorporating the received comments. We
> > > can discuss some more details on:
> > >
> > > 1. How we want to do auditing: whether we want built-in auditing on
> > > the brokers (or even the MMs), or use an audit consumer to fetch all
> > > messages from just the brokers.
> > >
> > > 2. How we can avoid de-/re-compression on brokers and MMs with log
> > > compaction turned on.
> > >
> > > 3. How we can resolve the data inconsistency resulting from unclean
> > > leader election with control messages.
> > >
> > > Guozhang
> > >
> > > On Sun, Oct 19, 2014 at 11:41 PM, Guozhang Wang <wangg...@gmail.com> wrote:
> > >
> > >> Thanks for the detailed comments Jun! Some replies inlined.
> > >>
> > >> On Sun, Oct 19, 2014 at 7:42 PM, Jun Rao <jun...@gmail.com> wrote:
> > >>
> > >>> Hi, Guozhang,
> > >>>
> > >>> Thanks for the writeup.
> > >>>
> > >>> A few high level comments.
> > >>>
> > >>> 1. Associating (versioned) schemas with a topic can be a good
> > >>> thing overall. Yes, this could add a bit more management overhead
> > >>> in Kafka. However, it makes sure that the data format contract
> > >>> between a producer and a consumer is kept and managed in a central
> > >>> place, instead of in the application. The latter is probably
> > >>> easier to start with, but is likely to be brittle in the long run.
> > >>>
> > >> I am actually not proposing to not support associated versioned
> > >> schemas for topics, but to not let core Kafka functionality like
> > >> auditing depend on schemas. I think this alone can separate schema
> > >> management from Kafka piping management (i.e. making sure every
> > >> single message is delivered, within some latency, etc.). Adding
> > >> additional auditing info into an existing schema will force Kafka
> > >> to be aware of the schema systems (Avro, JSON, etc.).
> > >>
> > >>> 2. Auditing can be a general feature that's useful for many
> > >>> applications. Such a feature can be implemented by extending the
> > >>> low-level message format with a header. However, it can also be
> > >>> added as part of the schema management. For example, you can
> > >>> imagine a type of audited schema that adds additional auditing
> > >>> info to an existing schema automatically. Performance wise, it
> > >>> probably doesn't make a big difference whether the auditing info
> > >>> is added in the message header or the schema header.
> > >>>
> > >> See replies above.
> > >>
> > >>> 3. We talked about avoiding the overhead of decompressing in both
> > >>> the broker and the mirror maker. We probably need to think through
> > >>> how this works with auditing. In the more general case where you
> > >>> want to audit every message, one has to do the decompression to
> > >>> get the individual message, independent of how the auditing info
> > >>> is stored. This means that if we want to audit on the broker
> > >>> directly or on the consumer in mirror maker, we have to pay the
> > >>> decompression cost anyway. Similarly, if we want to extend mirror
> > >>> maker to support some customized filtering/transformation logic,
> > >>> we also have to pay the decompression cost.
> > >>>
> > >> I see your point. For that I would prefer an MM implementation that
> > >> de-compresses / re-compresses ONLY if required, for example by
> > >> auditing. I agree that we have not thought through whether we
> > >> should enable auditing on MM, and if so, how to do it; we could
> > >> discuss that in a different thread. Overall, this proposal is not
> > >> just about tackling de-compression on MM but about the feasibility
> > >> of extending the Kafka message header for system properties / app
> > >> properties.
> > >>
> > >>> Some low level comments.
> > >>>
> > >>> 4. Broker offset reassignment (KAFKA-527): This probably can be
> > >>> done with just a format change on the compressed message set.
> > >>>
> > >> That is true. As I mentioned in the wiki, each of the problems may
> > >> be resolvable separately, but I am thinking about a general way to
> > >> get all of them.
> > >>
> > >>> 5. MirrorMaker refactoring: We probably can think through how
> > >>> general we want mirror maker to be. If we want it to be more
> > >>> general, we likely need to decompress every message just like in a
> > >>> normal consumer. There will definitely be overhead. However, as
> > >>> long as mirror maker is made scalable, we can overcome the
> > >>> overhead by just running more instances on more hardware
> > >>> resources. As for the proposed message format change, we probably
> > >>> need to think through it a bit more. The honor-ship flag seems a
> > >>> bit hacky to me.
> > >>>
> > >> Replied as part of 3). Sure, we can discuss more about that; I will
> > >> update the wiki with the collected comments.
> > >>
> > >>> 6. Adding a timestamp to each message can be a useful thing. It
> > >>> (1) allows log segments to be rolled more accurately; (2) allows
> > >>> finding an offset for a particular timestamp more accurately. I am
> > >>> thinking that the timestamp in the message should probably be the
> > >>> time when the leader receives the message. Followers preserve the
> > >>> timestamp set by the leader. To avoid time going back during
> > >>> leader change, the leader can probably set the timestamp to be the
> > >>> max of the current time and the timestamp of the last message, if
> > >>> present. That timestamp can potentially be added to the index file
> > >>> to answer offsetBeforeTimestamp queries more efficiently.
> > >>>
> > >> Agreed.
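A small, hypothetical illustration of the clamping rule in point 6 (this
is not from the Kafka codebase): the leader never lets the assigned
timestamp go backwards, even across leader changes or clock skew.

/**
 * Sketch of the leader-side timestamp assignment from point 6: take the
 * max of the wall clock and the previously assigned timestamp so that
 * timestamps never decrease within a partition's log.
 */
public class MonotonicTimestampAssigner {

    private long lastTimestamp = -1L; // timestamp of the last appended message

    public synchronized long assign() {
        long now = System.currentTimeMillis();
        // Clamp: never go below the last assigned timestamp.
        lastTimestamp = Math.max(now, lastTimestamp);
        return lastTimestamp;
    }
}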
> > >>> 7. Log compaction: It seems that you are suggesting an
> > >>> improvement to compact the active segment as well. This can be
> > >>> tricky, and we need to figure out the details of how to do it.
> > >>> This improvement seems to be orthogonal to the message format
> > >>> change, though.
> > >>>
> > >> I think the improvement is more effective with the timestamps as in
> > >> 6); we can discuss more about this.
> > >>
> > >>> 8. Data inconsistency from unclean election: I am not sure if we
> > >>> need to add a control message to the log during leadership change.
> > >>> The <leader generation, starting offset> map can be maintained in
> > >>> a separate checkpoint file. The follower just needs to get that
> > >>> map from the leader during startup.
> > >>>
> > >> What I was proposing is an alternative solution given that we have
> > >> this message header enhancement; with it we do not need additional
> > >> logic for the leadership map and checkpoint file, just logic in the
> > >> replica manager to handle this extra control message and remember
> > >> the current leader epoch instead of a map.
> > >>
> > >>> Thanks,
> > >>>
> > >>> Jun
> > >>>
> > >>> On Fri, Oct 10, 2014 at 5:33 PM, Guozhang Wang <wangg...@gmail.com> wrote:
> > >>>
> > >>> > Hello all,
> > >>> >
> > >>> > I put some thoughts on enhancing our current message metadata
> > >>> > format to solve a bunch of existing issues:
> > >>> >
> > >>> > https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Enriched+Message+Metadata
> > >>> >
> > >>> > This wiki page is for kicking off some discussions about the
> > >>> > feasibility of adding more info into the message header, and if
> > >>> > possible how we would add it.
> > >>> >
> > >>> > -- Guozhang
> > >>>
> > >> --
> > >> -- Guozhang
> > >
> > > --
> > > -- Guozhang
> >
> > --
> > -- Guozhang

--
-- Guozhang
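As a footnote to point 8 above, a rough sketch of the <leader generation,
starting offset> checkpoint idea Jun describes (all names and the
simplified truncation rule are illustrative assumptions, not existing
Kafka code):

import java.util.Map;
import java.util.TreeMap;

/**
 * Hypothetical sketch of the <leader generation, starting offset> map
 * from point 8. Each entry records the first offset written under a given
 * leader generation; a restarting follower can fetch this map from the
 * leader and truncate anything it appended under a stale generation.
 */
public class LeaderGenerationCheckpoint {

    // leader generation -> first offset written in that generation
    private final TreeMap<Integer, Long> generations =
            new TreeMap<Integer, Long>();

    /** Called on the leader when it takes over at the given offset. */
    public void onLeaderChange(int generation, long startOffset) {
        generations.put(generation, startOffset);
    }

    /**
     * On a follower: find the offset from which to truncate, i.e. the
     * start of the first generation whose entry disagrees with the
     * leader's map (a deliberately simplified divergence check).
     */
    public long truncationOffset(Map<Integer, Long> leaderMap) {
        for (Map.Entry<Integer, Long> entry : generations.entrySet()) {
            Long leaderStart = leaderMap.get(entry.getKey());
            if (leaderStart == null || !leaderStart.equals(entry.getValue()))
                return entry.getValue(); // truncate from the first mismatch
        }
        return Long.MAX_VALUE; // histories agree; nothing to truncate
    }
}

The follower would fetch the leader's map during startup and truncate at
the first generation whose starting offset disagrees with its own history;
the control-message alternative instead replays the same information from
the log itself.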