Hi all, I have updated the wiki page ( https://iwww.corp.linkedin.com/wiki/cf/display/ENGS/Kafka+Enriched+Message+Metadata ) based on people's comments and our discussions offline.
Guozhang

On Thu, Nov 13, 2014 at 9:43 AM, Guozhang Wang <wangg...@gmail.com> wrote:
> Hi Jun,
>
> Sorry for the delay on your comments in the wiki page as well as this
> thread; quite swamped now. I will get back to you as soon as I find some
> time.
>
> Guozhang
>
> On Tue, Nov 11, 2014 at 6:26 PM, Jun Rao <jun...@gmail.com> wrote:
>
>> Thinking about this a bit more. For adding the auditing support, I am not
>> sure if we need to change the message format by adding the application
>> tags. An alternative way to do that is to add it in the producer client.
>> For example, for each message payload (it doesn't matter what the
>> serialization mechanism is) that a producer receives, the producer can
>> just add a header before the original payload. The header will contain
>> all the fields needed for the purpose of auditing (e.g. timestamp, host,
>> etc). This way, we don't need to change the message format, and the
>> auditing info can be added independently of the serialization mechanism
>> of the message. The header can use a different serialization mechanism
>> for better efficiency. For example, if we use Avro to serialize the
>> header, the encoded bytes won't include the field names in the header.
>> This is potentially more efficient than representing those fields as
>> application tags in the message, where the tags have to be explicitly
>> stored in every message.
>>
>> To make it easier for the client to add and make use of this kind of
>> auditing support, I was imagining that we can add a ProducerFactory in
>> the new java client. The ProducerFactory will create an instance of
>> Producer based on a config property. By default, the current
>> KafkaProducer will be returned. However, a user can plug in a different
>> implementation of Producer that does auditing.
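[A minimal sketch of the "header before the payload" idea above. The fixed binary layout (long timestamp, length-prefixed host string) and the class name are made up for illustration; the thread suggests Avro for the real header encoding.]

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical audit header prepended to an opaque payload, leaving the
// payload's own serialization (Avro, JSON, ...) untouched.
public class AuditHeader {
    public static byte[] wrap(long timestampMs, String host, byte[] payload) {
        byte[] hostBytes = host.getBytes(StandardCharsets.UTF_8);
        ByteBuffer buf = ByteBuffer.allocate(8 + 4 + hostBytes.length + payload.length);
        buf.putLong(timestampMs);     // audit timestamp
        buf.putInt(hostBytes.length); // length-prefixed producing host
        buf.put(hostBytes);
        buf.put(payload);             // original payload bytes, unchanged
        return buf.array();
    }

    // Strip the audit header and return the original payload for consumers
    // that are not audit-aware.
    public static byte[] unwrap(byte[] wrapped) {
        ByteBuffer buf = ByteBuffer.wrap(wrapped);
        buf.getLong();                          // skip timestamp
        int hostLen = buf.getInt();
        buf.position(buf.position() + hostLen); // skip host
        byte[] payload = new byte[buf.remaining()];
        buf.get(payload);
        return payload;
    }
}
```

[With a schema-based encoding such as Avro, the field names would come from the schema rather than being written per message, which is the efficiency point made above.]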
>> For example, an implementation of an AuditProducer.send() can take the
>> original ProducerRecord, add the header to the value byte array, and then
>> forward the record to an underlying KafkaProducer. We can add a similar
>> ConsumerFactory to the new consumer client. If a user plugs in an
>> implementation of the AuditingConsumer, the consumer will then be audited
>> automatically.
>>
>> Thanks,
>>
>> Jun
>>
>> On Tue, Oct 21, 2014 at 4:06 PM, Guozhang Wang <wangg...@gmail.com> wrote:
>>
>> > Hi Jun,
>> >
>> > Regarding 4) in your comment, after thinking about it for a while I
>> > cannot come up with a way to do it along with log compaction without
>> > adding new fields into the current format of the message set. Do you
>> > have a better way that does not require protocol changes?
>> >
>> > Guozhang
>> >
>> > On Mon, Oct 20, 2014 at 9:53 AM, Guozhang Wang <wangg...@gmail.com> wrote:
>> >
>> > > I have updated the wiki page incorporating the received comments. We
>> > > can discuss some more details on:
>> > >
>> > > 1. How do we want to do auditing? Whether we want to have built-in
>> > > auditing on brokers or even MMs, or use an audit consumer to fetch
>> > > all messages from just the brokers.
>> > >
>> > > 2. How we can avoid de-/re-compression on brokers and MMs with log
>> > > compaction turned on.
>> > >
>> > > 3. How we can resolve the data inconsistency resulting from unclean
>> > > leader election with control messages.
>> > >
>> > > Guozhang
>> > >
>> > > On Sun, Oct 19, 2014 at 11:41 PM, Guozhang Wang <wangg...@gmail.com> wrote:
>> > >
>> > >> Thanks for the detailed comments Jun! Some replies inlined.
>> > >>
>> > >> On Sun, Oct 19, 2014 at 7:42 PM, Jun Rao <jun...@gmail.com> wrote:
>> > >>
>> > >>> Hi, Guozhang,
>> > >>>
>> > >>> Thanks for the writeup.
>> > >>>
>> > >>> A few high level comments.
>> > >>>
>> > >>> 1. Associating (versioned) schemas to a topic can be a good thing
>> > >>> overall.
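[A self-contained sketch of the ProducerFactory / AuditProducer wrapping pattern described above. A minimal SimpleProducer interface stands in for the real Kafka Producer so the example is runnable on its own; the names and the one-byte audit marker are hypothetical, not an existing Kafka API.]

```java
import java.util.ArrayList;
import java.util.List;

// Stand-in for the real Producer interface, just enough to show the pattern.
interface SimpleProducer {
    void send(String topic, byte[] value);
}

// Default producer: simply records what it was asked to send.
class RecordingProducer implements SimpleProducer {
    final List<byte[]> sent = new ArrayList<>();
    public void send(String topic, byte[] value) { sent.add(value); }
}

// Wraps any producer and prepends a (hypothetical) audit marker byte to
// every value before forwarding, mirroring the AuditProducer.send() idea.
class AuditProducer implements SimpleProducer {
    private final SimpleProducer delegate;
    AuditProducer(SimpleProducer delegate) { this.delegate = delegate; }
    public void send(String topic, byte[] value) {
        byte[] audited = new byte[value.length + 1];
        audited[0] = 0x01; // stand-in for the real serialized audit header
        System.arraycopy(value, 0, audited, 1, value.length);
        delegate.send(topic, audited);
    }
}

// Chooses the implementation from a config property, defaulting to the
// plain producer, as sketched in the thread.
class ProducerFactory {
    static SimpleProducer create(String impl, SimpleProducer base) {
        return "audit".equals(impl) ? new AuditProducer(base) : base;
    }
}
```

[The point of the factory indirection is that application code only ever sees the Producer interface, so auditing can be switched on by configuration without touching the application.]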
>> > >>> Yes, this could add a bit more management overhead in Kafka.
>> > >>> However, it makes sure that the data format contract between a
>> > >>> producer and a consumer is kept and managed in a central place,
>> > >>> instead of in the application. The latter is probably easier to
>> > >>> start with, but is likely to be brittle in the long run.
>> > >>
>> > >> I am actually not proposing that we not support associating
>> > >> versioned schemas with topics, but that we not let core Kafka
>> > >> functionality like auditing depend on schemas. This alone can
>> > >> separate schema management from Kafka piping management (i.e. making
>> > >> sure every single message is delivered, within some latency, etc.).
>> > >> Adding additional auditing info into an existing schema would force
>> > >> Kafka to be aware of the schema systems (Avro, JSON, etc.).
>> > >>
>> > >>> 2. Auditing can be a general feature that's useful for many
>> > >>> applications. Such a feature can be implemented by extending the
>> > >>> low level message format with a header. However, it can also be
>> > >>> added as part of the schema management. For example, you can
>> > >>> imagine a type of audited schema that adds additional auditing info
>> > >>> to an existing schema automatically. Performance wise, it probably
>> > >>> doesn't make a big difference whether the auditing info is added in
>> > >>> the message header or the schema header.
>> > >>
>> > >> See replies above.
>> > >>
>> > >>> 3. We talked about avoiding the overhead of decompressing in both
>> > >>> the broker and the mirror maker. We probably need to think through
>> > >>> how this works with auditing.
>> > >>> In the more general case where you want to audit every message,
>> > >>> one has to do the decompression to get the individual message,
>> > >>> independent of how the auditing info is stored. This means that if
>> > >>> we want to audit the broker directly or the consumer in mirror
>> > >>> maker, we have to pay the decompression cost anyway. Similarly, if
>> > >>> we want to extend mirror maker to support some customized
>> > >>> filtering/transformation logic, we also have to pay the
>> > >>> decompression cost.
>> > >>
>> > >> I see your point. For that I would prefer to have an MM
>> > >> implementation that de-compresses / re-compresses ONLY if required,
>> > >> for example by auditing, etc. I agree that we have not thought
>> > >> through whether we should enable auditing on MM, and if so, how to
>> > >> do it; we could discuss that in a different thread. Overall, this
>> > >> proposal is not just about tackling de-compression on MM but about
>> > >> the feasibility of extending the Kafka message header for system
>> > >> properties / app properties.
>> > >>
>> > >>> Some low level comments.
>> > >>>
>> > >>> 4. Broker offset reassignment (KAFKA-527): This probably can be
>> > >>> done with just a format change on the compressed message set.
>> > >>
>> > >> That is true. As I mentioned in the wiki, each of the problems may
>> > >> be resolvable separately, but I am thinking about a general way to
>> > >> solve all of them.
>> > >>
>> > >>> 5. MirrorMaker refactoring: We probably can think through how
>> > >>> general we want mirror maker to be. If we want it to be more
>> > >>> general, we likely need to decompress every message just like in a
>> > >>> normal consumer. There will definitely be overhead.
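[A hedged sketch of the "de-compress / re-compress ONLY if required" idea for mirror maker discussed above. The batch format (one gzip'd payload) and the Predicate-based filter hook are simplifications for illustration; a real MM would work on Kafka's compressed message sets.]

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.function.Predicate;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

public class MirrorStage {
    // If no filter/auditor is configured, forward the compressed bytes
    // untouched (no CPU spent on de-/re-compression). Otherwise decompress,
    // inspect the payload, and re-compress it (or drop it entirely).
    public static byte[] mirror(byte[] compressed, Predicate<byte[]> filter) {
        if (filter == null) {
            return compressed;         // fast path: pass through as-is
        }
        byte[] payload = gunzip(compressed);
        if (!filter.test(payload)) {
            return null;               // filtered out
        }
        return gzip(payload);          // pay the re-compression cost
    }

    static byte[] gzip(byte[] data) {
        try {
            ByteArrayOutputStream bos = new ByteArrayOutputStream();
            try (GZIPOutputStream out = new GZIPOutputStream(bos)) {
                out.write(data);
            }
            return bos.toByteArray();
        } catch (IOException e) { throw new UncheckedIOException(e); }
    }

    static byte[] gunzip(byte[] data) {
        try (GZIPInputStream in = new GZIPInputStream(new ByteArrayInputStream(data))) {
            ByteArrayOutputStream bos = new ByteArrayOutputStream();
            byte[] buf = new byte[4096];
            int n;
            while ((n = in.read(buf)) > 0) bos.write(buf, 0, n);
            return bos.toByteArray();
        } catch (IOException e) { throw new UncheckedIOException(e); }
    }
}
```

[The design point is that the expensive path is only taken when a feature such as auditing or filtering actually needs to see individual messages.]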
>> > >>> However, as long as mirror maker is made scalable, we can overcome
>> > >>> the overhead by just running more instances on more hardware
>> > >>> resources. As for the proposed message format change, we probably
>> > >>> need to think through it a bit more. The honor-ship flag seems a
>> > >>> bit hacky to me.
>> > >>
>> > >> Replied as part of 3). Sure, we can discuss this more; I will update
>> > >> the wiki with the collected comments.
>> > >>
>> > >>> 6. Adding a timestamp in each message can be a useful thing. It (1)
>> > >>> allows log segments to be rolled more accurately; (2) allows
>> > >>> finding an offset for a particular timestamp more accurately. I am
>> > >>> thinking that the timestamp in the message should probably be the
>> > >>> time when the leader receives the message. Followers preserve the
>> > >>> timestamp set by the leader. To avoid time going back during leader
>> > >>> change, the leader can probably set the timestamp to be the max of
>> > >>> the current time and the timestamp of the last message, if present.
>> > >>> That timestamp can potentially be added to the index file to answer
>> > >>> offsetBeforeTimestamp queries more efficiently.
>> > >>
>> > >> Agreed.
>> > >>
>> > >>> 7. Log compaction: It seems that you are suggesting an improvement
>> > >>> to compact the active segment as well. This can be tricky and we
>> > >>> need to figure out the details of how to do this. This improvement
>> > >>> seems to be orthogonal to the message format change though.
>> > >>
>> > >> I think the improvement is more effective with the timestamps as in
>> > >> 6); we can discuss more about this.
>> > >>
>> > >>> 8. Data inconsistency from unclean election: I am not sure if we
>> > >>> need to add a controlled message to the log during leadership
>> > >>> change.
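[The monotonic timestamp rule in point 6 above can be sketched in a few lines; the class name is hypothetical and the real logic would live in the leader's append path.]

```java
// The leader stamps each appended message with max(current time, last
// assigned timestamp), so timestamps never move backwards, even across a
// leader change where the new leader's clock lags behind the old one's.
public class LeaderTimestamper {
    private long lastTimestampMs = Long.MIN_VALUE;

    public synchronized long assign(long nowMs) {
        lastTimestampMs = Math.max(nowMs, lastTimestampMs);
        return lastTimestampMs;
    }
}
```

[Because the assigned timestamps are non-decreasing, a sparse timestamp-to-offset entry in the index file can answer offsetBeforeTimestamp queries with a binary search.]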
>> > >>> The <leader generation, starting offset> map can be maintained in
>> > >>> a separate checkpoint file. The follower just needs to get that map
>> > >>> from the leader during startup.
>> > >>
>> > >> What I was proposing is an alternative solution given that we have
>> > >> this message header enhancement; with it we do not need to add
>> > >> separate logic for the leadership map and checkpoint file, just
>> > >> logic in the replica manager to handle this extra controlled
>> > >> message, remembering the current leader epoch instead of a map.
>> > >>
>> > >>> Thanks,
>> > >>>
>> > >>> Jun
>> > >>>
>> > >>> On Fri, Oct 10, 2014 at 5:33 PM, Guozhang Wang <wangg...@gmail.com> wrote:
>> > >>>
>> > >>> > Hello all,
>> > >>> >
>> > >>> > I put some thoughts on enhancing our current message metadata
>> > >>> > format to solve a bunch of existing issues:
>> > >>> >
>> > >>> > https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Enriched+Message+Metadata
>> > >>> >
>> > >>> > This wiki page is for kicking off some discussions about the
>> > >>> > feasibility of adding more info into the message header, and if
>> > >>> > possible, how we would add it.
>> > >>> >
>> > >>> > -- Guozhang

--
-- Guozhang