Hi Guozhang,

I don't think that page is publicly accessible. Can you move it to the
Kafka wiki?

Tim

On Fri, Nov 21, 2014 at 5:24 PM, Guozhang Wang <wangg...@gmail.com> wrote:
> Hi all,
>
> I have updated the wiki page (
> https://iwww.corp.linkedin.com/wiki/cf/display/ENGS/Kafka+Enriched+Message+Metadata)
> according to people's comments and discussions offline.
>
> Guozhang
>
> On Thu, Nov 13, 2014 at 9:43 AM, Guozhang Wang <wangg...@gmail.com> wrote:
>
>> Hi Jun,
>>
>> Sorry for the delay on your comments in the wiki page as well as this
>> thread; quite swamped now. I will get back to you as soon as I find some
>> time.
>>
>> Guozhang
>>
>> On Tue, Nov 11, 2014 at 6:26 PM, Jun Rao <jun...@gmail.com> wrote:
>>
>>> Thinking about this a bit more. For adding the auditing support, I am not
>>> sure if we need to change the message format by adding the application
>>> tags. An alternative way to do that is to add it in the producer client.
>>> For example, for each message payload (doesn't matter what the
>>> serialization mechanism is) that a producer receives, the producer can
>>> just add a header before the original payload. The header will contain
>>> all needed fields (e.g. timestamp, host, etc.) for the purpose of
>>> auditing. This way, we don't need to change the message format and the
>>> auditing info can be added independent of the serialization mechanism of
>>> the message. The header can use a different serialization mechanism for
>>> better efficiency. For example, if we use Avro to serialize the header,
>>> the encoded bytes won't include the field names in the header. This is
>>> potentially more efficient than representing those fields as application
>>> tags in the message, where the tags have to be explicitly stored in every
>>> message.
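
A minimal sketch of the "header before the original payload" idea, for illustration only. The class name AuditFraming, the field layout (8-byte timestamp, 2-byte host length, host bytes), and the helper methods are all assumptions made here, not part of any Kafka API. It uses a fixed binary layout instead of Avro, but shares the property mentioned above: field names are not encoded in the bytes.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

/**
 * Hypothetical audit framing: [timestamp:8][hostLen:2][host bytes][original payload].
 * The payload is treated as opaque bytes, so its serialization format does not matter.
 */
final class AuditFraming {
    private AuditFraming() {}

    /** Prepends the audit header to the payload without inspecting the payload. */
    static byte[] wrap(long timestampMs, String host, byte[] payload) {
        byte[] hostBytes = host.getBytes(StandardCharsets.UTF_8);
        if (hostBytes.length > Short.MAX_VALUE) {
            throw new IllegalArgumentException("host name too long");
        }
        ByteBuffer buf = ByteBuffer.allocate(8 + 2 + hostBytes.length + payload.length);
        buf.putLong(timestampMs);
        buf.putShort((short) hostBytes.length);
        buf.put(hostBytes);
        buf.put(payload);
        return buf.array();
    }

    /** Reads the timestamp field from a framed message. */
    static long timestampOf(byte[] framed) {
        return ByteBuffer.wrap(framed).getLong();
    }

    /** Reads the host field from a framed message. */
    static String hostOf(byte[] framed) {
        ByteBuffer buf = ByteBuffer.wrap(framed);
        buf.getLong();                       // skip timestamp
        int len = buf.getShort();
        byte[] hostBytes = new byte[len];
        buf.get(hostBytes);
        return new String(hostBytes, StandardCharsets.UTF_8);
    }

    /** Strips the header and returns the original payload bytes. */
    static byte[] payloadOf(byte[] framed) {
        ByteBuffer buf = ByteBuffer.wrap(framed);
        buf.getLong();                       // skip timestamp
        int len = buf.getShort();
        buf.position(buf.position() + len);  // skip host
        byte[] payload = new byte[buf.remaining()];
        buf.get(payload);
        return payload;
    }
}
```

An audit-aware consumer would call payloadOf() before handing the bytes to the application's deserializer; a consumer that is not aware of the header would see the extra bytes, which is one trade-off of this approach.
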
>>>
>>> To make it easier for the client to add and make use of this kind of
>>> auditing support, I was imagining that we can add a ProducerFactory in the
>>> new java client. The ProducerFactory will create an instance of Producer
>>> based on a config property. By default, the current KafkaProducer will be
>>> returned. However, a user can plug in a different implementation of
>>> Producer that does auditing. For example, an implementation of an
>>> AuditProducer.send() can take the original ProducerRecord, add the header
>>> to the value byte array and then forward the record to an underlying
>>> KafkaProducer. We can add a similar ConsumerFactory to the new consumer
>>> client. If a user plugs in an implementation of the AuditingConsumer, the
>>> consumer will then be audited automatically.
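
A rough sketch of the factory idea described above, under stated assumptions. SimpleProducer and RecordingProducer are stand-ins defined here so the example is self-contained; they are not the real Producer / KafkaProducer types. The config key "producer.impl" and its values are also hypothetical.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

/** Stand-in for the client's Producer interface; NOT the real Kafka API. */
interface SimpleProducer {
    void send(String topic, byte[] value);
}

/** Stand-in for the default producer: it only records what would be sent. */
final class RecordingProducer implements SimpleProducer {
    final List<byte[]> sent = new ArrayList<>();

    @Override
    public void send(String topic, byte[] value) {
        sent.add(value);
    }
}

/** Decorator that prepends an 8-byte timestamp header, then forwards the record. */
final class AuditProducer implements SimpleProducer {
    private final SimpleProducer delegate;

    AuditProducer(SimpleProducer delegate) {
        this.delegate = delegate;
    }

    @Override
    public void send(String topic, byte[] value) {
        ByteBuffer buf = ByteBuffer.allocate(Long.BYTES + value.length);
        buf.putLong(System.currentTimeMillis()); // audit header (timestamp only, for brevity)
        buf.put(value);                          // original payload, untouched
        delegate.send(topic, buf.array());
    }
}

/** Picks the producer implementation from a config property (hypothetical key). */
final class ProducerFactory {
    static final String IMPL_CONFIG = "producer.impl";

    private ProducerFactory() {}

    static SimpleProducer create(Properties config, SimpleProducer base) {
        String impl = config.getProperty(IMPL_CONFIG, "default");
        switch (impl) {
            case "default":
                return base;                    // plain producer, no auditing
            case "audit":
                return new AuditProducer(base); // wraps the plain producer
            default:
                throw new IllegalArgumentException("unknown producer impl: " + impl);
        }
    }
}
```

Application code only sees SimpleProducer, so switching auditing on or off is a config change rather than a code change. A ConsumerFactory could mirror this with a wrapper that strips and reports the header.
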
>>>
>>> Thanks,
>>>
>>> Jun
>>>
>>> On Tue, Oct 21, 2014 at 4:06 PM, Guozhang Wang <wangg...@gmail.com>
>>> wrote:
>>>
>>> > Hi Jun,
>>> >
>>> > Regarding 4) in your comment, after thinking about it for a while I
>>> > cannot come up with a way to do it along with log compaction without
>>> > adding new fields to the current message set format. Do you have a
>>> > better way that does not require protocol changes?
>>> >
>>> > Guozhang
>>> >
>>> > On Mon, Oct 20, 2014 at 9:53 AM, Guozhang Wang <wangg...@gmail.com>
>>> wrote:
>>> >
>>> > > I have updated the wiki page incorporating received comments. We can
>>> > > discuss some more details on:
>>> > >
>>> > > 1. How do we want to do auditing? Do we want built-in auditing on
>>> > > brokers or even MMs, or an audit consumer that fetches all messages
>>> > > from just the brokers?
>>> > >
>>> > > 2. How can we avoid de-/re-compression on brokers and MMs with log
>>> > > compaction turned on?
>>> > >
>>> > > 3. How can we resolve data inconsistency resulting from unclean leader
>>> > > election with control messages?
>>> > >
>>> > > Guozhang
>>> > >
>>> > > On Sun, Oct 19, 2014 at 11:41 PM, Guozhang Wang <wangg...@gmail.com>
>>> > > wrote:
>>> > >
>>> > >> Thanks for the detailed comments Jun! Some replies inlined.
>>> > >>
>>> > >> On Sun, Oct 19, 2014 at 7:42 PM, Jun Rao <jun...@gmail.com> wrote:
>>> > >>
>>> > >>> Hi, Guozhang,
>>> > >>>
>>> > >>> Thanks for the writeup.
>>> > >>>
>>> > >>> A few high level comments.
>>> > >>>
>>> > >>> 1. Associating (versioned) schemas to a topic can be a good thing
>>> > >>> overall. Yes, this could add a bit more management overhead in Kafka.
>>> > >>> However, it makes sure that the data format contract between a
>>> > >>> producer and a consumer is kept and managed in a central place,
>>> > >>> instead of in the application. The latter is probably easier to start
>>> > >>> with, but is likely to be brittle in the long run.
>>> > >>>
>>> > >>
>>> > >> I am actually not proposing to drop support for associating versioned
>>> > >> schemas with topics, but to not let core Kafka functionality like
>>> > >> auditing depend on schemas. I think this alone can separate schema
>>> > >> management from Kafka piping management (i.e. making sure every single
>>> > >> message is delivered, and within some latency, etc). Adding additional
>>> > >> auditing info into an existing schema will force Kafka to be aware of
>>> > >> the schema systems (Avro, JSON, etc).
>>> > >>
>>> > >>
>>> > >>>
>>> > >>> 2. Auditing can be a general feature that's useful for many
>>> > >>> applications. Such a feature can be implemented by extending the
>>> > >>> low-level message format with a header. However, it can also be added
>>> > >>> as part of the schema management. For example, you can imagine a type
>>> > >>> of audited schema that adds additional auditing info to an existing
>>> > >>> schema automatically. Performance-wise, it probably doesn't make a big
>>> > >>> difference whether the auditing info is added in the message header or
>>> > >>> the schema header.
>>> > >>>
>>> > >>>
>>> > >> See replies above.
>>> > >>
>>> > >>
>>> > >>> 3. We talked about avoiding the overhead of decompressing in both the
>>> > >>> broker and the mirror maker. We probably need to think through how
>>> > >>> this works with auditing. In the more general case where you want to
>>> > >>> audit every message, one has to do the decompression to get the
>>> > >>> individual message, independent of how the auditing info is stored.
>>> > >>> This means that if we want to audit the broker directly or the
>>> > >>> consumer in mirror maker, we have to pay the decompression cost
>>> > >>> anyway. Similarly, if we want to extend mirror maker to support some
>>> > >>> customized filtering/transformation logic, we also have to pay the
>>> > >>> decompression cost.
>>> > >>>
>>> > >>>
>>> > >> I see your point. For that I would prefer to have an MM implementation
>>> > >> that does de-compress / re-compress ONLY if required, for example by
>>> > >> auditing, etc. I agree that we have not thought through whether we
>>> > >> should enable auditing on MM, and if yes how to do that, and we could
>>> > >> discuss that in a different thread. Overall, this proposal is not just
>>> > >> for tackling de-compression on MM but about the feasibility of
>>> > >> extending the Kafka message header for system properties / app
>>> > >> properties.
>>> > >>
>>> > >>
>>> > >>> Some low level comments.
>>> > >>>
>>> > >>> 4. Broker offset reassignment (kafka-527): This probably can be done
>>> > >>> with just a format change on the compressed message set.
>>> > >>>
>>> > >> That is true. As I mentioned in the wiki, each of the problems may be
>>> > >> resolvable separately, but I am thinking about a general way to get all
>>> > >> of them.
>>> > >>
>>> > >>
>>> > >>> 5. MirrorMaker refactoring: We probably can think through how general
>>> > >>> we want mirror maker to be. If we want it to be more general, we
>>> > >>> likely need to decompress every message just like in a normal
>>> > >>> consumer. There will definitely be overhead. However, as long as
>>> > >>> mirror maker is made scalable, we can overcome the overhead by just
>>> > >>> running more instances on more hardware resources. As for the proposed
>>> > >>> message format change, we probably need to think through it a bit
>>> > >>> more. The honor-ship flag seems a bit hacky to me.
>>> > >>>
>>> > >>>
>>> > >> Replied as part of 3). Sure we can discuss more about that, will
>>> update
>>> > >> the wiki for collected comments.
>>> > >>
>>> > >>
>>> > >>> 6. Adding a timestamp in each message can be a useful thing. It (1)
>>> > >>> allows log segments to be rolled more accurately; (2) allows finding
>>> > >>> an offset for a particular timestamp more accurately. I am thinking
>>> > >>> that the timestamp in the message should probably be the time when the
>>> > >>> leader receives the message. Followers preserve the timestamp set by
>>> > >>> the leader. To avoid time going back during leader change, the leader
>>> > >>> can probably set the timestamp to be the max of current time and the
>>> > >>> timestamp of the last message, if present. That timestamp can
>>> > >>> potentially be added to the index file to answer
>>> > >>> offsetBeforeTimestamp queries more efficiently.
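
The "max of current time and the timestamp of the last message" rule in point 6 can be sketched as follows. LeaderTimestampAssigner and its methods are hypothetical names used only for this illustration; the clock is injected so the behaviour is easy to check.

```java
import java.util.function.LongSupplier;

/**
 * Hypothetical helper a leader could use to stamp incoming messages.
 * Timestamps never go backwards, even if the wall clock does
 * (for example after leadership moves to a broker with a slower clock).
 */
final class LeaderTimestampAssigner {
    private final LongSupplier clock;
    private long lastTimestampMs;
    private boolean hasLast;

    LeaderTimestampAssigner(LongSupplier clock) {
        this.clock = clock;
    }

    /** Seeds the assigner with the timestamp of the last message already in the log. */
    void seed(long lastLoggedTimestampMs) {
        this.lastTimestampMs = lastLoggedTimestampMs;
        this.hasLast = true;
    }

    /** Returns max(current time, last assigned timestamp), or current time if the log is empty. */
    long next() {
        long now = clock.getAsLong();
        long ts = hasLast ? Math.max(now, lastTimestampMs) : now;
        lastTimestampMs = ts;
        hasLast = true;
        return ts;
    }
}
```

A new leader would call seed() with the timestamp of the last message in its local log before accepting writes, so that the first message it stamps is not older than what followers have already copied.
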
>>> > >>>
>>> > >>>
>>> > >> Agreed.
>>> > >>
>>> > >>
>>> > >>> 7. Log compaction: It seems that you are suggesting an improvement to
>>> > >>> compact the active segment as well. This can be tricky and we need to
>>> > >>> figure out the details on how to do this. This improvement seems to be
>>> > >>> orthogonal to the message format change though.
>>> > >>>
>>> > >>>
>>> > >> I think the improvement is more effective with the timestamps as in
>>> > >> 6); we can discuss more about this.
>>> > >>
>>> > >>
>>> > >>> 8. Data inconsistency from unclean election: I am not sure if we need
>>> > >>> to add a controlled message to the log during leadership change. The
>>> > >>> <leader generation, starting offset> map can be maintained in a
>>> > >>> separate checkpoint file. The follower just needs to get that map from
>>> > >>> the leader during startup.
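
One possible in-memory shape for the <leader generation, starting offset> map in point 8, as a sketch only. The class and method names are hypothetical, and the lookup shown (which generation wrote a given offset, from the leader's point of view) is an assumption about how a follower might use the map; the thread does not spell that out.

```java
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical <leader generation, starting offset> map, as kept in a checkpoint file. */
final class LeaderGenerationMap {
    // generation -> first offset written by the leader of that generation
    private final TreeMap<Integer, Long> startOffsetByGeneration = new TreeMap<>();

    /** Records that a new leader generation started writing at the given offset. */
    void recordLeaderChange(int generation, long startingOffset) {
        startOffsetByGeneration.put(generation, startingOffset);
    }

    /**
     * Returns the latest generation whose starting offset is at or before the
     * given offset, or -1 if there is none. Scanning from the newest generation
     * handles an unclean election where a later generation starts at a lower
     * offset than an earlier one.
     */
    int generationOf(long offset) {
        for (Map.Entry<Integer, Long> e : startOffsetByGeneration.descendingMap().entrySet()) {
            if (e.getValue() <= offset) {
                return e.getKey();
            }
        }
        return -1;
    }
}
```

A follower that fetched this map at startup could compare the generation it recorded for its own log end against the leader's view and truncate where they disagree; that usage is an illustration, not something the proposal specifies.
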
>>> > >>>
>>> > >> What I was proposing is an alternative solution given that we have this
>>> > >> message header enhancement; with this we do not need to add separate
>>> > >> logic for a leadership map and checkpoint file, but just the logic on
>>> > >> replica-manager to handle this extra controlled message and remember
>>> > >> the current leader epoch instead of a map.
>>> > >>
>>> > >>
>>> > >>> Thanks,
>>> > >>>
>>> > >>> Jun
>>> > >>>
>>> > >>> On Fri, Oct 10, 2014 at 5:33 PM, Guozhang Wang <wangg...@gmail.com>
>>> > >>> wrote:
>>> > >>>
>>> > >>> > Hello all,
>>> > >>> >
>>> > >>> > I put together some thoughts on enhancing our current message metadata
>>> > >>> > format to solve a bunch of existing issues:
>>> > >>> >
>>> > >>> > https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Enriched+Message+Metadata
>>> > >>> >
>>> > >>> > This wiki page is for kicking off some discussions about the
>>> > >>> > feasibility of adding more info into the message header, and if
>>> > >>> > possible how we would add them.
>>> > >>> >
>>> > >>> > -- Guozhang
>>> > >>> >
>>> > >>>
>>> > >>
>>> > >>
>>> > >>
>>> > >> --
>>> > >> -- Guozhang
>>> > >>
>>> > >
>>> > >
>>> > >
>>> > > --
>>> > > -- Guozhang
>>> > >
>>> >
>>> >
>>> >
>>> > --
>>> > -- Guozhang
>>> >
>>>
>>
>>
>>
>> --
>> -- Guozhang
>>
>
>
>
> --
> -- Guozhang
