Hey Bhavesh,

I think this metadata change capture logic should be implemented by each user themselves, for the following reasons:
1. Most users do not really care about partition changes. Adding the logic/interface to the default partitioner means that users who don't care about partition changes still pay the price of a cluster diff on each metadata update. For a big cluster, this metadata diff could be costly, depending on how frequently the metadata is refreshed.

2. In some cases, a user might only care about partition changes for a few specific topics, in which case there is no need to diff the whole cluster across every topic the producer is producing to. If the cluster diff is implemented in user code, it can be more efficient because the user can check only the topics they are interested in. Also, different users might care about different changes in the metadata, e.g. topic create/delete, node change, etc. So it seems better to leave the actual metadata change capture logic to the user instead of doing it in the producer.

3. The cluster diff code itself is short and not complicated, so even if users do it on their own it should be simple, e.g.:

if (this.cachedCluster.hashCode() != cluster.hashCode()) {
    for (String topic : cluster.topics()) {
        if (this.cachedCluster.topics().contains(topic)
                && this.cachedCluster.partitionsForTopic(topic).size()
                        != cluster.partitionsForTopic(topic).size()) {
            // handle partition change.
        }
    }
}

Thanks,

Jiangjie (Becket) Qin

On Mon, Sep 21, 2015 at 9:13 AM, Bhavesh Mistry <mistry.p.bhav...@gmail.com>
wrote:

> Hi Jiangjie,
>
> Thanks for the valuable feedback.
>
> 1) Thread coordination for a change of partitions could be an issue.
>
> I do agree with you that coordination between the application thread and
> the sender thread would be a tough one. The only concern I had was to
> share the same logic you described among all Partitioner interface
> implementations, and let the Kafka framework take care of doing the diff
> exactly as you describe.
>
> In a multithreaded environment, the change listener would be called from
> the same thread that just finished receiving the metadata update.
>
> Metadata listener:
>
> https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/Metadata.java#L163
>
> producer.send():
>
> https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java#L370
>
> But the behavior of onChange() would not be different from what happens
> today. For example:
>
> public Future<RecordMetadata> send(ProducerRecord<K, V> record,
>                                    Callback callback) {
>     // Determine partition for message
>     int partition = partition(record, serializedKey, serializedValue,
>             metadata.fetch());
>     /*
>      * A metadata update can occur after the application thread determines
>      * the partition for a given record but before the message is added to
>      * the record queue. So, in my opinion, the behavior is the same.
>      */
>     RecordAccumulator.RecordAppendResult result =
>             accumulator.append(tp, serializedKey, serializedValue, callback);
>
> What do you think of adding the diff logic you describe to the
> DefaultPartitioner implementation, or to another implementation class
> (call it a ChangePartitioner), whose partition() method calls an
> onChange() method, so that whoever cares or needs to know can do what
> they like (log the event, use it to change the partitioning strategy,
> etc.)?
>
> This gives the ability to share the diff code, so not every
> implementation has to implement the diff logic itself, which is my main
> concern.
>
> Thanks,
>
> Bhavesh
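A rough sketch of the ChangePartitioner idea described above, assuming the
current Partitioner interface and delegating to the existing
DefaultPartitioner; the class name, the onPartitionsChange() hook, and its
arguments are illustrative only and are not part of the Kafka API:

import java.util.Map;
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.clients.producer.internals.DefaultPartitioner;
import org.apache.kafka.common.Cluster;

public class ChangeAwarePartitioner implements Partitioner {

    private final Partitioner delegate = new DefaultPartitioner();
    private Cluster cachedCluster;

    @Override
    public void configure(Map<String, ?> configs) {
        delegate.configure(configs);
    }

    @Override
    public int partition(String topic, Object key, byte[] keyBytes,
                         Object value, byte[] valueBytes, Cluster cluster) {
        // Only run the topic-by-topic diff when the passed-in cluster differs
        // from the one cached on the previous call.
        if (cachedCluster != null && cachedCluster.hashCode() != cluster.hashCode()) {
            for (String t : cluster.topics()) {
                if (cachedCluster.topics().contains(t)
                        && cachedCluster.partitionsForTopic(t).size()
                                != cluster.partitionsForTopic(t).size()) {
                    onPartitionsChange(t,
                            cachedCluster.partitionsForTopic(t).size(),
                            cluster.partitionsForTopic(t).size());
                }
            }
        }
        cachedCluster = cluster;
        return delegate.partition(topic, key, keyBytes, value, valueBytes, cluster);
    }

    // Hypothetical hook: log the change, switch partitioning strategy, etc.
    private void onPartitionsChange(String topic, int oldCount, int newCount) {
        // no-op in this sketch
    }

    @Override
    public void close() {
        delegate.close();
    }
}

The diff runs in whichever thread called send(), and only when the hash of
the passed-in Cluster differs from the cached one, which mirrors the hash
check suggested in this thread.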
>
>
> On Fri, Sep 18, 2015 at 3:47 PM, Jiangjie Qin <j...@linkedin.com.invalid>
> wrote:
> > Hey Bhavesh,
> >
> > I think it is useful to notify the user about the partition change.
> >
> > The problem with having a listener in the producer is that it is hard to
> > guarantee the synchronization. For example, consider the following
> > sequence:
> > 1. The producer sender thread refreshes the metadata with a partition
> > change.
> > 2. The user thread calls send() with a customized partitioner; the
> > partitioner decides the partition using the new metadata refreshed in
> > step 1.
> > 3. The producer sender thread calls onPartitionsChange().
> >
> > At that point, the message sent in step 2 was already sent using the new
> > metadata. If we update the metadata after invoking onPartitionsChange(),
> > it is a little strange because the metadata has not changed yet.
> >
> > Also, the metadata refresh can happen in the caller thread as well; I am
> > not sure how that would work with multiple caller threads.
> >
> > I am thinking the user can actually tell whether the cluster has changed
> > or not, because the partition() method takes a Cluster parameter. So if
> > a user cares about a change in partition numbers, they can do the
> > following:
> > 1. Store a copy of the Cluster as a cache in the partitioner.
> > 2. When partition() is called, check whether the hash of the passed-in
> > Cluster is the same as the hash of the cached Cluster.
> > 3. If the hashes differ, a metadata refresh occurred, and the
> > partitioner can check whether there was a partition change before doing
> > the partitioning.
> >
> > Thanks,
> >
> > Jiangjie (Becket) Qin
> >
> > On Wed, Sep 16, 2015 at 12:08 AM, Bhavesh Mistry <
> > mistry.p.bhav...@gmail.com> wrote:
> >
> >> Hi Kafka Dev Team,
> >>
> >> I would like to get your feedback about adding yet another method/API
> >> call, onPartitionsChange(), to the Partitioner interface, to get
> >> notified about partition changes upon a metadata refresh.
> >>
> >> This will allow custom logic (implementors of Partitioner) to be
> >> notified if partition ownership changes, a partition goes online or
> >> offline, or a partition increase/decrease event happens, and when the
> >> changes were propagated to an individual producer instance.
> >>
> >> Please note this is my first KIP, and if the process is not followed
> >> correctly, please do let me know. I will be more than happy to follow
> >> or correct anything that I may have missed.
> >>
> >> Thanks in advance, and looking forward to your feedback.
> >>
> >> Thanks,
> >>
> >> Bhavesh
> >>
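For reference, a sketch of what the Partitioner interface would look like
with the proposed hook added; the method name onPartitionsChange() comes
from the proposal above, but the parameter list shown here is only an
assumption, since the discussion does not pin down a signature:

import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.Configurable;

public interface Partitioner extends Configurable {

    // Existing method: compute the partition for the given record.
    int partition(String topic, Object key, byte[] keyBytes,
                  Object value, byte[] valueBytes, Cluster cluster);

    // Existing method: called when the partitioner is closed.
    void close();

    // Proposed hook (illustrative signature only): called after a metadata
    // refresh changes partition information, with the old and new views.
    void onPartitionsChange(Cluster oldCluster, Cluster newCluster);
}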