Hi all,

I have updated the wiki page (
https://iwww.corp.linkedin.com/wiki/cf/display/ENGS/Kafka+Enriched+Message+Metadata)
according to people's comments and discussions offline.

Guozhang

On Thu, Nov 13, 2014 at 9:43 AM, Guozhang Wang <wangg...@gmail.com> wrote:

> Hi Jun,
>
> Sorry for the delay on your comments in the wiki page as well as this
> thread; quite swamped now. I will get back to you as soon as I find some
> time.
>
> Guozhang
>
> On Tue, Nov 11, 2014 at 6:26 PM, Jun Rao <jun...@gmail.com> wrote:
>
>> Thinking about this a bit more. For adding the auditing support, I am not
>> sure if we need to change the message format by adding the application
>> tags. An alternative way to do that is to add it in the producer client.
>> For example, for each message payload (doesn't matter what the
>> serialization mechanism is) that a producer receives, the producer can
>> just
>> add a header before the original payload. The header will contain all
>> needed fields (e.g. timestamp, host, etc) for the purpose of auditing.
>> This
>> way, we don't need to change the message format and the auditing info can
>> be added independent of the serialization mechanism of the message. The
>> header can use a different serialization mechanism for better efficiency.
>> For example, if we use Avro to serialize the header, the encoded bytes
>> won't include the field names in the header. This is potentially more
>> efficient than representing those fields as application tags in the
>> message
>> where the tags have to be explicitly store in every message.
>>
>> To make it easier for the client to add and make use of this kind of
>> auditing support, I was imagining that we can add a ProducerFactory in the
>> new java client. The ProducerFactory will create an instance of Producer
>> based on a config property. By default, the current KafkaProducer will be
>> returned. However, a user can plug in a different implementation of
>> Producer that does auditing. For example, an implementation of an
>> AuditProducer.send() can take the original ProducerRecord, add the header
>> to the value byte array and then forward the record to an underlying
>> KafkaProducer. We can add a similar ConsumerFactory to the new consumer
>> client. If a user plugs in an implementation of the AuditingConsumer, the
>> consumer will then be audited automatically.
>>
>> Thanks,
>>
>> Jun
>>
>> On Tue, Oct 21, 2014 at 4:06 PM, Guozhang Wang <wangg...@gmail.com>
>> wrote:
>>
>> > Hi Jun,
>> >
>> > Regarding 4) in your comment, after thinking it for a while I cannot
>> come
>> > up a way to it along with log compaction without adding new fields into
>> the
>> > current format on message set. Do you have a better way that do not
>> require
>> > protocol changes?
>> >
>> > Guozhang
>> >
>> > On Mon, Oct 20, 2014 at 9:53 AM, Guozhang Wang <wangg...@gmail.com>
>> wrote:
>> >
>> > > I have updated the wiki page incorporating received comments. We can
>> > > discuss some more details on:
>> > >
>> > > 1. How we want to do audit? Whether we want to have in-built auditing
>> on
>> > > brokers or even MMs or use  an audit consumer to fetch all messages
>> from
>> > > just brokers.
>> > >
>> > > 2. How we can avoid de-/re-compression on brokers and MMs with log
>> > > compaction turned on.
>> > >
>> > > 3. How we can resolve unclean leader election resulted data
>> inconsistency
>> > > with control messages.
>> > >
>> > > Guozhang
>> > >
>> > > On Sun, Oct 19, 2014 at 11:41 PM, Guozhang Wang <wangg...@gmail.com>
>> > > wrote:
>> > >
>> > >> Thanks for the detailed comments Jun! Some replies inlined.
>> > >>
>> > >> On Sun, Oct 19, 2014 at 7:42 PM, Jun Rao <jun...@gmail.com> wrote:
>> > >>
>> > >>> Hi, Guozhang,
>> > >>>
>> > >>> Thanks for the writeup.
>> > >>>
>> > >>> A few high level comments.
>> > >>>
>> > >>> 1. Associating (versioned) schemas to a topic can be a good thing
>> > >>> overall.
>> > >>> Yes, this could add a bit more management overhead in Kafka.
>> However,
>> > it
>> > >>> makes sure that the data format contract between a producer and a
>> > >>> consumer
>> > >>> is kept and managed in a central place, instead of in the
>> application.
>> > >>> The
>> > >>> latter is probably easier to start with, but is likely to be
>> brittle in
>> > >>> the
>> > >>> long run.
>> > >>>
>> > >>
>> > >> I am actually not proposing to not support associated versioned
>> schemas
>> > >> for topics, but to not let some core Kafka functionalities like
>> auditing
>> > >> being depend on schemas. I think this alone can separate the schema
>> > >> management from Kafka piping management (i.e. making sure every
>> single
>> > >> message is delivered, and within some latency, etc). Adding
>> additional
>> > >> auditing info into an existing schema will force Kafka to be aware of
>> > the
>> > >> schema systems (Avro, JSON, etc).
>> > >>
>> > >>
>> > >>>
>> > >>> 2. Auditing can be a general feature that's useful for many
>> > applications.
>> > >>> Such a feature can be implemented by extending the low level message
>> > >>> format
>> > >>> with a header. However, it can also be added as part of the schema
>> > >>> management. For example, you can imagine a type of audited schema
>> that
>> > >>> adds
>> > >>> additional auditing info to an existing schema automatically.
>> > Performance
>> > >>> wise, it probably doesn't make a big difference whether the auditing
>> > info
>> > >>> is added in the message header or the schema header.
>> > >>>
>> > >>>
>> > >> See replies above.
>> > >>
>> > >>
>> > >>> 3. We talked about avoiding the overhead of decompressing in both
>> the
>> > >>> broker and the mirror maker. We probably need to think through how
>> this
>> > >>> works with auditing. In the more general case where you want to
>> audit
>> > >>> every
>> > >>> message, one has to do the decompression to get the individual
>> message,
>> > >>> independent of how the auditing info is stored. This means that if
>> we
>> > >>> want
>> > >>> to audit the broker directly or the consumer in mirror maker, we
>> have
>> > to
>> > >>> pay the decompression cost anyway. Similarly, if we want to extend
>> > mirror
>> > >>> maker to support some customized filtering/transformation logic, we
>> > also
>> > >>> have to pay the decompression cost.
>> > >>>
>> > >>>
>> > >> I see your point. For that I would prefer to have a MM implementation
>> > >> that is able to do de-compress / re-compress ONLY if required, for
>> > example
>> > >> by auditing, etc. I agree that we have not thought through whether we
>> > >> should enable auditing on MM, and if yes how to do that, and we could
>> > >> discuss about that in a different thread. Overall, this proposal is
>> not
>> > >> just for tackling de-compression on MM but about the feasibility of
>> > >> extending Kafka message header for system properties / app
>> properties.
>> > >>
>> > >>
>> > >>> Some low level comments.
>> > >>>
>> > >>> 4. Broker offset reassignment (kafka-527):  This probably can be
>> done
>> > >>> with
>> > >>> just a format change on the compressed message set.
>> > >>>
>> > >>> That is true. As I mentioned in the wiki each of the problems may be
>> > >> resolvable separately but I am thinking about a general way to get
>> all
>> > of
>> > >> them.
>> > >>
>> > >>
>> > >>> 5. MirrorMaker refactoring: We probably can think through how
>> general
>> > we
>> > >>> want mirror maker to be. If we want to it to be more general, we
>> likely
>> > >>> need to decompress every message just like in a normal consumer.
>> There
>> > >>> will
>> > >>> definitely be overhead. However, as long as mirror maker is made
>> > >>> scalable,
>> > >>> we can overcome the overhead by just running more instances on more
>> > >>> hardware resources. As for the proposed message format change, we
>> > >>> probably
>> > >>> need to think through it a bit more. The honor-ship flag seems a bit
>> > >>> hacky
>> > >>> to me.
>> > >>>
>> > >>>
>> > >> Replied as part of 3). Sure we can discuss more about that, will
>> update
>> > >> the wiki for collected comments.
>> > >>
>> > >>
>> > >>> 6. Adding a timestamp in each message can be a useful thing. It (1)
>> > >>> allows
>> > >>> log segments to be rolled more accurately; (2) allows finding an
>> offset
>> > >>> for
>> > >>> a particular timestamp more accurately. I am thinking that the
>> > timestamp
>> > >>> in
>> > >>> the message should probably be the time when the leader receives the
>> > >>> message. Followers preserve the timestamp set by leader. To avoid
>> time
>> > >>> going back during leader change, the leader can probably set the
>> > >>> timestamp
>> > >>> to be the  max of current time and the timestamp of the last
>> message,
>> > if
>> > >>> present. That timestamp can potentially be added to the index file
>> to
>> > >>> answer offsetBeforeTimestamp queries more efficiently.
>> > >>>
>> > >>>
>> > >> Agreed.
>> > >>
>> > >>
>> > >>> 7. Log compaction: It seems that you are suggesting an improvement
>> to
>> > >>> compact the active segment as well. This can be tricky and we need
>> to
>> > >>> figure out the details on how to do this. This improvement seems to
>> be
>> > >>> orthogonal to the message format change though.
>> > >>>
>> > >>>
>> > >> I think the improvements is more effective with the timestamps as in
>> 6),
>> > >> we can discuss more about this.
>> > >>
>> > >>
>> > >>> 8. Data inconsistency from unclean election: I am not sure if we
>> need
>> > to
>> > >>> add a controlled message to the log during leadership change. The
>> > <leader
>> > >>> generation, starting offset> map can be maintained in a separate
>> > >>> checkpoint
>> > >>> file. The follower just need to get that map from the leader during
>> > >>> startup.
>> > >>>
>> > >>> What I was proposing is an alternative solution given that we have
>> this
>> > >> message header enhancement; with this we do not need to add another
>> > logic
>> > >> for leadership map and checkpoint file, but just the logic on
>> > >> replica-manager to handle this extra controlled message and
>> remembering
>> > the
>> > >> current leader epoch instead of a map.
>> > >>
>> > >>
>> > >>> Thanks,
>> > >>>
>> > >>> Jun
>> > >>>
>> > >>> On Fri, Oct 10, 2014 at 5:33 PM, Guozhang Wang <wangg...@gmail.com>
>> > >>> wrote:
>> > >>>
>> > >>> > Hello all,
>> > >>> >
>> > >>> > I put some thoughts on enhancing our current message metadata
>> format
>> > to
>> > >>> > solve a bunch of existing issues:
>> > >>> >
>> > >>> >
>> > >>> >
>> > >>>
>> >
>> https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Enriched+Message+Metadata
>> > >>> >
>> > >>> > This wiki page is for kicking off some discussions about the
>> > >>> feasibility of
>> > >>> > adding more info into the message header, and if possible how we
>> > would
>> > >>> add
>> > >>> > them.
>> > >>> >
>> > >>> > -- Guozhang
>> > >>> >
>> > >>>
>> > >>
>> > >>
>> > >>
>> > >> --
>> > >> -- Guozhang
>> > >>
>> > >
>> > >
>> > >
>> > > --
>> > > -- Guozhang
>> > >
>> >
>> >
>> >
>> > --
>> > -- Guozhang
>> >
>>
>
>
>
> --
> -- Guozhang
>



-- 
-- Guozhang

Reply via email to