Hi Jiangjie,

Thanks for the valuable feedback.
1) Thread coordination for partition changes could be an issue. I agree with you that coordinating between the application thread and the sender thread would be a tough one. My only concern was sharing the logic you described among all Partitioner interface implementations, and letting the Kafka framework take care of computing the diff exactly as you describe.

In a multithreaded environment, the change listener would be called from whichever thread just finished the metadata update:

Metadata listener: https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/Metadata.java#L163
producer.send(): https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java#L370

However, the behavior of onChange() would be no different from what happens today. For example, in KafkaProducer.send():

    public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
        // ...
        // Determine the partition for the message
        int partition = partition(record, serializedKey, serializedValue, metadata.fetch());
        /*
         * A metadata update (cluster change) can happen here: after the application
         * thread has determined the partition for the record, but before the record
         * is added to the record queue. So, in my opinion, the behavior is the same.
         */
        // ...
        RecordAccumulator.RecordAppendResult result = accumulator.append(tp, serializedKey, serializedValue, callback);
        // ...
    }

What do you think of adding the diff logic you describe to the DefaultPartitioner implementation (or to another implementation class, called, say, ChangePartitioner), whose partition() method calls an onChange() method? Whoever cares or needs to know can then do what they like (log the event, use it to change the partitioning strategy, etc.). This makes the diff code shareable, so not every implementation has to implement the diff logic, which is my main concern.
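The ChangePartitioner idea above could look roughly like the following minimal sketch. It is a self-contained illustration under stated assumptions, not Kafka's API: it does not use the real Partitioner or Cluster types; a cluster snapshot is modeled as a Map from topic to partition count; and each metadata refresh is assumed to hand the partitioner a new Map instance, so a cheap reference comparison stands in for the hash comparison you suggested. The names ChangePartitioner, LoggingHashPartitioner, onChange() and choosePartition() are hypothetical.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;

// Hypothetical base class (not part of Kafka): shares the diff logic so that
// subclasses only choose partitions and, optionally, react to changes.
// A cluster snapshot is modeled as Map<topic, partition count>.
abstract class ChangePartitioner {
    // Step 1: remember the last snapshot seen (null until the first call).
    private Map<String, Integer> cached;

    public final int partition(String topic, byte[] key, Map<String, Integer> cluster) {
        synchronized (this) {
            // Step 2: cheap refresh check. Assumes every refresh produces a new
            // snapshot instance (stand-in for comparing cluster hashes).
            if (cached != null && cluster != cached) {
                // Step 3: a refresh happened; find the topics that actually changed.
                Set<String> changed = diff(cached, cluster);
                if (!changed.isEmpty()) {
                    // Invoked under the lock so notifications are serialized; keep it cheap.
                    onChange(changed, cached, cluster);
                }
            }
            cached = cluster;
        }
        Integer numPartitions = cluster.get(topic);
        if (numPartitions == null || numPartitions <= 0) {
            throw new IllegalArgumentException("No partitions known for topic " + topic);
        }
        return choosePartition(topic, key, numPartitions);
    }

    // Topics whose partition count differs, including added or removed topics.
    private static Set<String> diff(Map<String, Integer> before, Map<String, Integer> after) {
        Set<String> topics = new HashSet<>(before.keySet());
        topics.addAll(after.keySet());
        Set<String> changed = new HashSet<>();
        for (String t : topics) {
            if (!Objects.equals(before.get(t), after.get(t))) {
                changed.add(t);
            }
        }
        return changed;
    }

    // Hook for whoever cares (log the event, switch strategy, ...). No-op by default.
    protected void onChange(Set<String> changedTopics,
                            Map<String, Integer> before, Map<String, Integer> after) {
    }

    protected abstract int choosePartition(String topic, byte[] key, int numPartitions);
}

// Example subclass: hash-based partitioning that records change events.
class LoggingHashPartitioner extends ChangePartitioner {
    final List<String> events = new ArrayList<>();

    @Override
    protected int choosePartition(String topic, byte[] key, int numPartitions) {
        return (Arrays.hashCode(key) & 0x7fffffff) % numPartitions;
    }

    @Override
    protected void onChange(Set<String> changedTopics,
                            Map<String, Integer> before, Map<String, Integer> after) {
        for (String t : changedTopics) {
            events.add(t + ": " + before.get(t) + " -> " + after.get(t));
        }
    }
}

class Demo {
    public static void main(String[] args) {
        LoggingHashPartitioner p = new LoggingHashPartitioner();
        byte[] key = "key-1".getBytes(StandardCharsets.UTF_8);

        Map<String, Integer> before = new HashMap<>();
        before.put("orders", 4);
        Map<String, Integer> after = new HashMap<>(before);
        after.put("orders", 8); // simulated partition increase after a refresh

        p.partition("orders", key, before); // first snapshot: cached, no event
        p.partition("orders", key, after);  // new snapshot: diff finds "orders"
        System.out.println(p.events);       // prints [orders: 4 -> 8]
    }
}
```

Because the diff is computed inside partition(), onChange() in this sketch runs before the first record is partitioned against the new snapshot, which sidesteps the ordering problem of a listener invoked from the sender thread.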
Thanks,
Bhavesh

On Fri, Sep 18, 2015 at 3:47 PM, Jiangjie Qin <j...@linkedin.com.invalid> wrote:
> Hey Bhavesh,
>
> I think it is useful to notify the user about the partition change.
>
> The problem with having a listener in the producer is that it is hard to
> guarantee synchronization. For example, consider the following sequence:
> 1. The producer sender thread refreshes the metadata with a partition change.
> 2. The user thread calls send() with a customized partitioner, and the
> partitioner decides the partition using the new metadata refreshed in step 1.
> 3. The producer sender thread calls onPartitionsChange().
>
> At that point, the message in step 2 has already been sent using the new
> metadata. If we update the metadata after invoking onPartitionsChange(), it
> is a little strange, because the metadata has not changed yet.
>
> Also, the metadata refresh can happen in the caller thread as well; I am not
> sure how it would work with multiple caller threads.
>
> It seems to me that the user can already tell whether the cluster has
> changed, because the partition() method takes a cluster parameter. So if the
> user cares about partition count changes, they can do the following:
> 1. Store a copy of the cluster as a cache in the partitioner.
> 2. When partition() is called, check whether the hash of the passed-in
> cluster is the same as that of the cached cluster.
> 3. If the hash of the passed-in cluster differs from the hash of the cached
> cluster, a metadata refresh occurred, and the partitioner can check whether
> there was a partition change before doing the partitioning.
>
> Thanks,
>
> Jiangjie (Becket) Qin
>
> On Wed, Sep 16, 2015 at 12:08 AM, Bhavesh Mistry <mistry.p.bhav...@gmail.com> wrote:
>
>> Hi Kafka Dev Team,
>>
>> I would like to get your feedback on adding another method,
>> onPartitionsChange(), to the Partitioner interface, so that implementations
>> are notified of partition changes upon metadata refresh.
>>
>> This will allow custom logic (the Partitioner implementor) to be notified
>> when partition ownership changes, a partition goes online or offline, or the
>> partition count increases or decreases, and when those changes have
>> propagated to an individual producer instance.
>>
>> Please note this is my first KIP; if I have not followed the process
>> correctly, please let me know. I will be more than happy to follow it or to
>> correct anything I may have missed.
>>
>> Thanks in advance; looking forward to your feedback.
>>
>> Thanks,
>>
>> Bhavesh