Hi Becket,

Thanks for answering and providing feedback.  I will withdraw the KIP and
move it to the rejected section.

Thanks,

Bhavesh

On Mon, Sep 21, 2015 at 9:53 AM, Jiangjie Qin <j...@linkedin.com.invalid> wrote:
> Hey Bhavesh,
>
> I kind of think this metadata change capture logic should be implemented by
> each user by themselves for the following reasons:
>
> 1. Most users do not really care about partition changes. Adding the
> logic/interface to the default partitioner means that users who don't care
> about partition changes pay the price of a cluster diff on every metadata
> update. For a big cluster, this diff could be costly depending on how
> frequently the metadata is refreshed.
>
> 2. In some cases, a user might only care about partition changes for some
> specific topics; in that case, there is no need to do a cluster diff for
> all the topics a producer is producing to. If the cluster diff is
> implemented in user code, it is more efficient because the user can check
> only the topics they are interested in. Also, different users might care
> about different changes in the metadata, e.g. topic creation/deletion,
> node changes, etc. So it seems better to leave the actual metadata change
> capture logic to the user instead of doing it in the producer.
>
> 3. The cluster diff code itself is short and not complicated, so even if
> users do it on their own it should be simple, e.g.:
> {
>   if (this.cachedCluster.hashCode() != cluster.hashCode()) {
>     for (String topic : cluster.topics()) {
>       if (this.cachedCluster.topics().contains(topic) &&
>           this.cachedCluster.partitionsForTopic(topic).size() !=
>               cluster.partitionsForTopic(topic).size()) {
>         // handle partition change for this topic
>       }
>     }
>   }
> }
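A self-contained sketch of this diff idea, with the cluster metadata modeled as a plain map from topic to partition count so it runs without the Kafka client classes; the class and method names here are illustrative only, and real code would cache and compare org.apache.kafka.common.Cluster as in the snippet above:

```java
import java.util.HashMap;
import java.util.Map;

// Detects per-topic partition-count changes between successive metadata
// snapshots. The cluster is modeled as Map<topic, partitionCount> purely
// for illustration.
class PartitionChangeDetector {
    private Map<String, Integer> cachedPartitions = new HashMap<>();

    // Returns the topics whose partition count changed since the last call.
    Map<String, Integer> diff(Map<String, Integer> current) {
        Map<String, Integer> changed = new HashMap<>();
        if (!cachedPartitions.equals(current)) {           // cheap check first
            for (Map.Entry<String, Integer> e : current.entrySet()) {
                Integer old = cachedPartitions.get(e.getKey());
                if (old != null && !old.equals(e.getValue())) {
                    changed.put(e.getKey(), e.getValue()); // count changed
                }
            }
            cachedPartitions = new HashMap<>(current);     // update the cache
        }
        return changed;
    }
}
```

The equality check on the whole map plays the role of the hashCode comparison above: only when the snapshot differs does the per-topic scan run.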
>
> Thanks,
>
> Jiangjie (Becket) Qin
>
> On Mon, Sep 21, 2015 at 9:13 AM, Bhavesh Mistry <mistry.p.bhav...@gmail.com>
> wrote:
>
>> Hi Jiangjie,
>>
>> Thanks for the valuable feedback.
>>
>> 1) Thread coordination for partition changes could be an issue.
>>
>> I agree with you that coordination between the application thread and
>> the sender thread would be a tough one.  My only concern was to share
>> the logic you described among all Partitioner implementations, and let
>> the Kafka framework take care of doing the diff exactly as you describe.
>>
>> In a multithreaded environment, the change listener would be called
>> from the same thread that has just finished receiving the metadata
>> update.
>>
>> Metadata Listener:
>>
>>
>> https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/Metadata.java#L163
>>
>> producer.send()
>>
>>
>> https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java#L370
>>
>> But the behavior of onChange() would be no different from what happens today.
>>
>> For example,
>>
>> public Future<RecordMetadata> send(ProducerRecord<K, V> record,
>>                                    Callback callback) {
>>
>>   // Determine the partition for the message.
>>   int partition = partition(record, serializedKey, serializedValue,
>>                             metadata.fetch());
>>
>>   /*
>>    * A metadata update may occur after the application thread has
>>    * determined the partition but before the message is added to the
>>    * record queue, i.e. the cluster change happens in between. So in
>>    * my opinion the behavior is the same.
>>    */
>>   RecordAccumulator.RecordAppendResult result =
>>       accumulator.append(tp, serializedKey, serializedValue, callback);
>>
>>
>>
>> What do you think of adding the diff logic you describe to the default
>> partitioner implementation (or to another implementation class, call it
>> a ChangePartitioner), whose partition() method calls an onChange()
>> method, so whoever cares or needs to know can do what they like (log
>> the event, change the partitioning strategy, etc.)?
>>
>> This gives the ability to share the diff code, so that not every
>> implementation has to implement the diff logic; that is my main
>> concern.
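A minimal sketch of this ChangePartitioner idea, assuming the framework hands refreshed metadata into partition(); the cluster is again modeled as a topic-to-partition-count map to stay self-contained, and all names are hypothetical:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical base class sketching the proposal: it does the cluster
// diff once, in one place, and exposes an onChange() hook so concrete
// partitioners don't each re-implement the comparison.
abstract class ChangePartitioner {
    private Map<String, Integer> cachedCluster;

    // Entry point the framework would call to pick a partition.
    final int partition(String topic, byte[] key, Map<String, Integer> cluster) {
        if (cachedCluster != null && !cachedCluster.equals(cluster)) {
            onChange(cachedCluster, cluster);   // notify before partitioning
        }
        cachedCluster = new HashMap<>(cluster); // refresh the cache
        return doPartition(topic, key, cluster);
    }

    // Subclasses may log the event or adjust their strategy here.
    protected void onChange(Map<String, Integer> before,
                            Map<String, Integer> after) {}

    // The actual partitioning strategy.
    protected abstract int doPartition(String topic, byte[] key,
                                       Map<String, Integer> cluster);
}
```

A subclass only supplies doPartition() and, optionally, onChange(); the shared diff runs before every partitioning decision.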
>>
>>
>> Thanks,
>>
>> Bhavesh
>>
>>
>> On Fri, Sep 18, 2015 at 3:47 PM, Jiangjie Qin <j...@linkedin.com.invalid>
>> wrote:
>> > Hey Bhavesh,
>> >
>> > I think it is useful to notify the user about the partition change.
>> >
>> > The problem of having a listener in the producer is that it is hard to
>> > guarantee the synchronization. For example, consider the following
>> > sequence:
>> > 1. The producer sender thread refreshes the metadata with a partition
>> > change.
>> > 2. A user thread calls send() with a customized partitioner; the
>> > partitioner decides the partition with the new metadata refreshed in
>> > step 1.
>> > 3. The producer sender thread calls onPartitionChange().
>> >
>> > At that point, the message in step 2 was already sent using the new
>> > metadata. If we update the metadata after invoking onPartitionChange(),
>> > it is a little strange because the metadata has not changed yet.
>> >
>> > Also, the metadata refresh can happen in the caller thread as well; I am
>> > not sure how that would work with multiple caller threads.
>> >
>> > I am thinking the user can actually tell whether the cluster has
>> > changed or not, because the partition() method takes a Cluster
>> > parameter. So if the user cares about partition number changes, they
>> > can do the following:
>> > 1. Store a copy of the cluster as a cache in the partitioner.
>> > 2. When partition() is called, check whether the hash of the passed-in
>> > cluster is the same as that of the cached cluster.
>> > 3. If the hashes differ, a metadata refresh occurred, and the user can
>> > check whether there is a partition change before doing the
>> > partitioning.
>> >
>> > Thanks,
>> >
>> > Jiangjie (Becket) Qin
>> >
>> > On Wed, Sep 16, 2015 at 12:08 AM, Bhavesh Mistry <mistry.p.bhav...@gmail.com> wrote:
>> >
>> >
>> >> Hi Kafka Dev Team,
>> >>
>> >> I would like to get your feedback about adding yet another method,
>> >> onPartitionsChange(), to the Partitioner interface, to be notified
>> >> about partition changes upon metadata refresh.
>> >>
>> >> This will allow custom logic (implementors of Partitioner) to be
>> >> notified when partition ownership changes, partitions go online or
>> >> offline, or partitions are increased or decreased, and when those
>> >> changes are propagated to the individual producer instance.
>> >>
>> >> Please note this is my first KIP; if the process is not followed
>> >> correctly, please do let me know.  I will be more than happy to
>> >> follow up or correct anything I may have missed.
>> >>
>> >> Thanks in advance and looking forward to your feedback.
>> >>
>> >> Thanks,
>> >>
>> >> Bhavesh
>> >>
>>
