Hi Becket,

It will be up to each interceptor to implement its own audit or monitoring strategy. I would also imagine there is more than one good way to do audit. So, I agree that some of the interceptors may not use CRC, and we will not require it. The question now is whether intercepting CRCs is needed. I think they are very useful for monitoring and audit, because a CRC provides an easy way to get a summary of a message, rather than using message bytes or key/value objects.
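To make the "CRC as a message summary" idea concrete, here is a rough sketch of how an audit could compare what was produced with what was consumed without keeping message bytes around. Everything in it is hypothetical: the class and method names are made up for illustration and are not part of the KIP or the clients, and the CRC is computed with java.util.zip.CRC32 over key and value bytes only as a stand-in for the record checksum the clients would expose. It also relies on the assumption you pointed out, that the record is not modified between producer and consumer.

```java
import java.nio.charset.StandardCharsets;
import java.util.zip.CRC32;

// Hypothetical sketch: none of these names come from the KIP or the Kafka clients.
final class CrcAuditSketch {
    private long count;   // number of records observed on this side
    private long crcSum;  // order-independent sum of per-record CRCs (may wrap around)

    // CRC32 over key bytes followed by value bytes; a stand-in for the record CRC.
    static long checksum(byte[] key, byte[] value) {
        CRC32 crc = new CRC32();
        if (key != null) {
            crc.update(key, 0, key.length);
        }
        if (value != null) {
            crc.update(value, 0, value.length);
        }
        return crc.getValue();
    }

    // Called once per record, e.g. from an acknowledgement or consume callback.
    void observe(long recordCrc) {
        count++;
        crcSum += recordCrc;
    }

    // Two sides agree if they saw the same number of records with the same CRC sum.
    boolean matches(CrcAuditSketch other) {
        return count == other.count && crcSum == other.crcSum;
    }

    public static void main(String[] args) {
        CrcAuditSketch producerSide = new CrcAuditSketch();
        CrcAuditSketch consumerSide = new CrcAuditSketch();

        long first = checksum("k1".getBytes(StandardCharsets.UTF_8),
                              "v1".getBytes(StandardCharsets.UTF_8));
        long second = checksum("k2".getBytes(StandardCharsets.UTF_8),
                               "v2".getBytes(StandardCharsets.UTF_8));

        producerSide.observe(first);
        producerSide.observe(second);
        consumerSide.observe(second); // arrival order does not matter
        consumerSide.observe(first);

        System.out.println(producerSide.matches(consumerSide));
        consumerSide.observe(first);  // a duplicate delivery breaks the match
        System.out.println(producerSide.matches(consumerSide));
    }
}
```

A count plus a CRC sum is only one possible summary; a real audit would presumably bucket by topic and time window, and a sequence-number scheme would be a different design altogether.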
Regarding record size, I agree that the bandwidth example was not a good one. I think it would be hard to get the actual bytes sent over the wire (your #2), since multiple records get compressed together and we would need to decide which bytes to attribute to which record. So I am inclined to do only your #1. However, it still makes more sense to me to return the record size including the header, since this is the actual record size.

Thanks,
Anna

On Thu, Jan 28, 2016 at 11:46 AM, Becket Qin <becket....@gmail.com> wrote:

> Anna,
>
> Using CRC to do end2end auditing might be very costly because you will need
> to collect all the CRC from both producer and consumer. And it is based on
> the assumption that broker does not modify the record.
> Can you shed some idea on how end to end auditing will be using the CRC
> before we decide to expose such low level detail to the end user? It would
> also be helpful if you can compare it with something like sequence number
> based auditing.
>
> About the record size, one thing worth notice is that the size of Record is
> not the actual bytes sent over the wire if we use compression. So that does
> not really tell user how much bandwidth they are using. Personally I think
> two kinds of size may be useful.
> 1. The record size after serialization, i.e. application bytes. (The
> uncompressed record size can be easily derived as well)
> 2. The actual bytes sent over the wire.
> We can get (1) easily, but (2) is difficult to get at Record level when we
> use compression.
>
> Thanks,
>
> Jiangjie (Becket) Qin
>
> On Thu, Jan 28, 2016 at 10:55 AM, Anna Povzner <a...@confluent.io> wrote:
>
> > Hi Becket,
> >
> > The use-case for CRC is end-to-end audit, rather than checking whether a
> > single message is corrupt or not.
> >
> > Regarding record size, I was thinking to extract record size from Record.
> > That will include header overhead as well. I think total record size will
> > tell users how much bandwidth their messages take. Since header is
> > relatively small and constant, users also will get an idea of their
> > key/value sizes.
> >
> > Thanks,
> > Anna
> >
> > On Thu, Jan 28, 2016 at 9:29 AM, Becket Qin <becket....@gmail.com> wrote:
> >
> > > I am +1 on #1.2 and #3.
> > >
> > > #2: Regarding CRC, I am not sure if users care about CRC. is there any
> > > specific use case? Currently we validate messages by calling ensureValid()
> > > to verify the checksum and throw exception if it does not match.
> > >
> > > Message size would be useful. We can add that to ConsumerRecord. Can you
> > > clarify the message size you are referring to? Does it include the message
> > > header overhead or not? From user's point of view, they probably don't care
> > > about header size.
> > >
> > > Thanks,
> > >
> > > Jiangjie (Becket) Qin
> > >
> > >
> > > On Wed, Jan 27, 2016 at 8:26 PM, Neha Narkhede <n...@confluent.io> wrote:
> > >
> > > > Anna,
> > > >
> > > > Thanks for being diligent.
> > > >
> > > > +1 on #1.2 and sounds good on #3. I recommend adding checksum and size
> > > > fields to RecordMetadata and ConsumerRecord instead of exposing metadata
> > > > piecemeal in the interceptor APIs.
> > > >
> > > > Thanks,
> > > > Neha
> > > >
> > > > On Wed, Jan 27, 2016 at 4:10 PM, Anna Povzner <a...@confluent.io> wrote:
> > > >
> > > > > Hi All,
> > > > >
> > > > > The KIP wiki page is now up-to-date with the scope we have agreed on:
> > > > > Producer and Consumer Interceptors with a minimal set of mutable API that
> > > > > are not dependent on producer and consumer internal implementation.
> > > > >
> > > > > I have few more API details that I would like to bring attention to or/and
> > > > > discuss:
> > > > >
> > > > > 1. Handling exceptions
> > > > >
> > > > > Exceptions can provide an additional level of control.
> > > > > For example, we can filter messages on consumer side or stop messages
> > > > > on producer if they don’t have the right field.
> > > > >
> > > > > I see two options:
> > > > > 1.1. For callbacks that can mutate records (onSend and onConsume),
> > > > > propagate exceptions through the original calls (KafkaProducer.send() and
> > > > > KafkaConsumer.poll() respectively). For other callbacks, catch exception,
> > > > > log, and ignore.
> > > > > 1.2. Catch exceptions from all the interceptor callbacks and ignore.
> > > > >
> > > > > The issue with 1.1. is that it effectively changes KafkaProducer.send() and
> > > > > KafkaConsumer.poll() API, since now they may throw exceptions that are not
> > > > > documented in KafkaProducer/Consumer API. Another option is to allow to
> > > > > propagate some exceptions, and ignore others.
> > > > >
> > > > > I think our use-cases do not require propagating exceptions. So, I propose
> > > > > option 1.2. Unless someone has suggestion/use-cases for propagating
> > > > > exceptions. Please let me know.
> > > > >
> > > > > 2. Intercepting record CRC and record size
> > > > >
> > > > > Since we decided not to add any intermediate callbacks (such as onEnqueue
> > > > > or onReceive) to interceptors, I think it is still valuable to intercept
> > > > > record CRC and record size in bytes for monitoring and audit use-cases.
> > > > >
> > > > > I propose to add checksum and size fields to RecordMetadata and
> > > > > ConsumerRecord. Another option would be to add them as parameters in
> > > > > onAcknowledgement() and onConsume() callbacks.
> > > > >
> > > > > 3. Callbacks that allow to modify records look as follows:
> > > > > ProducerRecord<K, V> onSend(ProducerRecord<K, V> record);
> > > > > ConsumerRecords<K, V> onConsume(ConsumerRecords<K, V> records);
> > > > >
> > > > > This means that interceptors can potentially modify topic/partition in
> > > > > ProducerRecord and topic/partition/offset in ConsumerRecord. I propose that
> > > > > it is up to the interceptor implementation to ensure that topic/partition,
> > > > > etc is correct. KafkaProducer.send() will use topic, partition, key, and
> > > > > value from ProducerRecord returned from the onSend(). Similarly,
> > > > > ConsumerRecords returned from KafkaConsumer.poll() would be the ones
> > > > > returned from the interceptor.
> > > > >
> > > > >
> > > > >
> > > > > Please let me know if you have any suggestions or objections to the above.
> > > > >
> > > > > Thanks,
> > > > > Anna
> > > > >
> > > > >
> > > > > On Wed, Jan 27, 2016 at 2:56 PM, Anna Povzner <a...@confluent.io> wrote:
> > > > >
> > > > > > Hi Mayuresh,
> > > > > >
> > > > > > I see why you would want to check for messages left in the
> > > > > > RecordAccumulator. However, I don't think this will completely solve the
> > > > > > problem. Messages could be in-flight somewhere else, like in the socket, or
> > > > > > there maybe in-flight messages on the consumer side of the MirrorMaker. So,
> > > > > > if we go the route of checking whether there are any in-flight messages for
> > > > > > topic deletion use-case, maybe it is better count them with onSend() and
> > > > > > onAcknowledge() -- whether all messages sent were acknowledged. I also
> > > > > > think that it would be better to solve this without interceptors, such as
> > > > > > fix error handling in this scenario.
> > > > > > However, I do not have any good proposal right now, so these are just
> > > > > > general thoughts.
> > > > > >
> > > > > > Thanks,
> > > > > > Anna
> > > > > >
> > > > > >
> > > > > >
> > > > > > On Wed, Jan 27, 2016 at 11:18 AM, Mayuresh Gharat <gharatmayures...@gmail.com> wrote:
> > > > > >
> > > > > >> Calling producer.flush(), flushes all the data. So this is OK. But when you
> > > > > >> are running Mirror maker, I am not sure there is a way to flush() from
> > > > > >> outside.
> > > > > >>
> > > > > >>
> > > > > >> Thanks,
> > > > > >>
> > > > > >> Mayuresh
> > > > > >>
> > > > > >> On Wed, Jan 27, 2016 at 11:08 AM, Becket Qin <becket....@gmail.com> wrote:
> > > > > >>
> > > > > >> > Mayuresh,
> > > > > >> >
> > > > > >> > Regarding your use case about mirror maker. Is it good enough as long as we
> > > > > >> > know there is no message for the topic in the producer anymore? If that is
> > > > > >> > the case, call producer.flush() is sufficient.
> > > > > >> >
> > > > > >> > Thanks,
> > > > > >> >
> > > > > >> > Jiangjie (Becket) Qin
> > > > > >> >
> > > > > >> > On Tue, Jan 26, 2016 at 6:18 PM, Mayuresh Gharat <gharatmayures...@gmail.com> wrote:
> > > > > >> >
> > > > > >> > > Hi Anna,
> > > > > >> > >
> > > > > >> > > Thanks a lot for summarizing the discussion on this kip.
> > > > > >> > >
> > > > > >> > > It LGTM.
> > > > > >> > > This is really nice :
> > > > > >> > > We decided not to add any callbacks to producer and consumer
> > > > > >> > > interceptors that will depend on internal implementation as part of this
> > > > > >> > > KIP.
> > > > > >> > > *However, it is possible to add them later as part of another KIP if there
> > > > > >> > > are good use-cases.*
> > > > > >> > >
> > > > > >> > > Do you agree with the use case I explained earlier for knowing the number
> > > > > >> > > of records left in the RecordAccumulator for a particular topic. It might
> > > > > >> > > be orthogonal to this KIP, but will be helpful. What do you think?
> > > > > >> > >
> > > > > >> > > Thanks,
> > > > > >> > >
> > > > > >> > > Mayuresh
> > > > > >> > >
> > > > > >> > >
> > > > > >> > > On Tue, Jan 26, 2016 at 2:46 PM, Todd Palino <tpal...@gmail.com> wrote:
> > > > > >> > >
> > > > > >> > > > This looks good. As noted, having one mutable interceptor on each side
> > > > > >> > > > allows for the use cases we can envision right now, and I think that’s
> > > > > >> > > > going to provide a great deal of opportunity for implementing things like
> > > > > >> > > > audit, especially within a multi-tenant environment. Looking forward to
> > > > > >> > > > getting this available in the clients.
> > > > > >> > > >
> > > > > >> > > > Thanks!
> > > > > >> > > >
> > > > > >> > > > -Todd
> > > > > >> > > >
> > > > > >> > > >
> > > > > >> > > > On Tue, Jan 26, 2016 at 2:36 PM, Anna Povzner <a...@confluent.io> wrote:
> > > > > >> > > >
> > > > > >> > > > > Hi All,
> > > > > >> > > > >
> > > > > >> > > > > Here is meeting notes from today’s KIP meeting:
> > > > > >> > > > >
> > > > > >> > > > > 1. We agreed to keep the scope of this KIP to be producer and consumer
> > > > > >> > > > > interceptors only. Broker-side interceptor will be added later as a
> > > > > >> > > > > separate KIP.
> > > > > >> > > > > The reasons were already mentioned in this thread, but the
> > > > > >> > > > > summary is:
> > > > > >> > > > > * Broker interceptor is riskier and requires careful consideration about
> > > > > >> > > > > overheads, whether to intercept leaders vs. leaders/replicas, what to do on
> > > > > >> > > > > leader failover and so on.
> > > > > >> > > > > * Broker interceptors increase monitoring resolution, but not including it
> > > > > >> > > > > in this KIP does not reduce usefulness of producer and consumer
> > > > > >> > > > > interceptors that enable end-to-end monitoring
> > > > > >> > > > >
> > > > > >> > > > > 2. We agreed to scope ProducerInterceptor and ConsumerInterceptor callbacks
> > > > > >> > > > > to minimal set of mutable API that are not dependent on producer and
> > > > > >> > > > > consumer internal implementation.
> > > > > >> > > > >
> > > > > >> > > > > ProducerInterceptor:
> > > > > >> > > > > *ProducerRecord<K, V> onSend(ProducerRecord<K, V> record);*
> > > > > >> > > > > *void onAcknowledgement(RecordMetadata metadata, Exception exception);*
> > > > > >> > > > >
> > > > > >> > > > > ConsumerInterceptor:
> > > > > >> > > > > *ConsumerRecords<K, V> onConsume(ConsumerRecords<K, V> records);*
> > > > > >> > > > > *void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets);*
> > > > > >> > > > >
> > > > > >> > > > > We will allow interceptors to modify ProducerRecord on producer side, and
> > > > > >> > > > > modify ConsumerRecords on consumer side. This will support end-to-end
> > > > > >> > > > > monitoring and auditing and support the ability to add metadata for a
> > > > > >> > > > > message. This will support Todd’s Auditing and Routing use-cases.
> > > > > >> > > > >
> > > > > >> > > > > We did not find any use-case for modifying records in onConsume() callback,
> > > > > >> > > > > but decided to enable modification of consumer records for symmetry with
> > > > > >> > > > > onSend().
> > > > > >> > > > >
> > > > > >> > > > > 3. We agreed to ensure compatibility when/if we add new methods to
> > > > > >> > > > > ProducerInterceptor and ConsumerInterceptor by using default methods with
> > > > > >> > > > > an empty implementation. Ok to assume Java 8. (This is Ismael’s method #2).
> > > > > >> > > > >
> > > > > >> > > > > 4. We decided not to add any callbacks to producer and consumer
> > > > > >> > > > > interceptors that will depend on internal implementation as part of this
> > > > > >> > > > > KIP. However, it is possible to add them later as part of another KIP if
> > > > > >> > > > > there are good use-cases.
> > > > > >> > > > >
> > > > > >> > > > > *Reasoning.* We did not have concrete use-cases that justified more methods
> > > > > >> > > > > at this point. Some of the use-cases were for more fine-grain latency
> > > > > >> > > > > collection, which could be done with Kafka Metrics. Another use-case was
> > > > > >> > > > > encryption. However, there are several design options for encryption. One
> > > > > >> > > > > is to do per-record encryption which would require adding
> > > > > >> > > > > ProducerInterceptor.onEnqueued() and ConsumerInterceptor.onReceive().
> > > > > >> > > > > One could argue that in that case encryption could be done by adding a
> > > > > >> > > > > custom serializer/deserializer. Another option is to do encryption after
> > > > > >> > > > > message gets compressed, but there are issues that arise regarding broker
> > > > > >> > > > > doing re-compression. We decided that it is better to have that discussion
> > > > > >> > > > > in a separate KIP and decide that this is something we want to do with
> > > > > >> > > > > interceptors or by other means.
> > > > > >> > > > >
> > > > > >> > > > >
> > > > > >> > > > > Todd, Mayuresh and others who missed the KIP meeting, please let me know
> > > > > >> > > > > your thoughts on the scope we agreed on during the meeting.
> > > > > >> > > > >
> > > > > >> > > > > I will update the KIP proposal with the current decision by end of today.
> > > > > >> > > > >
> > > > > >> > > > > Thanks,
> > > > > >> > > > > Anna
> > > > > >> > > > >
> > > > > >> > > > >
> > > > > >> > > > > On Tue, Jan 26, 2016 at 11:41 AM, Mayuresh Gharat <gharatmayures...@gmail.com> wrote:
> > > > > >> > > > >
> > > > > >> > > > > > Hi,
> > > > > >> > > > > >
> > > > > >> > > > > > I won't be able to make it to KIP hangout due to conflict.
> > > > > >> > > > > >
> > > > > >> > > > > > Anna, here is the use case where knowing if there are messages in the
> > > > > >> > > > > > RecordAccumulator left to be sent to the kafka cluster for a topic is
> > > > > >> > > > > > useful.
> > > > > >> > > > > > > > > > > >> > > > > > 1) Consider a pipeline : > > > > > >> > > > > > A ---> Mirror-maker -----> B > > > > > >> > > > > > > > > > > >> > > > > > 2) We have a topic T in cluster A mirrored to cluster > B. > > > > > >> > > > > > > > > > > >> > > > > > 3) Now if we delete topic T in A and immediately > proceed > > > to > > > > > >> delete > > > > > >> > > the > > > > > >> > > > > > topic in cluster B, some of the the Mirror-maker > > machines > > > > die > > > > > >> > because > > > > > >> > > > > > atleast one of the batches in RecordAccumulator for > > topic > > > T > > > > > >> fail to > > > > > >> > > be > > > > > >> > > > > > produced to cluster B. We have seen this happening in > > our > > > > > >> clusters. > > > > > >> > > > > > > > > > > >> > > > > > > > > > > >> > > > > > If we know that there are no more messages left in the > > > > > >> > > > RecordAccumulator > > > > > >> > > > > to > > > > > >> > > > > > be produced to cluster B, we can safely delete the > topic > > > in > > > > > >> > cluster B > > > > > >> > > > > > without disturbing the pipeline. > > > > > >> > > > > > > > > > > >> > > > > > Thanks, > > > > > >> > > > > > > > > > > >> > > > > > Mayuresh > > > > > >> > > > > > > > > > > >> > > > > > On Tue, Jan 26, 2016 at 10:31 AM, Anna Povzner < > > > > > >> a...@confluent.io> > > > > > >> > > > > wrote: > > > > > >> > > > > > > > > > > >> > > > > > > Thanks Ismael and Todd for your feedback! > > > > > >> > > > > > > > > > > > >> > > > > > > I agree about coming up with lean, but useful > > interfaces > > > > > that > > > > > >> > will > > > > > >> > > be > > > > > >> > > > > > easy > > > > > >> > > > > > > to extend later. 
> > > > > >> > > > > > > > > > > > >> > > > > > > When we discuss the minimal set of producer and > > consumer > > > > > >> > > interceptor > > > > > >> > > > > API > > > > > >> > > > > > in > > > > > >> > > > > > > today’s KIP meeting (discussion item #2 in my > previous > > > > > email), > > > > > >> > lets > > > > > >> > > > > > compare > > > > > >> > > > > > > two options: > > > > > >> > > > > > > > > > > > >> > > > > > > *1. Minimal set of immutable API for producer and > > > consumer > > > > > >> > > > > interceptors* > > > > > >> > > > > > > > > > > > >> > > > > > > ProducerInterceptor: > > > > > >> > > > > > > public void onSend(ProducerRecord<K, V> record); > > > > > >> > > > > > > public void onAcknowledgement(RecordMetadata > metadata, > > > > > >> Exception > > > > > >> > > > > > > exception); > > > > > >> > > > > > > > > > > > >> > > > > > > ConsumerInterceptor: > > > > > >> > > > > > > public void onConsume(ConsumerRecords<K, V> > records); > > > > > >> > > > > > > public void onCommit(Map<TopicPartition, > > > > OffsetAndMetadata> > > > > > >> > > offsets); > > > > > >> > > > > > > > > > > > >> > > > > > > Use-cases: > > > > > >> > > > > > > — end-to-end monitoring; custom tracing and logging > > > > > >> > > > > > > > > > > > >> > > > > > > > > > > > >> > > > > > > *2. 
Minimal set of mutable API for producer and > > consumer > > > > > >> > > > interceptors* > > > > > >> > > > > > > > > > > > >> > > > > > > ProducerInterceptor: > > > > > >> > > > > > > ProducerRecord<K, V> onSend(ProducerRecord<K, V> > > > record); > > > > > >> > > > > > > void onAcknowledgement(RecordMetadata metadata, > > > Exception > > > > > >> > > exception); > > > > > >> > > > > > > > > > > > >> > > > > > > ConsumerInterceptor: > > > > > >> > > > > > > void onConsume(ConsumerRecords<K, V> records); > > > > > >> > > > > > > void onCommit(Map<TopicPartition, OffsetAndMetadata> > > > > > offsets); > > > > > >> > > > > > > > > > > > >> > > > > > > Additional use-cases to #1: > > > > > >> > > > > > > — Ability to add metadata to a message or fill in > > > standard > > > > > >> fields > > > > > >> > > for > > > > > >> > > > > > audit > > > > > >> > > > > > > and routing. > > > > > >> > > > > > > > > > > > >> > > > > > > Implications > > > > > >> > > > > > > — Partition assignment will be done based on > modified > > > > > >> key/value > > > > > >> > > > instead > > > > > >> > > > > > of > > > > > >> > > > > > > original key/value. If key/value transformation is > not > > > > > >> consistent > > > > > >> > > > (same > > > > > >> > > > > > key > > > > > >> > > > > > > and value does not mutate to the same, but modified, > > > > > >> key/value), > > > > > >> > > then > > > > > >> > > > > log > > > > > >> > > > > > > compaction would not work. However, audit and > routing > > > > > >> use-cases > > > > > >> > > from > > > > > >> > > > > Todd > > > > > >> > > > > > > will likely do consistent transformation. 
> > > > > >> > > > > > > > > > > > >> > > > > > > > > > > > >> > > > > > > *Additional callbacks (discussion item #3 in my > > previous > > > > > >> email):* > > > > > >> > > > > > > > > > > > >> > > > > > > If we want to support encryption, we would want to > be > > > able > > > > > to > > > > > >> > > modify > > > > > >> > > > > > > serialized key/value, rather than key and value > > objects. > > > > > This > > > > > >> > will > > > > > >> > > > add > > > > > >> > > > > > the > > > > > >> > > > > > > following API to producer and consumer interceptors: > > > > > >> > > > > > > > > > > > >> > > > > > > ProducerInterceptor: > > > > > >> > > > > > > SerializedKeyValue onEnqueued(TopicPartition tp, > > > > > >> > ProducerRecord<K, > > > > > >> > > V> > > > > > >> > > > > > > record, SerializedKeyValue serializedKeyValue); > > > > > >> > > > > > > > > > > > >> > > > > > > ConsumerInterceptor: > > > > > >> > > > > > > SerializedKeyValue onReceive(TopicPartition tp, > > > > > >> > SerializedKeyValue > > > > > >> > > > > > > serializedKeyValue); > > > > > >> > > > > > > > > > > > >> > > > > > > > > > > > >> > > > > > > I am leaning towards implementing the minimal set of > > > > > >> immutable or > > > > > >> > > > > mutable > > > > > >> > > > > > > interfaces, making sure that we have a compatibility > > > plan > > > > > that > > > > > >> > > allows > > > > > >> > > > > us > > > > > >> > > > > > to > > > > > >> > > > > > > add more callbacks in the future (per Ismael > comment), > > > and > > > > > add > > > > > >> > more > > > > > >> > > > > APIs > > > > > >> > > > > > > later. E.g., for encryption use-case, there could be > > an > > > > > >> argument > > > > > >> > in > > > > > >> > > > > doing > > > > > >> > > > > > > encryption after message compression vs. per-record > > > > > encryption > > > > > >> > that > > > > > >> > > > > could > > > > > >> > > > > > > be done using the above additional API. 
There is > also > > > more > > > > > >> > > > implications > > > > > >> > > > > > for > > > > > >> > > > > > > every API that modifies records: modifying > serialized > > > > > >> key/value > > > > > >> > > will > > > > > >> > > > > > again > > > > > >> > > > > > > impact partition assignment (we will likely do that > > > after > > > > > >> > partition > > > > > >> > > > > > > assignment), which may impact log compaction and > > mirror > > > > > maker > > > > > >> > > > > > partitioning. > > > > > >> > > > > > > > > > > > >> > > > > > > > > > > > >> > > > > > > Thanks, > > > > > >> > > > > > > Anna > > > > > >> > > > > > > > > > > > >> > > > > > > On Tue, Jan 26, 2016 at 7:26 AM, Todd Palino < > > > > > >> tpal...@gmail.com> > > > > > >> > > > > wrote: > > > > > >> > > > > > > > > > > > >> > > > > > > > Finally got a chance to take a look at this. I > won’t > > > be > > > > > >> able to > > > > > >> > > > make > > > > > >> > > > > > the > > > > > >> > > > > > > > KIP meeting due to a conflict. > > > > > >> > > > > > > > > > > > > >> > > > > > > > I’m somewhat disappointed in this proposal. I > think > > > that > > > > > the > > > > > >> > > > explicit > > > > > >> > > > > > > > exclusion of modification of the messages is > > > > > short-sighted, > > > > > >> and > > > > > >> > > not > > > > > >> > > > > > > > accounting for it now is going to bite us later. > > Jay, > > > > > aren’t > > > > > >> > you > > > > > >> > > > the > > > > > >> > > > > > one > > > > > >> > > > > > > > railing against public interfaces and how > difficult > > > they > > > > > >> are to > > > > > >> > > > work > > > > > >> > > > > > with > > > > > >> > > > > > > > when you don’t get them right? 
The “simple” change > > to > > > > one > > > > > of > > > > > >> > > these > > > > > >> > > > > > > > interfaces to make it able to return a record is > > going > > > > to > > > > > >> be a > > > > > >> > > > > > > significant > > > > > >> > > > > > > > change and is going to require all clients to > > rewrite > > > > > their > > > > > >> > > > > > interceptors. > > > > > >> > > > > > > > If we’re not willing to put the time to think > > through > > > > > >> > > manipulation > > > > > >> > > > > now, > > > > > >> > > > > > > > then this KIP should be shelved until we are. > > > > Implementing > > > > > >> > > > something > > > > > >> > > > > > > > halfway is going to be worse than taking a little > > > > longer. > > > > > In > > > > > >> > > > > addition, > > > > > >> > > > > > I > > > > > >> > > > > > > > don’t believe that manipulation requires anything > > more > > > > > than > > > > > >> > > > > > interceptors > > > > > >> > > > > > > to > > > > > >> > > > > > > > receive the full record, and then to return it. > > > > > >> > > > > > > > > > > > > >> > > > > > > > There are 3 use case I can think of right now > > without > > > > any > > > > > >> deep > > > > > >> > > > > > discussion > > > > > >> > > > > > > > that can make use of interceptors with > modification: > > > > > >> > > > > > > > > > > > > >> > > > > > > > 1. Auditing. The ability to add metadata to a > > message > > > > for > > > > > >> > > auditing > > > > > >> > > > is > > > > > >> > > > > > > > critical. Hostname, service name, timestamps, etc. > > are > > > > all > > > > > >> > pieces > > > > > >> > > > of > > > > > >> > > > > > data > > > > > >> > > > > > > > that can be used on the other side of the pipeline > > to > > > > > >> > categorize > > > > > >> > > > > > > messages, > > > > > >> > > > > > > > determine loss and transport time, and pin down > > > issues. 
> > > > > You > > > > > >> may > > > > > >> > > say > > > > > >> > > > > > that > > > > > >> > > > > > > > these things can just be part of the message > schema, > > > but > > > > > >> anyone > > > > > >> > > who > > > > > >> > > > > has > > > > > >> > > > > > > > worked with a multi-user data system (especially > > those > > > > who > > > > > >> have > > > > > >> > > > been > > > > > >> > > > > > > > involved with LinkedIn) know how difficult it is > to > > > > > maintain > > > > > >> > > > > consistent > > > > > >> > > > > > > > message schemas and to get other people to put in > > > fields > > > > > for > > > > > >> > your > > > > > >> > > > > use. > > > > > >> > > > > > > > > > > > > >> > > > > > > > 2. Encryption. This is probably the most obvious > > case > > > > for > > > > > >> > record > > > > > >> > > > > > > > manipulation on both sides. The ability to tie in > > end > > > to > > > > > end > > > > > >> > > > > encryption > > > > > >> > > > > > > is > > > > > >> > > > > > > > important for data that requires external > compliance > > > > (PCI, > > > > > >> > HIPAA, > > > > > >> > > > > > etc.). > > > > > >> > > > > > > > > > > > > >> > > > > > > > 3. Routing. By being able to add a bit of > > information > > > > > about > > > > > >> the > > > > > >> > > > > source > > > > > >> > > > > > or > > > > > >> > > > > > > > destination of a message to the metadata, you can > > > easily > > > > > >> > > construct > > > > > >> > > > an > > > > > >> > > > > > > > intelligent mirror maker that can prevent loops. > > This > > > > has > > > > > >> the > > > > > >> > > > > > opportunity > > > > > >> > > > > > > > to result in significant operational savings, as > you > > > can > > > > > get > > > > > >> > rid > > > > > >> > > of > > > > > >> > > > > the > > > > > >> > > > > > > > need for tiered clusters in order to prevent loops > > in > > > > > >> mirroring > > > > > >> > > > > > messages. 
> All three of these share the feature that they add metadata to messages.
> With the pushback on having arbitrary metadata as an “envelope” to the
> message, this is a way to provide it and make it the responsibility of
> the client, and not the Kafka broker and system itself.
>
> -Todd
>
> On Tue, Jan 26, 2016 at 2:30 AM, Ismael Juma <ism...@juma.me.uk> wrote:
>
> Hi Anna and Neha,
>
> I think it makes a lot of sense to try and keep the interface lean and to
> add more methods later when/if there is a need. What is the current
> thinking with regards to compatibility when/if we add new methods? A few
> options come to mind:
>
> 1. Change the interface to an abstract class with empty implementations
>    for all the methods. This means that the path to adding new methods is
>    clear.
> 2. Hope we have moved to Java 8 by the time we need to add new methods and
>    use default methods with an empty implementation for any new method
>    (and potentially make existing methods default methods too at that
>    point for consistency)
> 3. Introduce a new interface that inherits from the existing Interceptor
>    interface when we need to add new methods.
>
> Option 1 is the easiest and it also means that interceptor users only need
> to override the methods that they are interested in (more useful if the
> number of methods grows). The downside is that interceptor implementations
> cannot inherit from another class (a straightforward workaround is to make
> the interceptor a forwarder that calls another class). Also, our existing
> callbacks are interfaces, so this seems a bit inconsistent.
>
> Option 2 may be the most appealing one as both users and ourselves retain
> flexibility. The main downside is that it relies on us moving to Java 8,
> which may be more than a year away potentially (if we support the last 2
> Java releases).
>
> Thoughts?
>
> Ismael
>
> On Tue, Jan 26, 2016 at 4:59 AM, Neha Narkhede <n...@confluent.io> wrote:
>
> Anna,
>
> I'm also in favor of including just the APIs for which we have a clear use
> case. If more use cases for finer monitoring show up in the future, we can
> always update the interface. Would you please highlight in the KIP the
> APIs that you think we have an immediate use for?
>
> Joel,
>
> Broker-side monitoring makes a lot of sense in the long term though I
> don't think it is a requirement for end-to-end monitoring. With the
> producer and consumer interceptors, you have the ability to get full
> publish-to-subscribe end-to-end monitoring. The broker interceptor
> certainly improves the resolution of monitoring but it is also a riskier
> change. I prefer an incremental approach over a big-bang and recommend
> taking baby-steps. Let's first make sure the producer/consumer
> interceptors are successful. And then come back and add the broker
> interceptor carefully.
>
> Having said that, it would be great to understand your proposal for the
> broker interceptor independently. We can either add an interceptor
> on-append or on-commit. If people want to use this for monitoring, then
> possibly on-commit might be more useful?
>
> Thanks,
> Neha
>
> On Mon, Jan 25, 2016 at 6:47 PM, Jay Kreps <j...@confluent.io> wrote:
>
> Hey Joel,
>
> What is the interface you are thinking of? Something like this:
>     onAppend(String topic, int partition, Records records, long time)
> ?
>
> One challenge right now is that we are still using the old
> Message/MessageSet classes on the broker which I'm not sure if we'd want
> to support over the long haul but it might be okay just to create the
> records instance for this interface.
>
> -Jay
>
> On Mon, Jan 25, 2016 at 12:37 PM, Joel Koshy <jjkosh...@gmail.com> wrote:
>
> I'm definitely in favor of having such hooks in the produce/consume
> life-cycle. Not sure if people remember this but in Kafka 0.7 this was
> pretty much how it was:
>
> https://github.com/apache/kafka/blob/0.7/core/src/main/scala/kafka/producer/async/CallbackHandler.scala
>
> i.e., we had something similar to the interceptor proposal for various
> stages of the producer request. The producer provided call-backs for
> beforeEnqueue, afterEnqueue, afterDequeuing, beforeSending, etc. So at
> LinkedIn we in fact did auditing within these call-backs (and not
> explicitly in the wrapper). Over time and with 0.8 we moved that out to
> the wrapper libraries.
>
> On a side-note while audit and other monitoring can be done internally in
> a convenient way I think it should be clarified that having a wrapper is
> in general not a bad idea and I would even consider it to be a
> best-practice. Even with 0.7 we still had a wrapper library and that API
> has largely stayed the same and has helped protect against (sometimes
> backwards incompatible) changes in open source.
>
> While we are on this topic I have one comment and Anna, you may have
> already considered this but I don't see mention of it in the KIP:
>
> Add a custom message interceptor/validator on the broker on message
> arrival.
>
> We decompress and do basic validation of messages on arrival. I think
> there is value in supporting custom validation and expanding it to support
> custom on-arrival processing. Here is a specific use-case I have in mind.
> The blog that James referenced describes our auditing infrastructure. In
> order to audit the Kafka cluster itself we need to run a "console auditor"
> service that consumes everything and spits out audit events back to the
> cluster. I would prefer not having to run this service because:
>
> - Well, it is one more service that we have to run and monitor
> - Consuming everything takes up bandwidth which can be avoided
> - The console auditor consumer itself can lag and cause temporary audit
>   discrepancies
>
> One way we can mitigate this is by having mirror-makers in between
> clusters emit audit events. The problem is that the very last cluster in
> the pipeline will not have any audit which is why we need to have
> something to audit the cluster.
>
> If we had a custom message validator then the audit can be done on-arrival
> and we won't need a console auditor.
>
> One potential issue in this approach and any elaborate on-arrival
> processing for that matter is that you may need to deserialize the message
> as well which can drive up produce request handling times. However I'm not
> terribly concerned about that especially if the audit header can be
> separated out easily or even deserialized partially as this Avro thread
> touches on
>
> http://search-hadoop.com/m/F2svI1HDLY12W8tnH1&subj=Re+any+optimization+in+reading+a+partial+schema+in+the+decoder+
>
> Thanks,
>
> Joel
>
> On Mon, Jan 25, 2016 at 12:02 PM, Mayuresh Gharat
> <gharatmayures...@gmail.com> wrote:
>
> Nice KIP. Excellent idea.
> Was just thinking if we can add onDequed() to the ProducerInterceptor
> interface. Since we have the onEnqueued(), it will help the client or the
> tools to know how much time the message spent in the RecordAccumulator.
> Also an API to check if there are any messages left for a particular topic
> in the RecordAccumulator would help.
>
> Thanks,
>
> Mayuresh
>
> On Mon, Jan 25, 2016 at 11:29 AM, Todd Palino <tpal...@gmail.com> wrote:
>
> Great idea. I’ve been talking about this for 2 years, and I’m glad someone
> is finally picking it up. Will take a look at the KIP at some point
> shortly.
>
> -Todd
>
> On Mon, Jan 25, 2016 at 11:24 AM, Jay Kreps <j...@confluent.io> wrote:
>
> Hey Becket,
>
> Yeah this is really similar to the callback. The difference is really in
> who sets the behavior. The idea of the interceptor is that it doesn't
> require any code change in apps so you can globally add behavior to your
> Kafka usage without changing app code. Whereas the callback is added by
> the app. The idea is to kind of obviate the need for the wrapper code that
> e.g. LinkedIn maintains to hold this kind of stuff.
>
> -Jay
>
> On Sun, Jan 24, 2016 at 4:21 PM, Becket Qin <becket....@gmail.com> wrote:
>
> This could be a useful feature. And I think there are some use cases to
> mutate the data like rejected alternative one mentioned.
>
> I am wondering if there is functional overlap between
> ProducerInterceptor.onAcknowledgement() and the producer callback? I can
> see that the Callback could be a per record setting while
> onAcknowledgement() is a producer level setting. Other than that, is there
> any difference between them?
>
> Thanks,
>
> Jiangjie (Becket) Qin
>
> On Fri, Jan 22, 2016 at 6:21 PM, Neha Narkhede <n...@confluent.io> wrote:
>
> James,
>
> That is one of the many monitoring use cases for the interceptor
> interface.
>
> Thanks,
> Neha
>
> On Fri, Jan 22, 2016 at 6:18 PM, James Cheng <jch...@tivo.com> wrote:
>
> Anna,
>
> I'm trying to understand a concrete use case. It sounds like producer
> interceptors could be used to implement part of LinkedIn's Kafka Audit
> tool? https://engineering.linkedin.com/kafka/running-kafka-scale
>
> Part of that is done by a wrapper library around the kafka producer that
> keeps a count of the number of messages produced, and then sends that
> count to a side-topic. It sounds like the producer interceptors could
> possibly be used to implement that?
>
> -James
>
> On Jan 22, 2016, at 4:33 PM, Anna Povzner <a...@confluent.io> wrote:
>
> Hi,
>
> I just created KIP-42 for adding producer and consumer interceptors for
> intercepting messages at different points on producer and consumer.
>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-42%3A+Add+Producer+and+Consumer+Interceptors
>
> Comments and suggestions are welcome!
>
> Thanks,
> Anna
>
> ________________________________
>
> This email and any attachments may contain confidential and privileged
> material for the sole use of the intended recipient. Any review, copying,
> or distribution of this email (or any attachments) by others is
> prohibited. If you are not the intended recipient, please contact the
> sender immediately and permanently delete this email and any attachments.
> No employee or agent of TiVo Inc. is authorized to conclude any binding
> agreement on behalf of TiVo Inc. by email. Binding agreements with TiVo
> Inc. may only be made by a signed written agreement.
>
> --
> Thanks,
> Neha
>
> --
> *Todd Palino*
> Staff Site Reliability Engineer
> Data Infrastructure Streaming
> linkedin.com/in/toddpalino
>
> --
> -Regards,
> Mayuresh R. Gharat
> (862) 250-7125
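
For reference, the three compatibility options Ismael lists in the quoted thread (abstract class with empty implementations, Java 8 default methods, and a new interface that extends the existing one) might look roughly like the sketch below. Every type and method name here is hypothetical and chosen only for illustration; none of it is the actual KIP-42 API, and records are plain strings to keep the example self-contained.

```java
import java.util.ArrayList;
import java.util.List;

// All names below are hypothetical; this is not the KIP-42 interface.

// Option 1: an abstract class with empty implementations.
// Implementors override only what they need, but cannot extend another class.
abstract class AbstractInterceptor {
    public void onSend(String record) { }
    public void onAcknowledgement(String record) { } // added later, no-op by default
}

// Option 2 (requires Java 8): later additions are default methods.
interface DefaultMethodInterceptor {
    void onSend(String record);
    default void onAcknowledgement(String record) { } // added later, no-op by default
}

// Option 3: leave the original interface untouched and extend it.
interface BaseInterceptor {
    void onSend(String record);
}

interface AckAwareInterceptor extends BaseInterceptor {
    void onAcknowledgement(String record);
}

class InterceptorCompatDemo {
    // Each implementation below was "written" before onAcknowledgement existed.
    static List<String> run() {
        List<String> log = new ArrayList<>();

        AbstractInterceptor opt1 = new AbstractInterceptor() {
            @Override
            public void onSend(String record) { log.add("opt1 send " + record); }
        };
        opt1.onSend("r1");
        opt1.onAcknowledgement("r1"); // inherited no-op, still compiles

        DefaultMethodInterceptor opt2 = record -> log.add("opt2 send " + record);
        opt2.onSend("r1");
        opt2.onAcknowledgement("r1"); // default no-op, still compiles

        BaseInterceptor opt3 = record -> log.add("opt3 send " + record);
        opt3.onSend("r1");
        // With option 3 the caller has to check for the richer interface.
        if (opt3 instanceof AckAwareInterceptor) {
            ((AckAwareInterceptor) opt3).onAcknowledgement("r1");
        }
        return log;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

In this sketch, options 1 and 2 let the caller invoke the new method unconditionally, while option 3 pushes an `instanceof` check onto the caller; that is one way to read the trade-off Ismael describes, not a statement of what the KIP adopted.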