Nice finding on the CRC class. It will be great to switch to that at some
point.
On exposing the CRC - I think we are overthinking the problem. CRC over the
entire record makes sense for the durability check and having multiple CRCs
is a bad idea.
On Fri, Jan 29, 2016 at 4:47 PM, Jay Kreps wrote:
The rationale for the CRC covering the whole record was to catch corruption
anywhere in the full record contents, since corruption there will equally
prevent someone from consuming the data. I think you could argue either way,
but let's definitely not end up with two different CRC calculations, that
would just…
Hi Anna,
I think there is value in a CRC computed over only the user bytes. This will
help when we have future protocol updates. Otherwise, any protocol migration
might break auditing if it relies heavily on a CRC that includes system bytes.
I did some tests to understand the performance overhead of having a…
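To make the distinction concrete, a user-bytes-only checksum could look like
the sketch below (a hypothetical helper, not part of the proposal; it assumes
the serialized key and value byte arrays are at hand):

    import java.util.zip.CRC32;

    // Hypothetical helper: a checksum over user-supplied bytes only
    // (key + value), independent of any system-appended fields, so it
    // survives wire-format/protocol changes.
    public final class UserBytesChecksum {
        public static long of(byte[] key, byte[] value) {
            CRC32 crc = new CRC32();
            if (key != null) crc.update(key, 0, key.length);
            if (value != null) crc.update(value, 0, value.length);
            return crc.getValue();
        }
    }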
Joel, thanks for your feedback. I updated the wiki based on your comments on
the writeup.
On Fri, Jan 29, 2016 at 11:50 AM, Anna Povzner wrote:
> Becket,
>
> In your scenario with one message from producer A and one message from
> producer B, those are two different messages, and they s…
Becket,
In your scenario with one message from producer A and one message from
producer B, those are two different messages, and they should be tracked as
two different messages. So I would argue for using the record CRC -- the CRC
that is actually used by the system, plus it will not require computing a diff…
Neha,
CRC is definitely an important type of metadata for a record. I am not
arguing about that. But I think we should distinguish between two types of
checksum here: 1) the checksum of the user data, and 2) the checksum
including system-appended bytes.
I completely agree that (1) is good to add. But…
Responding to some of the earlier comments in the thread:
@Jay/@Neha,
I think any one of onCommit/onAppend/onArrival would work for the concrete
use-case that I had outlined. I think onArrival is additionally useful for
custom validation - i.e., reject the message and do not append it if it
violates…
Becket,
Is your concern the presence of CRC in the RecordMetadata, or do you want to
brainstorm how CRC can be used for auditing? I don't think we should try to
anticipate all the ways people might do monitoring using interceptors and
the metadata we provide. The entire point of having pluggable…
Anna,
It is still not clear to me why we should expose the CRC to the end user.
Here are my confusions:
1. Doesn't TopicPartition + offset already uniquely identify a message? It
seems better than a CRC from both a summary and an auditing point of view.
2. Currently the CRC is only 4 bytes…
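For comparison, the identifier Becket suggests can be sketched as follows (a
hypothetical audit key, not proposed API; unlike a 4-byte CRC, it cannot
collide within a cluster):

    import org.apache.kafka.common.TopicPartition;

    // Hypothetical audit key: (topic, partition, offset) uniquely
    // identifies a record within a cluster, with no collision risk.
    public final class MessageId {
        private final TopicPartition tp;
        private final long offset;

        public MessageId(TopicPartition tp, long offset) {
            this.tp = tp;
            this.offset = offset;
        }

        @Override
        public String toString() {
            return tp.topic() + "-" + tp.partition() + "@" + offset;
        }
    }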
On second thought, yes, I think we should expose a record size that
represents application bytes. This is Becket's option #1.
I updated the KIP wiki with the new fields in RecordMetadata and ConsumerRecord.
I would like to start a voting thread tomorrow if there are no objections
or more things to discuss…
Regarding record size as bytes sent over the wire, my concern is that it is
almost impossible to calculate per record. We could do it as: 1) compressed
bytes / number of records in a compressed message, as Todd mentioned; or 2)
same as #1 but taken proportionally to uncompressed record size vs. tot…
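Both options can be sketched as simple estimators (hypothetical helpers; the
batch-level byte counts would have to come from client internals that this
KIP deliberately does not expose):

    // Hypothetical per-record wire-size estimators for a compressed batch.
    public final class RecordSizeEstimates {
        // Option 1: split compressed batch size evenly across records.
        static long evenSplit(long batchCompressedBytes, int recordCount) {
            return batchCompressedBytes / recordCount;
        }

        // Option 2: weight by each record's share of the uncompressed batch.
        static long proportional(long batchCompressedBytes,
                                 long recordUncompressedBytes,
                                 long batchUncompressedBytes) {
            return batchCompressedBytes * recordUncompressedBytes
                   / batchUncompressedBytes;
        }
    }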
It may be difficult (or nearly impossible) to get actual compressed bytes
for a message from a compressed batch, but I do think it’s useful
information to have available for the very reason noted: bandwidth
consumed. Does it make sense to have an interceptor at the batch level that
can provide this…
Hi Becket,
It will be up to each interceptor to implement its own audit or monitoring
strategy. I would also imagine there is more than one good way to do audit.
So, I agree that some interceptors may not use the CRC, and we will not
require it. The question now is whether intercepting CRCs is needed…
Anna,
Using CRC to do end-to-end auditing might be very costly, because you will
need to collect all the CRCs from both producer and consumer. And it is
based on the assumption that the broker does not modify the record.
Can you share some thoughts on how end-to-end auditing will use the CRC
before we decide…
Hi Becket,
The use-case for CRC is end-to-end audit, rather than checking whether a
single message is corrupt or not.
Regarding record size, I was thinking of extracting the record size from
Record. That will include header overhead as well. I think total record size
will tell users how much bandwidth t…
I am +1 on #1.2 and #3.
#2: Regarding CRC, I am not sure users care about the CRC. Is there any
specific use case? Currently we validate messages by calling ensureValid()
to verify the checksum, and throw an exception if it does not match.
Message size would be useful. We can add that to ConsumerRecord…
Anna,
Thanks for being diligent.
+1 on #1.2 and sounds good on #3. I recommend adding checksum and size
fields to RecordMetadata and ConsumerRecord instead of exposing metadata
piecemeal in the interceptor APIs.
Thanks,
Neha
On Wed, Jan 27, 2016 at 4:10 PM, Anna Povzner wrote:
> Hi All,
>
> T…
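A sketch of how the checksum and size fields proposed above might surface on
the consumer side; checksum(), serializedKeySize() and serializedValueSize()
are the additions under discussion here, not a committed API:

    import org.apache.kafka.clients.consumer.ConsumerRecord;

    // Sketch: reading the proposed metadata fields in an audit path.
    public final class AuditHook {
        static void audit(ConsumerRecord<byte[], byte[]> record) {
            long checksum = record.checksum();
            int size = record.serializedKeySize() + record.serializedValueSize();
            // e.g. accumulate (topic, partition, checksum, size) into
            // audit counters
        }
    }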
Hi All,
The KIP wiki page is now up to date with the scope we have agreed on:
producer and consumer interceptors with a minimal set of mutable APIs that
do not depend on producer and consumer internal implementation.
I have a few more API details that I would like to bring attention to and/or
discuss…
Hi Mayuresh,
I see why you would want to check for messages left in the
RecordAccumulator. However, I don't think this will completely solve the
problem. Messages could be in flight somewhere else, like in the socket, or
there may be in-flight messages on the consumer side of the MirrorMaker. So,
i…
Calling producer.flush() flushes all the data, so this is OK. But when you
are running MirrorMaker, I am not sure there is a way to call flush() from
outside.
Thanks,
Mayuresh
On Wed, Jan 27, 2016 at 11:08 AM, Becket Qin wrote:
> Mayuresh,
>
> Regarding your use case about mirror maker: is it go…
Mayuresh,
Regarding your use case about mirror maker: is it good enough as long as we
know there are no more messages for the topic in the producer? If that is
the case, calling producer.flush() is sufficient.
Thanks,
Jiangjie (Becket) Qin
On Tue, Jan 26, 2016 at 6:18 PM, Mayuresh Gharat wrote:
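A sketch of what Becket suggests (whether MirrorMaker can trigger this from
the outside is exactly the open question above):

    import org.apache.kafka.clients.producer.KafkaProducer;

    // Sketch: flush() blocks until every record previously handed to the
    // producer has completed, so on return the accumulator holds no
    // messages for any topic.
    public final class Drain {
        static void drain(KafkaProducer<byte[], byte[]> producer) {
            producer.flush();
        }
    }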
Hi Anna,
Thanks a lot for summarizing the discussion on this KIP.
It LGTM.
This is really nice:
We decided not to add any callbacks to producer and consumer
interceptors that will depend on internal implementation as part of this
KIP. However, it is possible to add them later as part of another…
This looks good. As noted, having one mutable interceptor on each side
allows for the use cases we can envision right now, and I think that’s
going to provide a great deal of opportunity for implementing things like
audit, especially within a multi-tenant environment. Looking forward to
getting this…
Hi All,
Here are the meeting notes from today’s KIP meeting:
1. We agreed to keep the scope of this KIP to producer and consumer
interceptors only. A broker-side interceptor will be added later as a
separate KIP. The reasons were already mentioned in this thread, but the
summary is:
* Broker interceptors…
Hi,
I won't be able to make it to the KIP hangout due to a conflict.
Anna, here is the use case where it is useful to know whether there are
messages in the RecordAccumulator left to be sent to the Kafka cluster for
a topic.
1) Consider a pipeline:
A ---> Mirror-maker -> B
2) We have a topic T in cluster…
Thanks Ismael and Todd for your feedback!
I agree about coming up with lean but useful interfaces that will be easy
to extend later.
When we discuss the minimal set of producer and consumer interceptor APIs in
today’s KIP meeting (discussion item #2 in my previous email), let’s compare
two options…
Finally got a chance to take a look at this. I won’t be able to make the
KIP meeting due to a conflict.
I’m somewhat disappointed in this proposal. I think that the explicit
exclusion of message modification is short-sighted, and not accounting for
it now is going to bite us later. Jay, ar…
Hi Anna and Neha,
I think it makes a lot of sense to try and keep the interface lean and to
add more methods later when/if there is a need. What is the current
thinking with regards to compatibility when/if we add new methods? A few
options come to mind:
1. Change the interface to an abstract class…
Thanks everyone for your comments so far!
Since we have a KIP meeting tomorrow, here is the list of items I propose
to discuss in the meeting based on everyone's comments.
*1. Should we add broker-side interceptor to this KIP?*
Pros: Improves resolution of monitoring
Cons: Riskier and bigger change…
Anna,
I'm also in favor of including just the APIs for which we have a clear use
case. If more use cases for finer monitoring show up in the future, we can
always update the interface. Would you please highlight in the KIP the APIs
that you think we have an immediate use for?
Joel,
Broker-side m…
Hey Joel,
What is the interface you are thinking of? Something like this:
onAppend(String topic, int partition, Records records, long time)
?
One challenge right now is that we are still using the old
Message/MessageSet classes on the broker, which I'm not sure we'd want to
support over the…
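Spelled out, that hook might look like the following (purely hypothetical;
none of these names exist in the codebase, and the Records binding is
exactly the Message/MessageSet concern raised above):

    import org.apache.kafka.common.record.Records;

    // Hypothetical broker-side hook, following the signature sketched above.
    public interface BrokerInterceptor {
        // Invoked when a record set is appended to a partition's log.
        void onAppend(String topic, int partition, Records records, long time);
    }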
Hi Joel,
Yes, we considered interceptors on the broker side -- they make a lot of
sense and will add more detail to monitoring. We propose to do it later in a
separate KIP because: 1) broker interceptors are riskier, since brokers are
more sensitive to overheads; 2) just producer and consumer interceptors…
I think there is some tension here between exposing lots of hooks that you
can implement and giving a simple, supportable interface.
Personally, I would argue not for adding onDequeued() but for removing both
onEnqueued() and onReceived().
My argument is that onSend and onAcknowledgement…
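The lean surface argued for here would reduce to something like this sketch
(names follow the KIP discussion, not a finalized API):

    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.clients.producer.RecordMetadata;

    // Sketch of the minimal producer-side interceptor surface.
    public interface LeanProducerInterceptor<K, V> {
        // Called before serialization; may return a modified record.
        ProducerRecord<K, V> onSend(ProducerRecord<K, V> record);

        // Called on broker acknowledgement, or with an exception on failure.
        void onAcknowledgement(RecordMetadata metadata, Exception exception);
    }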
Hi Mayuresh,
Could you please describe a use case for checking whether there are messages
in the RecordAccumulator for a particular topic? In this KIP, we are
proposing interceptor callbacks that notify about the message, not a query
API. It is possible to add certain info to the callback API, like 'm…
Hi Anna,
Agreed, it's difficult to match which onDequeued() call corresponds to which
onEnqueued() call, since the ProducerRecord object is no longer available at
the time the batch is dequeued.
We currently always update the lastAppendTime whenever we append a record
to a batch. We can use that and include…
Hi Mayuresh,
I agree that an onDequeued() callback in ProducerInterceptor would be useful.
The API of this callback needs discussion. I propose the following:
public void onDequeued(TopicPartition tp, long appendTime, int attempts,
boolean moreQueued);
This callback will be called for every record…
I'm definitely in favor of having such hooks in the produce/consume
life-cycle. Not sure if people remember this but in Kafka 0.7 this was
pretty much how it was:
https://github.com/apache/kafka/blob/0.7/core/src/main/scala/kafka/producer/async/CallbackHandler.scala
i.e., we had something similar to…
Nice KIP. Excellent idea.
Was just thinking whether we can add onDequeued() to the ProducerInterceptor
interface. Since we have onEnqueued(), it will help the client or the
tools to know how much time the message spent in the RecordAccumulator.
Also an API to check if there are any messages left for a…
Great idea. I’ve been talking about this for 2 years, and I’m glad someone
is finally picking it up. Will take a look at the KIP at some point shortly.
-Todd
On Mon, Jan 25, 2016 at 11:24 AM, Jay Kreps wrote:
> Hey Becket,
>
> Yeah, this is really similar to the callback. The difference is real…
Hey Becket,
Yeah, this is really similar to the callback. The difference is really in
who sets the behavior. The idea of the interceptor is that it doesn't
require any code change in apps, so you can globally add behavior to your
Kafka usage without changing app code. Whereas the callback is added by…
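Concretely, the interceptor would be named in client configuration rather
than in code, roughly like this sketch ("interceptor.classes" is the config
key proposed in the KIP; the class name is hypothetical):

    import java.util.Properties;

    // Sketch of the "no app code change" deployment: the interceptor is
    // named in client config, so it can be enabled fleet-wide without
    // touching application code.
    public final class InterceptorConfig {
        static Properties withInterceptor(Properties base) {
            Properties props = new Properties();
            props.putAll(base);
            props.put("interceptor.classes", "com.example.AuditInterceptor");
            return props;
        }
    }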
This could be a useful feature. And I think there are some use cases for
mutating the data, like the one mentioned in rejected alternatives.
I am wondering if there is functional overlap between
ProducerInterceptor.onAcknowledgement() and the producer callback? I can
see that the Callback could be a per-r…
James,
That is one of the many monitoring use cases for the interceptor interface.
Thanks,
Neha
On Fri, Jan 22, 2016 at 6:18 PM, James Cheng wrote:
> Anna,
>
> I'm trying to understand a concrete use case. It sounds like producer
> interceptors could be used to implement part of LinkedIn's Kaf…
Anna,
I'm trying to understand a concrete use case. It sounds like producer
interceptors could be used to implement part of LinkedIn's Kafka Audit tool?
https://engineering.linkedin.com/kafka/running-kafka-scale
Part of that is done by a wrapper library around the Kafka producer that keeps
a c…
Hi,
I just created KIP-42 for adding producer and consumer interceptors, for
intercepting messages at different points on the producer and consumer.
https://cwiki.apache.org/confluence/display/KAFKA/KIP-42%3A+Add+Producer+and+Consumer+Interceptors
Comments and suggestions are welcome!
Thanks,
Anna