My bad, the link should be this:

https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Enriched+Message+Metadata

On Fri, Nov 21, 2014 at 5:29 PM, Timothy Chen <tnac...@gmail.com> wrote:

> Hi Guozhang,
>
> I don't think that is publicly accessible; can you move it to the
> Kafka wiki?
>
> Tim
>
> On Fri, Nov 21, 2014 at 5:24 PM, Guozhang Wang <wangg...@gmail.com> wrote:
> > Hi all,
> >
> > I have updated the wiki page (
> > https://iwww.corp.linkedin.com/wiki/cf/display/ENGS/Kafka+Enriched+Message+Metadata )
> > according to people's comments and discussions offline.
> >
> > Guozhang
> >
> > On Thu, Nov 13, 2014 at 9:43 AM, Guozhang Wang <wangg...@gmail.com> wrote:
> >
> >> Hi Jun,
> >>
> >> Sorry for the delay on your comments in the wiki page as well as this
> >> thread; I am quite swamped now. I will get back to you as soon as I find
> >> some time.
> >>
> >> Guozhang
> >>
> >> On Tue, Nov 11, 2014 at 6:26 PM, Jun Rao <jun...@gmail.com> wrote:
> >>
> >>> Thinking about this a bit more. For adding the auditing support, I am not
> >>> sure if we need to change the message format by adding the application
> >>> tags. An alternative way to do that is to add it in the producer client.
> >>> For example, for each message payload (doesn't matter what the
> >>> serialization mechanism is) that a producer receives, the producer can
> >>> just add a header before the original payload. The header will contain
> >>> all needed fields (e.g. timestamp, host, etc.) for the purpose of
> >>> auditing. This way, we don't need to change the message format, and the
> >>> auditing info can be added independent of the serialization mechanism of
> >>> the message. The header can use a different serialization mechanism for
> >>> better efficiency. For example, if we use Avro to serialize the header,
> >>> the encoded bytes won't include the field names in the header. This is
> >>> potentially more efficient than representing those fields as application
> >>> tags in the message, where the tags have to be explicitly stored in every
> >>> message.
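A minimal sketch of this header-before-payload framing. The field layout here (a length-prefixed header carrying a timestamp and host) is purely illustrative; the thread does not fix one, and a real implementation would more likely serialize the header with Avro as suggested above:

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical framing: [4-byte header length][8-byte timestamp][host][payload].
// The payload bytes are opaque; their own serialization is untouched.
class AuditHeader {
    static byte[] wrap(long timestamp, String host, byte[] payload) {
        byte[] hostBytes = host.getBytes(StandardCharsets.UTF_8);
        int headerLen = 8 + hostBytes.length;
        ByteBuffer buf = ByteBuffer.allocate(4 + headerLen + payload.length);
        buf.putInt(headerLen);   // lets a reader skip the header without parsing it
        buf.putLong(timestamp);  // audit fields go here, before the payload
        buf.put(hostBytes);
        buf.put(payload);
        return buf.array();
    }

    // An auditing consumer strips the header to recover the original payload.
    static byte[] unwrap(byte[] framed) {
        ByteBuffer buf = ByteBuffer.wrap(framed);
        int headerLen = buf.getInt();
        buf.position(4 + headerLen);
        byte[] payload = new byte[buf.remaining()];
        buf.get(payload);
        return payload;
    }
}
```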
> >>>
> >>> To make it easier for the client to add and make use of this kind of
> >>> auditing support, I was imagining that we can add a ProducerFactory in
> >>> the new java client. The ProducerFactory will create an instance of
> >>> Producer based on a config property. By default, the current
> >>> KafkaProducer will be returned. However, a user can plug in a different
> >>> implementation of Producer that does auditing. For example, an
> >>> implementation of AuditProducer.send() can take the original
> >>> ProducerRecord, add the header to the value byte array and then forward
> >>> the record to an underlying KafkaProducer. We can add a similar
> >>> ConsumerFactory to the new consumer client. If a user plugs in an
> >>> implementation of the AuditingConsumer, the consumer will then be
> >>> audited automatically.
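A rough sketch of the factory/wrapper pattern described above. The Producer interface and the stub below are simplified stand-ins for the new java client's API, not the real kafka-clients signatures, and the 8-byte timestamp header is a placeholder for whatever audit header is chosen:

```java
import java.nio.ByteBuffer;

// Simplified stand-in for the new java client's Producer interface.
interface Producer {
    void send(String topic, byte[] value);
}

// Stand-in for KafkaProducer: just records the last value it was handed.
class KafkaProducerStub implements Producer {
    byte[] lastSent;
    public void send(String topic, byte[] value) { lastSent = value; }
}

// Prepends an audit header to the value bytes, then forwards the record
// to the underlying producer, which stays unaware of the auditing.
class AuditProducer implements Producer {
    private final Producer delegate;
    AuditProducer(Producer delegate) { this.delegate = delegate; }

    public void send(String topic, byte[] value) {
        ByteBuffer buf = ByteBuffer.allocate(8 + value.length);
        buf.putLong(System.currentTimeMillis()); // audit header
        buf.put(value);                          // original payload, untouched
        delegate.send(topic, buf.array());
    }
}

class ProducerFactory {
    // A config property would select the implementation; the plain
    // producer is the default.
    static Producer create(Producer base, boolean audit) {
        return audit ? new AuditProducer(base) : base;
    }
}
```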
> >>>
> >>> Thanks,
> >>>
> >>> Jun
> >>>
> >>> On Tue, Oct 21, 2014 at 4:06 PM, Guozhang Wang <wangg...@gmail.com>
> >>> wrote:
> >>>
> >>> > Hi Jun,
> >>> >
> >>> > Regarding 4) in your comment, after thinking about it for a while I
> >>> > cannot come up with a way to do it along with log compaction without
> >>> > adding new fields into the current message set format. Do you have a
> >>> > better way that does not require protocol changes?
> >>> >
> >>> > Guozhang
> >>> >
> >>> > On Mon, Oct 20, 2014 at 9:53 AM, Guozhang Wang <wangg...@gmail.com> wrote:
> >>> >
> >>> > > I have updated the wiki page incorporating received comments. We can
> >>> > > discuss some more details on:
> >>> > >
> >>> > > 1. How we want to do auditing: whether we want to have built-in
> >>> > > auditing on brokers or even MMs, or use an audit consumer to fetch
> >>> > > all messages from just the brokers.
> >>> > >
> >>> > > 2. How we can avoid de-/re-compression on brokers and MMs with log
> >>> > > compaction turned on.
> >>> > >
> >>> > > 3. How we can resolve the data inconsistency resulting from unclean
> >>> > > leader election with control messages.
> >>> > >
> >>> > > Guozhang
> >>> > >
> >>> > > On Sun, Oct 19, 2014 at 11:41 PM, Guozhang Wang <wangg...@gmail.com> wrote:
> >>> > >
> >>> > >> Thanks for the detailed comments Jun! Some replies inlined.
> >>> > >>
> >>> > >> On Sun, Oct 19, 2014 at 7:42 PM, Jun Rao <jun...@gmail.com> wrote:
> >>> > >>
> >>> > >>> Hi, Guozhang,
> >>> > >>>
> >>> > >>> Thanks for the writeup.
> >>> > >>>
> >>> > >>> A few high level comments.
> >>> > >>>
> >>> > >>> 1. Associating (versioned) schemas to a topic can be a good thing
> >>> > >>> overall. Yes, this could add a bit more management overhead in
> >>> > >>> Kafka. However, it makes sure that the data format contract between
> >>> > >>> a producer and a consumer is kept and managed in a central place,
> >>> > >>> instead of in the application. The latter is probably easier to
> >>> > >>> start with, but is likely to be brittle in the long run.
> >>> > >>>
> >>> > >>
> >>> > >> I am actually not proposing that we not support associated
> >>> > >> versioned schemas for topics, but that we not let some core Kafka
> >>> > >> functionalities like auditing depend on schemas. I think this alone
> >>> > >> can separate the schema management from Kafka piping management
> >>> > >> (i.e. making sure every single message is delivered, within some
> >>> > >> latency, etc.). Adding additional auditing info into an existing
> >>> > >> schema would force Kafka to be aware of the schema systems (Avro,
> >>> > >> JSON, etc.).
> >>> > >>
> >>> > >>
> >>> > >>>
> >>> > >>> 2. Auditing can be a general feature that's useful for many
> >>> > >>> applications. Such a feature can be implemented by extending the
> >>> > >>> low level message format with a header. However, it can also be
> >>> > >>> added as part of the schema management. For example, you can
> >>> > >>> imagine a type of audited schema that adds additional auditing info
> >>> > >>> to an existing schema automatically. Performance wise, it probably
> >>> > >>> doesn't make a big difference whether the auditing info is added in
> >>> > >>> the message header or the schema header.
> >>> > >>>
> >>> > >>>
> >>> > >> See replies above.
> >>> > >>
> >>> > >>
> >>> > >>> 3. We talked about avoiding the overhead of decompressing in both
> >>> > >>> the broker and the mirror maker. We probably need to think through
> >>> > >>> how this works with auditing. In the more general case where you
> >>> > >>> want to audit every message, one has to do the decompression to get
> >>> > >>> the individual message, independent of how the auditing info is
> >>> > >>> stored. This means that if we want to audit the broker directly or
> >>> > >>> the consumer in mirror maker, we have to pay the decompression cost
> >>> > >>> anyway. Similarly, if we want to extend mirror maker to support
> >>> > >>> some customized filtering/transformation logic, we also have to pay
> >>> > >>> the decompression cost.
> >>> > >>>
> >>> > >>>
> >>> > >> I see your point. For that I would prefer to have an MM
> >>> > >> implementation that is able to de-compress / re-compress ONLY if
> >>> > >> required, for example by auditing, etc. I agree that we have not
> >>> > >> thought through whether we should enable auditing on MM, and if so
> >>> > >> how to do that, and we could discuss that in a different thread.
> >>> > >> Overall, this proposal is not just about tackling de-compression on
> >>> > >> MM but about the feasibility of extending the Kafka message header
> >>> > >> for system properties / app properties.
> >>> > >>
> >>> > >>
> >>> > >>> Some low level comments.
> >>> > >>>
> >>> > >>> 4. Broker offset reassignment (kafka-527): This probably can be
> >>> > >>> done with just a format change on the compressed message set.
> >>> > >>>
> >>> > >> That is true. As I mentioned in the wiki, each of the problems may
> >>> > >> be resolvable separately, but I am thinking about a general way to
> >>> > >> solve all of them.
> >>> > >>
> >>> > >>
> >>> > >>> 5. MirrorMaker refactoring: We probably can think through how
> >>> > >>> general we want mirror maker to be. If we want it to be more
> >>> > >>> general, we likely need to decompress every message just like in a
> >>> > >>> normal consumer. There will definitely be overhead. However, as
> >>> > >>> long as mirror maker is made scalable, we can overcome the overhead
> >>> > >>> by just running more instances on more hardware resources. As for
> >>> > >>> the proposed message format change, we probably need to think
> >>> > >>> through it a bit more. The honor-ship flag seems a bit hacky to me.
> >>> > >>>
> >>> > >>>
> >>> > >> Replied as part of 3). Sure, we can discuss more about that; I will
> >>> > >> update the wiki with the collected comments.
> >>> > >>
> >>> > >>
> >>> > >>> 6. Adding a timestamp in each message can be a useful thing. It
> >>> > >>> (1) allows log segments to be rolled more accurately; (2) allows
> >>> > >>> finding an offset for a particular timestamp more accurately. I am
> >>> > >>> thinking that the timestamp in the message should probably be the
> >>> > >>> time when the leader receives the message. Followers preserve the
> >>> > >>> timestamp set by the leader. To avoid time going back during leader
> >>> > >>> change, the leader can probably set the timestamp to be the max of
> >>> > >>> the current time and the timestamp of the last message, if present.
> >>> > >>> That timestamp can potentially be added to the index file to
> >>> > >>> answer offsetBeforeTimestamp queries more efficiently.
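The max-of-clock-and-last-timestamp rule above can be sketched as follows (illustrative only; names here are not from any Kafka code):

```java
// Leader-side timestamp assignment: the assigned timestamp is the max of
// the wall clock and the previous message's timestamp, so timestamps never
// go backwards even if a new leader's clock lags the old leader's.
class LeaderClock {
    private long lastTimestamp = Long.MIN_VALUE;

    long assign(long wallClockMillis) {
        lastTimestamp = Math.max(wallClockMillis, lastTimestamp);
        return lastTimestamp;
    }
}
```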
> >>> > >>>
> >>> > >>>
> >>> > >> Agreed.
> >>> > >>
> >>> > >>
> >>> > >>> 7. Log compaction: It seems that you are suggesting an improvement
> >>> > >>> to compact the active segment as well. This can be tricky, and we
> >>> > >>> need to figure out the details of how to do this. This improvement
> >>> > >>> seems to be orthogonal to the message format change though.
> >>> > >>>
> >>> > >>>
> >>> > >> I think the improvement is more effective with the timestamps as in
> >>> > >> 6); we can discuss more about this.
> >>> > >>
> >>> > >>
> >>> > >>> 8. Data inconsistency from unclean election: I am not sure if we
> >>> > >>> need to add a controlled message to the log during leadership
> >>> > >>> change. The <leader generation, starting offset> map can be
> >>> > >>> maintained in a separate checkpoint file. The follower just needs
> >>> > >>> to get that map from the leader during startup.
> >>> > >>>
> >>> > >> What I was proposing is an alternative solution given that we have
> >>> > >> this message header enhancement; with this we do not need to add
> >>> > >> separate logic for the leadership map and checkpoint file, but just
> >>> > >> logic on the replica-manager to handle this extra controlled message
> >>> > >> and remember the current leader epoch instead of a map.
> >>> > >>
> >>> > >>
> >>> > >>> Thanks,
> >>> > >>>
> >>> > >>> Jun
> >>> > >>>
> >>> > >>> On Fri, Oct 10, 2014 at 5:33 PM, Guozhang Wang <wangg...@gmail.com> wrote:
> >>> > >>>
> >>> > >>> > Hello all,
> >>> > >>> >
> >>> > >>> > I put some thoughts on enhancing our current message metadata
> >>> > >>> > format to solve a bunch of existing issues:
> >>> > >>> >
> >>> > >>> > https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Enriched+Message+Metadata
> >>> > >>> >
> >>> > >>> > This wiki page is for kicking off some discussions about the
> >>> > >>> > feasibility of adding more info into the message header, and if
> >>> > >>> > possible how we would add them.
> >>> > >>> >
> >>> > >>> > -- Guozhang
> >>> > >>> >
> >>> > >>>
> >>> > >>
> >>> > >>
> >>> > >>
> >>> > >> --
> >>> > >> -- Guozhang
> >>> > >>
> >>> > >
> >>> > >
> >>> > >
> >>> > > --
> >>> > > -- Guozhang
> >>> > >
> >>> >
> >>> >
> >>> >
> >>> > --
> >>> > -- Guozhang
> >>> >
> >>>
> >>
> >>
> >>
> >> --
> >> -- Guozhang
> >>
> >
> >
> >
> > --
> > -- Guozhang
>



-- 
-- Guozhang
