Hi Becket,

Thanks for answering and providing feedback. I will withdraw the KIP and move it to the rejected section.
Thanks,
Bhavesh

On Mon, Sep 21, 2015 at 9:53 AM, Jiangjie Qin <j...@linkedin.com.invalid> wrote:
> Hey Bhavesh,
>
> I kind of think this metadata change capture logic should be implemented
> by each user themselves, for the following reasons:
>
> 1. Most users do not really care about partition changes. Adding the
> logic/interface to the default partitioner means that users who don't
> care about partition changes still pay the price of computing a cluster
> diff on every metadata update. For a big cluster, this diff could be
> costly depending on how frequently the metadata is refreshed.
>
> 2. In some cases, a user might only care about partition changes for a
> few specific topics; in that case there is no need to diff the whole
> cluster across all the topics a producer is producing to. If the diff
> is implemented in user code, it can be more efficient because the user
> can check only the topics they are interested in. Also, different users
> might care about different changes in the metadata, e.g. topic
> create/delete, node changes, etc. So it seems better to leave the
> actual metadata change capture logic to the user instead of doing it in
> the producer.
>
> 3. The cluster diff code itself is short and not complicated, so even
> if users do it on their own it should be simple, e.g.:
>
>     if (this.cachedCluster.hashCode() != cluster.hashCode()) {
>         for (String topic : cluster.topics()) {
>             if (this.cachedCluster.topics().contains(topic)
>                     && this.cachedCluster.partitionsForTopic(topic).size()
>                        != cluster.partitionsForTopic(topic).size()) {
>                 // handle the partition count change for this topic
>             }
>         }
>         this.cachedCluster = cluster; // remember the refreshed metadata
>     }
>
> Thanks,
>
> Jiangjie (Becket) Qin
>
> On Mon, Sep 21, 2015 at 9:13 AM, Bhavesh Mistry <mistry.p.bhav...@gmail.com> wrote:
>
>> Hi Jiangjie,
>>
>> Thanks for the valuable feedback.
>>
>> 1) Thread coordination for partition changes could be an issue.
>>
>> I agree with you that coordination between the application thread and
>> the sender thread would be a tough one. My only concern was to share
>> the diff logic you described among all Partitioner implementations,
>> and let the Kafka framework take care of computing the diff exactly as
>> you describe.
>>
>> In a multithreaded environment, the change listener would be called
>> from the same thread that has just finished the metadata update.
>>
>> Metadata listener:
>> https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/Metadata.java#L163
>>
>> producer.send():
>> https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java#L370
>>
>> But the behavior of onChange() would be no different from what we have
>> today. For example:
>>
>>     public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
>>         // Determine the partition for the message.
>>         int partition = partition(record, serializedKey, serializedValue, metadata.fetch());
>>
>>         /*
>>          * A metadata update can occur after the application thread has
>>          * determined the partition but before the message is added to
>>          * the record queue, so the cluster can change in between. In my
>>          * opinion the behavior is therefore the same.
>>          */
>>         RecordAccumulator.RecordAppendResult result =
>>             accumulator.append(tp, serializedKey, serializedValue, callback);
>>
>> What do you think of adding the diff logic you describe to the default
>> partitioner implementation, or to another implementation class (call
>> it ChangePartitioner), whose partition() method calls the
>> onPartitionsChange() method so that whoever cares or needs to know can
>> do what they like (log an event, use it to change the partitioning
>> strategy, etc.)?
>>
>> This gives the ability to share the diff code, so that not every
>> implementation has to implement the diff logic itself, which is my
>> main concern.
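>>
>> A rough, untested sketch of what I have in mind (ChangePartitioner and
>> onPartitionsChange() are hypothetical names, the diff is the one you
>> outlined, and subclasses plug in the actual partitioning strategy):
>>
>>     import java.util.List;
>>     import java.util.Map;
>>     import org.apache.kafka.clients.producer.Partitioner;
>>     import org.apache.kafka.common.Cluster;
>>     import org.apache.kafka.common.PartitionInfo;
>>
>>     public abstract class ChangePartitioner implements Partitioner {
>>         private volatile Cluster cachedCluster = Cluster.empty();
>>
>>         @Override
>>         public int partition(String topic, Object key, byte[] keyBytes,
>>                              Object value, byte[] valueBytes, Cluster cluster) {
>>             Cluster cached = this.cachedCluster;
>>             if (cached.hashCode() != cluster.hashCode()) {
>>                 // A metadata refresh happened: diff once, notify whoever cares.
>>                 for (String t : cluster.topics()) {
>>                     List<PartitionInfo> old = cached.partitionsForTopic(t);
>>                     int oldCount = old == null ? 0 : old.size();
>>                     int newCount = cluster.partitionsForTopic(t).size();
>>                     if (oldCount != 0 && oldCount != newCount)
>>                         onPartitionsChange(t, oldCount, newCount);
>>                 }
>>                 this.cachedCluster = cluster;
>>             }
>>             return doPartition(topic, key, keyBytes, value, valueBytes, cluster);
>>         }
>>
>>         /** The actual partitioning strategy, supplied by the subclass. */
>>         protected abstract int doPartition(String topic, Object key, byte[] keyBytes,
>>                                            Object value, byte[] valueBytes, Cluster cluster);
>>
>>         /** Called for each topic whose partition count changed on a refresh. */
>>         protected void onPartitionsChange(String topic, int oldCount, int newCount) { }
>>
>>         @Override
>>         public void configure(Map<String, ?> configs) { }
>>
>>         @Override
>>         public void close() { }
>>     }
>>
>> (Note this is not fully synchronized: two caller threads can observe
>> the same refresh and both invoke onPartitionsChange(); I left that out
>> to keep the sketch short.)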
>>
>> Thanks,
>>
>> Bhavesh
>>
>> On Fri, Sep 18, 2015 at 3:47 PM, Jiangjie Qin <j...@linkedin.com.invalid> wrote:
>> > Hey Bhavesh,
>> >
>> > I think it is useful to notify the user about partition changes.
>> >
>> > The problem with having a listener in the producer is that it is hard
>> > to guarantee synchronization. For example, consider the following
>> > sequence:
>> > 1. The producer sender thread refreshes the metadata, which includes
>> > a partition change.
>> > 2. A user thread calls send() with a customized partitioner; the
>> > partitioner decides the partition using the new metadata refreshed in
>> > step 1.
>> > 3. The producer sender thread calls onPartitionChange().
>> >
>> > At that point, the message sent in step 2 was already sent using the
>> > new metadata. If we instead update the metadata after invoking
>> > onPartitionChange(), it is a little strange because the metadata has
>> > not changed yet.
>> >
>> > Also, the metadata refresh can happen in the caller thread as well; I
>> > am not sure how that would work with multiple caller threads.
>> >
>> > I am thinking the user can actually tell whether the cluster has
>> > changed or not, because the partition() method takes a Cluster
>> > parameter. So if users care about partition count changes, they can
>> > do the following (a rough sketch is at the bottom of this mail):
>> > 1. Store a copy of the cluster as a cache in the partitioner.
>> > 2. When partition() is called, check whether the hash of the
>> > passed-in cluster is the same as that of the cached cluster.
>> > 3. If the hashes differ, a metadata refresh has occurred, and they
>> > can check whether there was a partition change before doing the
>> > partitioning.
>> >
>> > Thanks,
>> >
>> > Jiangjie (Becket) Qin
>> >
>> > On Wed, Sep 16, 2015 at 12:08 AM, Bhavesh Mistry <mistry.p.bhav...@gmail.com> wrote:
>> >
>> >> Hi Kafka Dev Team,
>> >>
>> >> I would like to get your feedback about adding yet another method to
>> >> the Partitioner interface, onPartitionsChange(), to get notified
>> >> about partition changes upon metadata refresh.
>> >>
>> >> This would allow custom logic (implementors of Partitioner) to be
>> >> notified when partition ownership changes, a partition goes online
>> >> or offline, or the partition count increases or decreases, and when
>> >> those changes are propagated to an individual producer instance.
>> >>
>> >> Please note this is my first KIP; if the process is not followed
>> >> correctly, please do let me know. I will be more than happy to
>> >> follow up or correct anything I may have missed.
>> >>
>> >> Thanks in advance, and looking forward to your feedback.
>> >>
>> >> Thanks,
>> >>
>> >> Bhavesh
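>> >
>> > Here is the rough, untested sketch mentioned above (the class name is
>> > hypothetical, and the hash-of-key partitioning at the end is just a
>> > placeholder for whatever strategy you already use):
>> >
>> >     import java.util.Arrays;
>> >     import java.util.List;
>> >     import java.util.Map;
>> >     import org.apache.kafka.clients.producer.Partitioner;
>> >     import org.apache.kafka.common.Cluster;
>> >     import org.apache.kafka.common.PartitionInfo;
>> >
>> >     public class PartitionChangeAwarePartitioner implements Partitioner {
>> >         // 1. Keep a cached copy of the cluster metadata.
>> >         private volatile Cluster cachedCluster;
>> >
>> >         @Override
>> >         public int partition(String topic, Object key, byte[] keyBytes,
>> >                              Object value, byte[] valueBytes, Cluster cluster) {
>> >             Cluster cached = this.cachedCluster;
>> >             // 2. A different hash means a metadata refresh has occurred.
>> >             if (cached == null || cached.hashCode() != cluster.hashCode()) {
>> >                 // 3. Check only the topic we care about before partitioning.
>> >                 List<PartitionInfo> old =
>> >                     cached == null ? null : cached.partitionsForTopic(topic);
>> >                 int newCount = cluster.partitionsForTopic(topic).size();
>> >                 if (old != null && old.size() != newCount) {
>> >                     // React to the change here, e.g. log it or reset any
>> >                     // sticky/keyed state kept by the partitioner.
>> >                 }
>> >                 this.cachedCluster = cluster;
>> >             }
>> >             // Plain hash-of-key partitioning as a stand-in strategy.
>> >             int numPartitions = cluster.partitionsForTopic(topic).size();
>> >             int hash = keyBytes == null ? 0 : Arrays.hashCode(keyBytes);
>> >             return (hash & 0x7fffffff) % numPartitions;
>> >         }
>> >
>> >         @Override
>> >         public void configure(Map<String, ?> configs) { }
>> >
>> >         @Override
>> >         public void close() { }
>> >     }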