Anna,

Thanks for being diligent.
+1 on #1.2 and sounds good on #3. I recommend adding checksum and size fields to RecordMetadata and ConsumerRecord instead of exposing metadata piecemeal in the interceptor APIs.

Thanks,
Neha

On Wed, Jan 27, 2016 at 4:10 PM, Anna Povzner <a...@confluent.io> wrote:

Hi All,

The KIP wiki page is now up-to-date with the scope we have agreed on: Producer and Consumer Interceptors with a minimal set of mutable APIs that are not dependent on producer and consumer internal implementation.

I have a few more API details that I would like to bring attention to and/or discuss:

1. Handling exceptions

Exceptions can provide an additional level of control. For example, we can filter messages on the consumer side, or stop messages on the producer side if they don't have the right field.

I see two options:
1.1. For callbacks that can mutate records (onSend and onConsume), propagate exceptions through the original calls (KafkaProducer.send() and KafkaConsumer.poll() respectively). For other callbacks, catch the exception, log it, and ignore it.
1.2. Catch exceptions from all the interceptor callbacks and ignore them.

The issue with 1.1 is that it effectively changes the KafkaProducer.send() and KafkaConsumer.poll() APIs, since they may now throw exceptions that are not documented in the KafkaProducer/KafkaConsumer API. Another option is to let some exceptions propagate and ignore others.

I think our use-cases do not require propagating exceptions, so I propose option 1.2, unless someone has suggestions or use-cases for propagating exceptions. Please let me know.

2. Intercepting record CRC and record size

Since we decided not to add any intermediate callbacks (such as onEnqueue or onReceive) to interceptors, I think it is still valuable to intercept the record CRC and the record size in bytes for monitoring and audit use-cases.

I propose to add checksum and size fields to RecordMetadata and ConsumerRecord.
Another option would be to add them as parameters in the onAcknowledgement() and onConsume() callbacks.

3. Callbacks that allow modifying records look as follows:

    ProducerRecord<K, V> onSend(ProducerRecord<K, V> record);
    ConsumerRecords<K, V> onConsume(ConsumerRecords<K, V> records);

This means that interceptors can potentially modify the topic/partition in ProducerRecord and the topic/partition/offset in ConsumerRecord. I propose that it is up to the interceptor implementation to ensure that topic, partition, etc. are correct. KafkaProducer.send() will use the topic, partition, key, and value from the ProducerRecord returned from onSend(). Similarly, the ConsumerRecords returned from KafkaConsumer.poll() would be the ones returned from the interceptor.

Please let me know if you have any suggestions or objections to the above.

Thanks,
Anna

On Wed, Jan 27, 2016 at 2:56 PM, Anna Povzner <a...@confluent.io> wrote:

Hi Mayuresh,

I see why you would want to check for messages left in the RecordAccumulator. However, I don't think this will completely solve the problem. Messages could be in flight somewhere else, like in the socket, or there may be in-flight messages on the consumer side of the MirrorMaker. So, if we go the route of checking whether there are any in-flight messages for the topic deletion use-case, maybe it is better to count them with onSend() and onAcknowledgement(), i.e., check whether all messages sent were acknowledged. I also think it would be better to solve this without interceptors, for example by fixing error handling in this scenario. However, I do not have a good proposal right now, so these are just general thoughts.

Thanks,
Anna

On Wed, Jan 27, 2016 at 11:18 AM, Mayuresh Gharat <gharatmayures...@gmail.com> wrote:

Calling producer.flush() flushes all the data, so this is OK.
But when you are running Mirror Maker, I am not sure there is a way to call flush() from outside.

Thanks,
Mayuresh

On Wed, Jan 27, 2016 at 11:08 AM, Becket Qin <becket....@gmail.com> wrote:

Mayuresh,

Regarding your use case about mirror maker: is it good enough as long as we know there are no messages for the topic in the producer anymore? If that is the case, calling producer.flush() is sufficient.

Thanks,
Jiangjie (Becket) Qin

On Tue, Jan 26, 2016 at 6:18 PM, Mayuresh Gharat <gharatmayures...@gmail.com> wrote:

Hi Anna,

Thanks a lot for summarizing the discussion on this KIP.

It LGTM. This is really nice:

We decided not to add any callbacks to producer and consumer interceptors that will depend on internal implementation as part of this KIP. *However, it is possible to add them later as part of another KIP if there are good use-cases.*

Do you agree with the use case I explained earlier for knowing the number of records left in the RecordAccumulator for a particular topic? It might be orthogonal to this KIP, but it will be helpful. What do you think?

Thanks,
Mayuresh

On Tue, Jan 26, 2016 at 2:46 PM, Todd Palino <tpal...@gmail.com> wrote:

This looks good. As noted, having one mutable interceptor on each side allows for the use cases we can envision right now, and I think that’s going to provide a great deal of opportunity for implementing things like audit, especially within a multi-tenant environment. Looking forward to getting this available in the clients.

Thanks!
-Todd

On Tue, Jan 26, 2016 at 2:36 PM, Anna Povzner <a...@confluent.io> wrote:

Hi All,

Here are the meeting notes from today’s KIP meeting:

1. We agreed to limit the scope of this KIP to producer and consumer interceptors only. A broker-side interceptor will be added later as a separate KIP. The reasons were already mentioned in this thread, but the summary is:
* A broker interceptor is riskier and requires careful consideration of overheads, whether to intercept leaders vs. leaders/replicas, what to do on leader failover, and so on.
* Broker interceptors increase monitoring resolution, but leaving them out of this KIP does not reduce the usefulness of producer and consumer interceptors, which enable end-to-end monitoring.

2. We agreed to scope the ProducerInterceptor and ConsumerInterceptor callbacks to a minimal set of mutable APIs that are not dependent on producer and consumer internal implementation.

ProducerInterceptor:
    *ProducerRecord<K, V> onSend(ProducerRecord<K, V> record);*
    *void onAcknowledgement(RecordMetadata metadata, Exception exception);*

ConsumerInterceptor:
    *ConsumerRecords<K, V> onConsume(ConsumerRecords<K, V> records);*
    *void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets);*

We will allow interceptors to modify ProducerRecord on the producer side, and ConsumerRecords on the consumer side.
This will support end-to-end monitoring and auditing, and the ability to add metadata to a message. This covers Todd’s Auditing and Routing use-cases.

We did not find any use-case for modifying records in the onConsume() callback, but decided to enable modification of consumer records for symmetry with onSend().

3. We agreed to ensure compatibility when/if we add new methods to ProducerInterceptor and ConsumerInterceptor by using default methods with an empty implementation. It is OK to assume Java 8. (This is Ismael’s method #2.)

4. We decided not to add any callbacks to producer and consumer interceptors that depend on internal implementation as part of this KIP. However, it is possible to add them later as part of another KIP if there are good use-cases.

*Reasoning.* We did not have concrete use-cases that justified more methods at this point. Some of the use-cases were for more fine-grained latency collection, which could be done with Kafka Metrics. Another use-case was encryption. However, there are several design options for encryption. One is per-record encryption, which would require adding ProducerInterceptor.onEnqueued() and ConsumerInterceptor.onReceive(). One could argue that in that case encryption could be done by adding a custom serializer/deserializer. Another option is to encrypt after the message gets compressed, but then issues arise with the broker doing re-compression.
We decided that it is better to have that discussion in a separate KIP and decide whether this is something we want to do with interceptors or by other means.

Todd, Mayuresh, and others who missed the KIP meeting, please let me know your thoughts on the scope we agreed on during the meeting.

I will update the KIP proposal with the current decision by the end of today.

Thanks,
Anna

On Tue, Jan 26, 2016 at 11:41 AM, Mayuresh Gharat <gharatmayures...@gmail.com> wrote:

Hi,

I won't be able to make it to the KIP hangout due to a conflict.

Anna, here is the use case where it is useful to know whether there are messages left in the RecordAccumulator to be sent to the Kafka cluster for a topic.

1) Consider a pipeline:
    A ---> Mirror-maker -----> B

2) We have a topic T in cluster A mirrored to cluster B.

3) Now, if we delete topic T in A and immediately proceed to delete the topic in cluster B, some of the Mirror-maker machines die because at least one of the batches in the RecordAccumulator for topic T fails to be produced to cluster B. We have seen this happening in our clusters.

If we know that there are no more messages left in the RecordAccumulator to be produced to cluster B, we can safely delete the topic in cluster B without disturbing the pipeline.
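Anna's Jan 27 reply, earlier in this thread, suggests a different way to answer Mayuresh's question: count records with onSend() and onAcknowledgement() instead of inspecting the RecordAccumulator. The sketch below illustrates that idea. It rests on several assumptions:

- It does not use the Kafka client library. `TrackedRecord`, `SendCallbacks`, and `InFlightTracker` are hypothetical stand-ins for ProducerRecord and the agreed ProducerInterceptor callbacks.
- The acknowledgement is assumed to carry the topic even when the send fails.
- Every onSend() is assumed to be matched by exactly one acknowledgement.

Under these assumptions, a record stays counted until it is acknowledged or fails, so the count is not limited to the RecordAccumulator. It still cannot see records that the Mirror-maker consumer has fetched but not yet handed to send().

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical stand-in for ProducerRecord; only the topic matters here.
final class TrackedRecord {
    final String topic;
    final String value;

    TrackedRecord(String topic, String value) {
        this.topic = topic;
        this.value = value;
    }
}

// Hypothetical stand-in for the agreed producer callbacks. The real
// onAcknowledgement receives RecordMetadata; here the topic is passed directly.
interface SendCallbacks {
    TrackedRecord onSend(TrackedRecord record);

    void onAcknowledgement(String topic, Exception exception);
}

// Counts, per topic, records passed to onSend() that have not been acknowledged yet.
final class InFlightTracker implements SendCallbacks {
    private final ConcurrentHashMap<String, AtomicLong> pending = new ConcurrentHashMap<>();

    @Override
    public TrackedRecord onSend(TrackedRecord record) {
        pending.computeIfAbsent(record.topic, t -> new AtomicLong()).incrementAndGet();
        return record; // observe only: the record is passed through unchanged
    }

    @Override
    public void onAcknowledgement(String topic, Exception exception) {
        // Success (exception == null) and failure both end the record's life in the producer.
        AtomicLong count = pending.get(topic);
        if (count != null) {
            count.decrementAndGet();
        }
    }

    // Number of records for the topic still awaiting an acknowledgement.
    long inFlight(String topic) {
        AtomicLong count = pending.get(topic);
        return count == null ? 0 : count.get();
    }
}
```

A mirror maker could check something like `tracker.inFlight("T") == 0` before deleting T downstream. That only shows the producer was drained at that instant, which is one reason Anna leans towards fixing the error handling instead.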
Thanks,
Mayuresh

On Tue, Jan 26, 2016 at 10:31 AM, Anna Povzner <a...@confluent.io> wrote:

Thanks Ismael and Todd for your feedback!

I agree about coming up with lean but useful interfaces that will be easy to extend later.

When we discuss the minimal set of producer and consumer interceptor APIs in today’s KIP meeting (discussion item #2 in my previous email), let's compare two options:

*1. Minimal set of immutable API for producer and consumer interceptors*

ProducerInterceptor:
    public void onSend(ProducerRecord<K, V> record);
    public void onAcknowledgement(RecordMetadata metadata, Exception exception);

ConsumerInterceptor:
    public void onConsume(ConsumerRecords<K, V> records);
    public void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets);

Use-cases:
- end-to-end monitoring; custom tracing and logging
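To make option 1 concrete, here is a minimal consumer-side sketch of an observe-only interceptor used for monitoring. It uses hypothetical stand-in types rather than the Kafka client classes:

- `ConsumedRef` replaces ConsumerRecord.
- The commit map is keyed by a "topic-partition" string instead of TopicPartition/OffsetAndMetadata.
- `ObservingConsumerInterceptor` and `CommitLagMonitor` are invented names.

It follows the Kafka convention that a committed offset is the offset of the next record to read. Both callbacks return void, so the interceptor can watch traffic but cannot alter it.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for ConsumerRecord: only its coordinates.
final class ConsumedRef {
    final String topic;
    final int partition;
    final long offset;

    ConsumedRef(String topic, int partition, long offset) {
        this.topic = topic;
        this.partition = partition;
        this.offset = offset;
    }

    String topicPartition() {
        return topic + "-" + partition;
    }
}

// Option 1 shape: both callbacks return void, so they observe but cannot alter records.
interface ObservingConsumerInterceptor {
    void onConsume(List<ConsumedRef> records);

    void onCommit(Map<String, Long> offsets); // key: "topic-partition", value: committed offset
}

// Tracks, per partition, how many consumed records are not yet covered by a commit.
final class CommitLagMonitor implements ObservingConsumerInterceptor {
    private final Map<String, Long> firstConsumed = new HashMap<>();
    private final Map<String, Long> lastConsumed = new HashMap<>();
    private final Map<String, Long> lastCommitted = new HashMap<>();

    @Override
    public void onConsume(List<ConsumedRef> records) {
        for (ConsumedRef r : records) {
            firstConsumed.putIfAbsent(r.topicPartition(), r.offset);
            lastConsumed.merge(r.topicPartition(), r.offset, Long::max);
        }
    }

    @Override
    public void onCommit(Map<String, Long> offsets) {
        lastCommitted.putAll(offsets);
    }

    // A committed offset names the next record to read, hence the "+ 1".
    long uncommitted(String topicPartition) {
        Long consumed = lastConsumed.get(topicPartition);
        if (consumed == null) {
            return 0;
        }
        // Before the first commit, measure from the first offset this consumer saw.
        long committed = lastCommitted.getOrDefault(topicPartition, firstConsumed.get(topicPartition));
        return Math.max(0L, consumed + 1 - committed);
    }
}
```

A monitoring job could export `uncommitted("T-0")` as a metric. Under option 2's mutable `onConsume`, the same logic would simply return the records it received.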
*2. Minimal set of mutable API for producer and consumer interceptors*

ProducerInterceptor:
    ProducerRecord<K, V> onSend(ProducerRecord<K, V> record);
    void onAcknowledgement(RecordMetadata metadata, Exception exception);

ConsumerInterceptor:
    void onConsume(ConsumerRecords<K, V> records);
    void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets);

Additional use-cases to #1:
- Ability to add metadata to a message or fill in standard fields for audit and routing.

Implications:
- Partition assignment will be done based on the modified key/value instead of the original key/value. If the key/value transformation is not consistent (i.e., the same key and value do not always map to the same modified key/value), then log compaction would not work. However, Todd's audit and routing use-cases will likely do consistent transformations.

*Additional callbacks (discussion item #3 in my previous email):*

If we want to support encryption, we would want to be able to modify the serialized key/value, rather than the key and value objects.
This will add the following APIs to producer and consumer interceptors:

ProducerInterceptor:
    SerializedKeyValue onEnqueued(TopicPartition tp, ProducerRecord<K, V> record, SerializedKeyValue serializedKeyValue);

ConsumerInterceptor:
    SerializedKeyValue onReceive(TopicPartition tp, SerializedKeyValue serializedKeyValue);

I am leaning towards implementing the minimal set of immutable or mutable interfaces, making sure that we have a compatibility plan that allows us to add more callbacks in the future (per Ismael's comment), and adding more APIs later. E.g., for the encryption use-case, there could be an argument for encrypting after message compression vs. per-record encryption, which could be done using the additional API above. There are also more implications for every API that modifies records: modifying the serialized key/value will again affect partition assignment (we will likely do that after partition assignment), which may affect log compaction and mirror maker partitioning.

Thanks,
Anna

On Tue, Jan 26, 2016 at 7:26 AM, Todd Palino <tpal...@gmail.com> wrote:

Finally got a chance to take a look at this. I won’t be able to make the KIP meeting due to a conflict.
I’m somewhat disappointed in this proposal. I think that the explicit exclusion of modification of the messages is short-sighted, and not accounting for it now is going to bite us later. Jay, aren’t you the one railing against public interfaces and how difficult they are to work with when you don’t get them right? The “simple” change to one of these interfaces to make it able to return a record is going to be a significant change and is going to require all clients to rewrite their interceptors. If we’re not willing to put in the time to think through manipulation now, then this KIP should be shelved until we are. Implementing something halfway is going to be worse than taking a little longer. In addition, I don’t believe that manipulation requires anything more than for interceptors to receive the full record and then return it.

There are 3 use cases I can think of right now, without any deep discussion, that can make use of interceptors with modification:

1. Auditing. The ability to add metadata to a message for auditing is critical. Hostname, service name, timestamps, etc. are all pieces of data that can be used on the other side of the pipeline to categorize messages, determine loss and transport time, and pin down issues.
You may say that these things can just be part of the message schema, but anyone who has worked with a multi-user data system (especially those who have been involved with LinkedIn) knows how difficult it is to maintain consistent message schemas and to get other people to put in fields for your use.

2. Encryption. This is probably the most obvious case for record manipulation on both sides. The ability to tie in end-to-end encryption is important for data that requires external compliance (PCI, HIPAA, etc.).

3. Routing. By being able to add a bit of information about the source or destination of a message to the metadata, you can easily construct an intelligent mirror maker that can prevent loops. This could result in significant operational savings, as you can get rid of the need for tiered clusters to prevent loops in mirroring messages.

All three of these share the feature that they add metadata to messages. With the pushback on having arbitrary metadata as an “envelope” to the message, this is a way to provide it and make it the responsibility of the client, and not of the Kafka broker and system itself.
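Todd's auditing case maps onto the mutable onSend() the meeting later agreed on. The sketch below is illustrative only:

- `OutRecord`, `MutableSendInterceptor`, `AuditStamp`, and `SendChain` are hypothetical stand-ins, not Kafka classes.
- The `host|service|timestamp|` prefix is an invented envelope format.
- The chain applies interceptors in order and passes each returned record to the next one.
- Following option 1.2 from Anna's Jan 27 message, an exception is logged and ignored, and the chain keeps the record from the previous step.
- Only the value is changed, so the key, and therefore key-based partitioning and log compaction, are unaffected.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.LongSupplier;

// Hypothetical stand-in for ProducerRecord; immutable, so interceptors return a new one.
final class OutRecord {
    final String topic;
    final String key;
    final String value;

    OutRecord(String topic, String key, String value) {
        this.topic = topic;
        this.key = key;
        this.value = value;
    }
}

// Shape of the agreed mutable callback: return the record that should be sent.
interface MutableSendInterceptor {
    OutRecord onSend(OutRecord record);
}

// Todd's auditing case: stamp source metadata onto every message.
// The "host|service|timestamp|" prefix is an invented envelope, not a Kafka format.
final class AuditStamp implements MutableSendInterceptor {
    private final String host;
    private final String service;
    private final LongSupplier clock;

    AuditStamp(String host, String service, LongSupplier clock) {
        this.host = host;
        this.service = service;
        this.clock = clock;
    }

    @Override
    public OutRecord onSend(OutRecord record) {
        String envelope = host + "|" + service + "|" + clock.getAsLong() + "|";
        // Only the value changes; the key (and so key-based partitioning) is preserved.
        return new OutRecord(record.topic, record.key, envelope + record.value);
    }
}

// Applies interceptors in order, feeding each result to the next one.
// Per option 1.2, an exception is logged and ignored, keeping the previous record.
final class SendChain {
    private final List<MutableSendInterceptor> interceptors;
    private int ignoredErrors = 0;

    SendChain(List<MutableSendInterceptor> interceptors) {
        this.interceptors = new ArrayList<>(interceptors);
    }

    OutRecord onSend(OutRecord record) {
        OutRecord current = record;
        for (MutableSendInterceptor interceptor : interceptors) {
            try {
                current = interceptor.onSend(current);
            } catch (RuntimeException e) {
                ignoredErrors++;
                System.err.println("interceptor failed, record passed on unchanged: " + e);
            }
        }
        return current;
    }

    int ignoredErrors() {
        return ignoredErrors;
    }
}
```

In a real producer, the clock would likely be `System::currentTimeMillis`. The record returned by the chain is the one that would be sent, which matches Anna's point that KafkaProducer.send() uses the record returned from onSend().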
-Todd

On Tue, Jan 26, 2016 at 2:30 AM, Ismael Juma <ism...@juma.me.uk> wrote:

Hi Anna and Neha,

I think it makes a lot of sense to try and keep the interface lean and to add more methods later when/if there is a need. What is the current thinking with regard to compatibility when/if we add new methods? A few options come to mind:

1. Change the interface to an abstract class with empty implementations for all the methods. This means that the path to adding new methods is clear.
2. Hope we have moved to Java 8 by the time we need to add new methods, and use default methods with an empty implementation for any new method (and potentially make existing methods default methods too at that point, for consistency).
3. Introduce a new interface that inherits from the existing Interceptor interface when we need to add new methods.

Option 1 is the easiest, and it also means that interceptor users only need to override the methods they are interested in (more useful if the number of methods grows).
The downside is that interceptor implementations cannot inherit from another class (a straightforward workaround is to make the interceptor a forwarder that calls another class). Also, our existing callbacks are interfaces, so it seems a bit inconsistent.

Option 2 may be the most appealing one, as both users and ourselves retain flexibility. The main downside is that it relies on us moving to Java 8, which may potentially be more than a year away (if we support the last 2 Java releases).

Thoughts?

Ismael

On Tue, Jan 26, 2016 at 4:59 AM, Neha Narkhede <n...@confluent.io> wrote:

Anna,

I'm also in favor of including just the APIs for which we have a clear use case. If more use cases for finer monitoring show up in the future, we can always update the interface. Would you please highlight in the KIP the APIs that you think we have an immediate use for?
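Ismael's option 2, which the meeting notes above record as the chosen approach, relies on Java 8 default methods. The minimal sketch below makes these assumptions:

- `EvolvingInterceptor` is a hypothetical interface over plain strings.
- `onEnqueued` is a hypothetical later addition. Such a callback was discussed in this thread but is not part of the agreed API.
- `ExistingBase`, `UppercaseInterceptor`, and `EnqueueCounter` are invented names.

It shows two things. An implementation written before the new method existed still compiles and inherits the empty body, while newer implementations can override it. And, unlike option 1's abstract class, an implementation stays free to extend another class.

```java
import java.util.Locale;

// Hypothetical interceptor interface as it might look after a later release adds a callback.
interface EvolvingInterceptor {
    // Present from the first release.
    String onSend(String record);

    // Hypothetical later addition. The empty default body means implementations
    // written against the first release keep compiling and running unchanged.
    default void onEnqueued(String record) {
    }
}

// Some unrelated class an application already extends.
abstract class ExistingBase {
    String describe() {
        return "base";
    }
}

// Written before onEnqueued existed: it never mentions the new method, and it can
// still extend another class, which an abstract-class interceptor would rule out.
final class UppercaseInterceptor extends ExistingBase implements EvolvingInterceptor {
    @Override
    public String onSend(String record) {
        return record.toUpperCase(Locale.ROOT);
    }
}

// A newer implementation opts in by overriding the default.
final class EnqueueCounter implements EvolvingInterceptor {
    private int enqueued = 0;

    @Override
    public String onSend(String record) {
        return record;
    }

    @Override
    public void onEnqueued(String record) {
        enqueued++;
    }

    int enqueued() {
        return enqueued;
    }
}
```

In this sketch, neither the old implementation nor its superclass had to change when the callback was added. That is the property option 2 is after.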
Joel,

Broker-side monitoring makes a lot of sense in the long term, though I don't think it is a requirement for end-to-end monitoring. With the producer and consumer interceptors, you have the ability to get full publish-to-subscribe end-to-end monitoring. The broker interceptor certainly improves the resolution of monitoring, but it is also a riskier change. I prefer an incremental approach over a big-bang one and recommend taking baby steps. Let's first make sure the producer/consumer interceptors are successful, and then come back and add the broker interceptor carefully.

Having said that, it would be great to understand your proposal for the broker interceptor independently. We can add an interceptor either on-append or on-commit. If people want to use this for monitoring, then possibly on-commit might be more useful?

Thanks,
Neha

On Mon, Jan 25, 2016 at 6:47 PM, Jay Kreps <j...@confluent.io> wrote:

Hey Joel,

What is the interface you are thinking of?
Something like this?

    onAppend(String topic, int partition, Records records, long time)

One challenge right now is that we are still using the old Message/MessageSet classes on the broker, which I'm not sure we'd want to support over the long haul, but it might be okay just to create the Records instance for this interface.

-Jay

On Mon, Jan 25, 2016 at 12:37 PM, Joel Koshy <jjkosh...@gmail.com> wrote:

I'm definitely in favor of having such hooks in the produce/consume life-cycle. Not sure if people remember this, but in Kafka 0.7 this was pretty much how it was:

https://github.com/apache/kafka/blob/0.7/core/src/main/scala/kafka/producer/async/CallbackHandler.scala

i.e., we had something similar to the interceptor proposal for various stages of the producer request. The producer provided call-backs for beforeEnqueue, afterEnqueue, afterDequeuing, beforeSending, etc.
So at LinkedIn we in fact did auditing within these call-backs (and not explicitly in the wrapper). Over time, and with 0.8, we moved that out to the wrapper libraries.

On a side note, while audit and other monitoring can be done internally in a convenient way, I think it should be clarified that having a wrapper is in general not a bad idea, and I would even consider it a best practice. Even with 0.7 we still had a wrapper library, and that API has largely stayed the same and has helped protect against (sometimes backwards-incompatible) changes in open source.

While we are on this topic, I have one comment. Anna, you may have already considered this, but I don't see it mentioned in the KIP:

Add a custom message interceptor/validator on the broker on message arrival.

We decompress and do basic validation of messages on arrival.
I think there is value in supporting custom validation and expanding it to support custom on-arrival processing. Here is a specific use-case I have in mind. The blog that James referenced describes our auditing infrastructure. In order to audit the Kafka cluster itself, we need to run a "console auditor" service that consumes everything and spits out audit events back to the cluster. I would prefer not having to run this service because:

- Well, it is one more service that we have to run and monitor
- Consuming everything takes up bandwidth, which can be avoided
- The console auditor consumer itself can lag and cause temporary audit discrepancies

One way we can mitigate this is by having mirror-makers in between clusters emit audit events. The problem is that the very last cluster in the pipeline will not have any audit, which is why we need something to audit the cluster.
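Joel's on-arrival audit could be sketched against the onAppend(topic, partition, records, time) shape Jay floats above. Everything here is hypothetical:

- No broker interceptor exists in this KIP.
- `AppendHook` and `ArrivalAuditor` are invented names.
- `List<byte[]>` stands in for Records.
- The time-bucket size is an assumed parameter.

The sketch illustrates that counting messages per topic and time bucket needs no deserialization. That sidesteps Joel's concern that deserializing on arrival could drive up produce request handling times.

```java
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical broker hook modeled on the onAppend(topic, partition, records, time)
// shape suggested in this thread; List<byte[]> stands in for Records.
interface AppendHook {
    void onAppend(String topic, int partition, List<byte[]> records, long timeMs);
}

// Counts arriving messages per topic and time bucket, i.e. the numbers a
// console auditor would otherwise compute by consuming everything.
final class ArrivalAuditor implements AppendHook {
    private final long bucketMs;
    // Appends for different partitions may arrive concurrently, hence the concurrent map.
    private final ConcurrentHashMap<String, AtomicLong> counts = new ConcurrentHashMap<>();

    ArrivalAuditor(long bucketMs) {
        this.bucketMs = bucketMs;
    }

    @Override
    public void onAppend(String topic, int partition, List<byte[]> records, long timeMs) {
        long bucketStart = timeMs - (timeMs % bucketMs);
        // Counting only needs the number of records, not their decoded payloads.
        counts.computeIfAbsent(topic + "@" + bucketStart, k -> new AtomicLong())
              .addAndGet(records.size());
    }

    long count(String topic, long bucketStart) {
        AtomicLong n = counts.get(topic + "@" + bucketStart);
        return n == null ? 0 : n.get();
    }
}
```

This buckets by broker arrival time rather than by any producer-side timestamp. Comparing these counts with counts emitted elsewhere in the pipeline would therefore likely require every tier to agree on which time it buckets by.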
If we had a custom message validator, then the audit could be done on arrival and we wouldn't need a console auditor.

One potential issue with this approach, and with any elaborate on-arrival processing for that matter, is that you may need to deserialize the message as well, which can drive up produce request handling times. However, I'm not terribly concerned about that, especially if the audit header can be separated out easily or even deserialized partially, as this Avro thread touches on:

http://search-hadoop.com/m/F2svI1HDLY12W8tnH1&subj=Re+any+optimization+in+reading+a+partial+schema+in+the+decoder+

Thanks,

Joel

On Mon, Jan 25, 2016 at 12:02 PM, Mayuresh Gharat <gharatmayures...@gmail.com> wrote:

Nice KIP. Excellent idea.
Was just thinking if we can add onDequed() to the ProducerInterceptor interface.
Since we have onEnqueued(), it would help the client or the tools to know how much time the message spent in the RecordAccumulator. Also, an API to check whether there are any messages left for a particular topic in the RecordAccumulator would help.

Thanks,

Mayuresh

On Mon, Jan 25, 2016 at 11:29 AM, Todd Palino <tpal...@gmail.com> wrote:

Great idea. I’ve been talking about this for 2 years, and I’m glad someone is finally picking it up. Will take a look at the KIP at some point shortly.

-Todd

On Mon, Jan 25, 2016 at 11:24 AM, Jay Kreps <j...@confluent.io> wrote:

Hey Becket,

Yeah, this is really similar to the callback. The difference is really in who sets the behavior.
The idea of the interceptor is that it doesn't require any code change in apps, so you can globally add behavior to your Kafka usage without changing app code, whereas the callback is added by the app. The idea is to kind of obviate the need for the wrapper code that, e.g., LinkedIn maintains to hold this kind of stuff.

-Jay

On Sun, Jan 24, 2016 at 4:21 PM, Becket Qin <becket....@gmail.com> wrote:

This could be a useful feature. And I think there are some use cases to mutate the data, like the one mentioned in the first rejected alternative.

I am wondering if there is functional overlap between ProducerInterceptor.onAcknowledgement() and the producer callback?
I can see that the Callback could be a per-record setting while onAcknowledgement() is a producer-level setting. Other than that, is there any difference between them?

Thanks,

Jiangjie (Becket) Qin

On Fri, Jan 22, 2016 at 6:21 PM, Neha Narkhede <n...@confluent.io> wrote:

James,

That is one of the many monitoring use cases for the interceptor interface.

Thanks,
Neha

On Fri, Jan 22, 2016 at 6:18 PM, James Cheng <jch...@tivo.com> wrote:

Anna,

I'm trying to understand a concrete use case.
It sounds like producer interceptors could be used to implement part of LinkedIn's Kafka Audit tool? https://engineering.linkedin.com/kafka/running-kafka-scale

Part of that is done by a wrapper library around the Kafka producer that keeps a count of the number of messages produced and then sends that count to a side topic. It sounds like the producer interceptors could possibly be used to implement that?
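As a hedged illustration of the counting pattern described here, and of Jay's distinction between interceptors and callbacks, consider the following self-contained sketch. `Message`, `Interceptor`, `AuditCountingInterceptor`, and `ToyProducer` are simplified, hypothetical stand-ins loosely modeled on the onSend()/onAcknowledgement() callbacks discussed in this thread; they are not Kafka's classes. The interceptor list is supplied once when the producer is built, so it sees every record without app changes, while the per-record callback only covers the send it was passed to.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.Consumer;

// Hypothetical, simplified stand-ins; these are NOT Kafka's producer classes.
public class AuditInterceptorSketch {

    record Message(String topic, String value) {}

    /** Producer-level hook: configured once and applied to every send. */
    interface Interceptor {
        Message onSend(Message message);                          // may inspect or replace the message
        void onAcknowledgement(Message message, Exception error); // error == null means success
    }

    /** Counts successfully acknowledged messages per topic, like an audit wrapper would. */
    static final class AuditCountingInterceptor implements Interceptor {
        private final Map<String, Long> acked = new TreeMap<>();

        @Override
        public Message onSend(Message message) {
            return message; // audit only observes; it passes the message through unchanged
        }

        @Override
        public void onAcknowledgement(Message message, Exception error) {
            if (error == null) {
                acked.merge(message.topic(), 1L, Long::sum);
            }
        }

        /** Counts a real auditor would periodically publish to a side topic. */
        Map<String, Long> counts() {
            return new TreeMap<>(acked);
        }
    }

    /** Toy producer: interceptors come from configuration, callbacks from each caller. */
    static final class ToyProducer {
        private final List<Interceptor> interceptors;

        ToyProducer(List<Interceptor> interceptors) {
            this.interceptors = new ArrayList<>(interceptors);
        }

        void send(Message message, Consumer<Exception> callback) {
            Message current = message;
            for (Interceptor interceptor : interceptors) {
                current = interceptor.onSend(current);
            }
            // Simulated broker response: a null value "fails", everything else succeeds.
            Exception error = current.value() == null ? new IllegalArgumentException("null value") : null;
            for (Interceptor interceptor : interceptors) {
                interceptor.onAcknowledgement(current, error); // runs for every record
            }
            callback.accept(error); // runs only for this send, as chosen by the app
        }
    }

    public static void main(String[] args) {
        AuditCountingInterceptor audit = new AuditCountingInterceptor();
        ToyProducer producer = new ToyProducer(List.of(audit));
        producer.send(new Message("orders", "a"), error -> {});
        producer.send(new Message("orders", "b"), error -> {});
        producer.send(new Message("clicks", null),
                error -> System.out.println("send failed: " + error.getMessage()));
        System.out.println(audit.counts()); // {orders=2}
    }
}
```

In a real deployment the counts would be flushed periodically to an audit side topic; that publishing step is omitted here.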
-James

On Jan 22, 2016, at 4:33 PM, Anna Povzner <a...@confluent.io> wrote:

Hi,

I just created KIP-42 for adding producer and consumer interceptors for intercepting messages at different points on the producer and consumer.

https://cwiki.apache.org/confluence/display/KAFKA/KIP-42%3A+Add+Producer+and+Consumer+Interceptors

Comments and suggestions are welcome!
Thanks,
Anna

________________________________

This email and any attachments may contain confidential and privileged material for the sole use of the intended recipient. Any review, copying, or distribution of this email (or any attachments) by others is prohibited. If you are not the intended recipient, please contact the sender immediately and permanently delete this email and any attachments. No employee or agent of TiVo Inc. is authorized to conclude any binding agreement on behalf of TiVo Inc. by email. Binding agreements with TiVo Inc. may only be made by a signed written agreement.
--
Thanks,
Neha

--
Todd Palino
Staff Site Reliability Engineer
Data Infrastructure Streaming

linkedin.com/in/toddpalino

--
-Regards,
Mayuresh R. Gharat
(862) 250-7125