Thinking about this a bit more. For adding the auditing support, I am not
sure if we need to change the message format by adding the application
tags. An alternative is to add the auditing info in the producer client.
For example, for each message payload (doesn't matter what the
serialization mechanism is) that a producer receives, the producer can just
add a header before the original payload. The header will contain all
needed fields (e.g. timestamp, host, etc) for the purpose of auditing. This
way, we don't need to change the message format and the auditing info can
be added independent of the serialization mechanism of the message. The
header can use a different serialization mechanism for better efficiency.
For example, if we use Avro to serialize the header, the encoded bytes
won't include the field names in the header. This is potentially more
efficient than representing those fields as application tags in the message
where the tags have to be explicitly stored in every message.
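
A minimal sketch of the header-before-payload idea, just to make it
concrete. AuditEnvelope and its byte layout are hypothetical (not part of
any Kafka API), and a hand-rolled fixed layout stands in for Avro so the
example stays self-contained. As with Avro, no field names are written into
the encoded bytes, and the original payload is treated as opaque.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical helper for illustration only; not part of any Kafka API.
final class AuditEnvelope {
    final long timestampMs;
    final String host;
    final byte[] payload;

    AuditEnvelope(long timestampMs, String host, byte[] payload) {
        this.timestampMs = timestampMs;
        this.host = host;
        this.payload = payload;
    }

    // Assumed layout: [8-byte timestamp][2-byte host length][host UTF-8][payload].
    // The payload is copied as-is, whatever its serialization mechanism.
    static byte[] wrap(long timestampMs, String host, byte[] payload) {
        byte[] hostBytes = host.getBytes(StandardCharsets.UTF_8);
        if (hostBytes.length > Short.MAX_VALUE) {
            throw new IllegalArgumentException("host name too long");
        }
        ByteBuffer buf = ByteBuffer.allocate(
                Long.BYTES + Short.BYTES + hostBytes.length + payload.length);
        buf.putLong(timestampMs);
        buf.putShort((short) hostBytes.length);
        buf.put(hostBytes);
        buf.put(payload);
        return buf.array();
    }

    // Reverses wrap(): reads the header fields, then returns the rest as the payload.
    static AuditEnvelope unwrap(byte[] framed) {
        ByteBuffer buf = ByteBuffer.wrap(framed);
        long timestampMs = buf.getLong();
        byte[] hostBytes = new byte[buf.getShort()];
        buf.get(hostBytes);
        byte[] payload = new byte[buf.remaining()];
        buf.get(payload);
        return new AuditEnvelope(
                timestampMs, new String(hostBytes, StandardCharsets.UTF_8), payload);
    }
}
```

An auditing consumer would call unwrap() to read the header and hand only
the remaining payload bytes to the application's own deserializer.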

To make it easier for the client to add and make use of this kind of
auditing support, I was imagining that we can add a ProducerFactory in the
new java client. The ProducerFactory will create an instance of Producer
based on a config property. By default, the current KafkaProducer will be
returned. However, a user can plug in a different implementation of
Producer that does auditing. For example, an implementation of
AuditProducer.send() can take the original ProducerRecord, add the header
to the value byte array and then forward the record to an underlying
KafkaProducer. We can add a similar ConsumerFactory to the new consumer
client. If a user plugs in an implementation of the AuditingConsumer, the
consumer will then be audited automatically.
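
Roughly, the factory could look like the sketch below. Everything here is
an assumption for illustration: SimpleProducer and SimpleRecord are
simplified stand-ins for the client's Producer and ProducerRecord,
InMemoryProducer plays the role of the default KafkaProducer, and the
"producer.type.sketch" property name is made up. The audit header is
reduced to an 8-byte timestamp to keep the example short.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

// Simplified stand-in for ProducerRecord (assumption, not the real class).
final class SimpleRecord {
    final String topic;
    final byte[] value;

    SimpleRecord(String topic, byte[] value) {
        this.topic = topic;
        this.value = value;
    }
}

// Simplified stand-in for the Producer interface (assumption).
interface SimpleProducer {
    void send(SimpleRecord record);
}

// Plays the role of the default producer; it only remembers what was sent.
final class InMemoryProducer implements SimpleProducer {
    final List<SimpleRecord> sent = new ArrayList<>();

    @Override
    public void send(SimpleRecord record) {
        sent.add(record);
    }
}

// Prepends an 8-byte timestamp header to the value, then forwards the record.
final class AuditProducer implements SimpleProducer {
    private final SimpleProducer delegate;

    AuditProducer(SimpleProducer delegate) {
        this.delegate = delegate;
    }

    @Override
    public void send(SimpleRecord record) {
        ByteBuffer buf = ByteBuffer.allocate(Long.BYTES + record.value.length);
        buf.putLong(System.currentTimeMillis());
        buf.put(record.value);
        delegate.send(new SimpleRecord(record.topic, buf.array()));
    }
}

final class ProducerFactory {
    // Hypothetical config key; the real name would need to be agreed on.
    static final String TYPE_CONFIG = "producer.type.sketch";

    // Returns the base producer by default, or an auditing wrapper around it.
    static SimpleProducer create(Properties config, SimpleProducer base) {
        String type = config.getProperty(TYPE_CONFIG, "default");
        switch (type) {
            case "default":
                return base;
            case "audit":
                return new AuditProducer(base);
            default:
                throw new IllegalArgumentException("unknown producer type: " + type);
        }
    }
}
```

The application code only ever sees the producer interface, so switching
auditing on or off is a config change. A real factory would more likely
load the implementation class by name from the config rather than switch
on a fixed set of values.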

Thanks,

Jun

On Tue, Oct 21, 2014 at 4:06 PM, Guozhang Wang <wangg...@gmail.com> wrote:

> Hi Jun,
>
> Regarding 4) in your comment, after thinking about it for a while I cannot
> come up with a way to do it along with log compaction without adding new
> fields to the current message set format. Do you have a better way that
> does not require protocol changes?
>
> Guozhang
>
> On Mon, Oct 20, 2014 at 9:53 AM, Guozhang Wang <wangg...@gmail.com> wrote:
>
> > I have updated the wiki page incorporating received comments. We can
> > discuss some more details on:
> >
> > 1. How do we want to do auditing? Do we want built-in auditing on
> > brokers or even MMs, or an audit consumer that fetches all messages from
> > just the brokers?
> >
> > 2. How we can avoid de-/re-compression on brokers and MMs with log
> > compaction turned on.
> >
> > 3. How we can resolve data inconsistency resulting from unclean leader
> > election with control messages.
> >
> > Guozhang
> >
> > On Sun, Oct 19, 2014 at 11:41 PM, Guozhang Wang <wangg...@gmail.com>
> > wrote:
> >
> >> Thanks for the detailed comments Jun! Some replies inlined.
> >>
> >> On Sun, Oct 19, 2014 at 7:42 PM, Jun Rao <jun...@gmail.com> wrote:
> >>
> >>> Hi, Guozhang,
> >>>
> >>> Thanks for the writeup.
> >>>
> >>> A few high level comments.
> >>>
> >>> 1. Associating (versioned) schemas to a topic can be a good thing
> >>> overall.
> >>> Yes, this could add a bit more management overhead in Kafka. However,
> it
> >>> makes sure that the data format contract between a producer and a
> >>> consumer
> >>> is kept and managed in a central place, instead of in the application.
> >>> The
> >>> latter is probably easier to start with, but is likely to be brittle in
> >>> the
> >>> long run.
> >>>
> >>
> >> I am actually not proposing to not support associated versioned schemas
> >> for topics, but to not let some core Kafka functionalities like auditing
> >> depend on schemas. I think this alone can separate the schema
> >> management from Kafka piping management (i.e. making sure every single
> >> message is delivered, and within some latency, etc). Adding additional
> >> auditing info into an existing schema will force Kafka to be aware of the
> >> schema systems (Avro, JSON, etc).
> >>
> >>
> >>>
> >>> 2. Auditing can be a general feature that's useful for many
> applications.
> >>> Such a feature can be implemented by extending the low level message
> >>> format
> >>> with a header. However, it can also be added as part of the schema
> >>> management. For example, you can imagine a type of audited schema that
> >>> adds
> >>> additional auditing info to an existing schema automatically.
> Performance
> >>> wise, it probably doesn't make a big difference whether the auditing
> info
> >>> is added in the message header or the schema header.
> >>>
> >>>
> >> See replies above.
> >>
> >>
> >>> 3. We talked about avoiding the overhead of decompressing in both the
> >>> broker and the mirror maker. We probably need to think through how this
> >>> works with auditing. In the more general case where you want to audit
> >>> every
> >>> message, one has to do the decompression to get the individual message,
> >>> independent of how the auditing info is stored. This means that if we
> >>> want
> >>> to audit the broker directly or the consumer in mirror maker, we have
> to
> >>> pay the decompression cost anyway. Similarly, if we want to extend
> mirror
> >>> maker to support some customized filtering/transformation logic, we
> also
> >>> have to pay the decompression cost.
> >>>
> >>>
> >> I see your point. For that I would prefer to have a MM implementation
> >> that is able to de-compress / re-compress ONLY if required, for example
> >> by auditing, etc. I agree that we have not thought through whether we
> >> should enable auditing on MM, and if yes how to do that, and we could
> >> discuss that in a different thread. Overall, this proposal is not
> >> just for tackling de-compression on MM but about the feasibility of
> >> extending Kafka message header for system properties / app properties.
> >>
> >>
> >>> Some low level comments.
> >>>
> >>> 4. Broker offset reassignment (kafka-527):  This probably can be done
> >>> with
> >>> just a format change on the compressed message set.
> >>>
> >>> That is true. As I mentioned in the wiki each of the problems may be
> >> resolvable separately but I am thinking about a general way to get all
> of
> >> them.
> >>
> >>
> >>> 5. MirrorMaker refactoring: We probably can think through how general
> we
> > >>> want mirror maker to be. If we want it to be more general, we likely
> >>> need to decompress every message just like in a normal consumer. There
> >>> will
> >>> definitely be overhead. However, as long as mirror maker is made
> >>> scalable,
> >>> we can overcome the overhead by just running more instances on more
> >>> hardware resources. As for the proposed message format change, we
> >>> probably
> >>> need to think through it a bit more. The honor-ship flag seems a bit
> >>> hacky
> >>> to me.
> >>>
> >>>
> >> Replied as part of 3). Sure we can discuss more about that, will update
> >> the wiki for collected comments.
> >>
> >>
> >>> 6. Adding a timestamp in each message can be a useful thing. It (1)
> >>> allows
> >>> log segments to be rolled more accurately; (2) allows finding an offset
> >>> for
> >>> a particular timestamp more accurately. I am thinking that the
> timestamp
> >>> in
> >>> the message should probably be the time when the leader receives the
> >>> message. Followers preserve the timestamp set by leader. To avoid time
> >>> going back during leader change, the leader can probably set the
> >>> timestamp
> > >>> to be the max of current time and the timestamp of the last message,
> if
> >>> present. That timestamp can potentially be added to the index file to
> >>> answer offsetBeforeTimestamp queries more efficiently.
> >>>
> >>>
> >> Agreed.
> >>
> >>
> >>> 7. Log compaction: It seems that you are suggesting an improvement to
> >>> compact the active segment as well. This can be tricky and we need to
> >>> figure out the details on how to do this. This improvement seems to be
> >>> orthogonal to the message format change though.
> >>>
> >>>
> >> I think the improvement is more effective with the timestamps as in 6),
> >> we can discuss more about this.
> >>
> >>
> >>> 8. Data inconsistency from unclean election: I am not sure if we need
> to
> >>> add a controlled message to the log during leadership change. The
> <leader
> >>> generation, starting offset> map can be maintained in a separate
> >>> checkpoint
> > >>> file. The follower just needs to get that map from the leader during
> >>> startup.
> >>>
> >>> What I was proposing is an alternative solution given that we have this
> >> message header enhancement; with this we do not need to add another
> logic
> >> for leadership map and checkpoint file, but just the logic on
> >> replica-manager to handle this extra controlled message and remembering
> the
> >> current leader epoch instead of a map.
> >>
> >>
> >>> Thanks,
> >>>
> >>> Jun
> >>>
> >>> On Fri, Oct 10, 2014 at 5:33 PM, Guozhang Wang <wangg...@gmail.com>
> >>> wrote:
> >>>
> >>> > Hello all,
> >>> >
> >>> > I put some thoughts on enhancing our current message metadata format
> to
> >>> > solve a bunch of existing issues:
> >>> >
> >>> >
> >>> >
> >>>
> https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Enriched+Message+Metadata
> >>> >
> >>> > This wiki page is for kicking off some discussions about the
> >>> feasibility of
> >>> > adding more info into the message header, and if possible how we
> would
> >>> add
> >>> > them.
> >>> >
> >>> > -- Guozhang
> >>> >
> >>>
> >>
> >>
> >>
> >> --
> >> -- Guozhang
> >>
> >
> >
> >
> > --
> > -- Guozhang
> >
>
>
>
> --
> -- Guozhang
>
