Thanks Bruno, marking this as accepted.
Thanks everyone for their comments/feedback.

Thanks!
Sagar.

On Mon, Sep 12, 2022 at 1:53 PM Bruno Cadonna <cado...@apache.org> wrote:

> Hi Sagar,
>
> Thanks for the update and the PR!
>
> +1 (binding)
>
> Best,
> Bruno
>
> On 10.09.22 18:57, Sagar wrote:
> > Hi Bruno,
> >
> > Thanks, I think these changes make sense to me. I have updated the KIP
> > accordingly.
> >
> > Thanks!
> > Sagar.
> >
> > On Wed, Sep 7, 2022 at 2:16 PM Bruno Cadonna <cado...@apache.org> wrote:
> >
> >> Hi Sagar,
> >>
> >> I would not drop the support for dropping records. I would also not
> >> return null from partitions(). Maybe an Optional can help here. An empty
> >> Optional would mean to use the default partitioning behavior of the
> >> producer. So we would have:
> >>
> >> - non-empty Optional, non-empty list of integers: partitions to send the
> >>   record to
> >> - non-empty Optional, empty list of integers: drop the record
> >> - empty Optional: use default behavior
> >>
> >> What do others think?
> >>
> >> Best,
> >> Bruno
> >>
> >> On 02.09.22 13:53, Sagar wrote:
> >>> Hello Bruno/Chris,
> >>>
> >>> Since these are the last set of changes (I am assuming, haha), it would be
> >>> great if you could review the 2 options from above so that we can close the
> >>> voting. Of course I am happy to incorporate any other requisite changes.
> >>>
> >>> Thanks!
> >>> Sagar.
> >>>
> >>> On Wed, Aug 31, 2022 at 10:07 PM Sagar <sagarmeansoc...@gmail.com> wrote:
> >>>
> >>>> Thanks Bruno for the great points.
> >>>>
> >>>> I see 2 options here =>
> >>>>
> >>>> 1) As Chris suggested, drop the support for dropping records in the
> >>>> partitioner. That way, an empty list could signify the usage of a default
> >>>> partitioner. Also, if the deprecated partition() method returns null,
> >>>> thereby signifying the default partitioner, partitions() can return an
> >>>> empty list, i.e. the default partitioner.
> >>>>
> >>>> 2) OR we treat a null return value of the partitions() method to signify the
> >>>> usage of the default partitioner. In the default implementation of the
> >>>> partitions() method, if partition() returns null, then partitions()
> >>>> can also return null (instead of an empty list). The RecordCollectorImpl code can
> >>>> also be modified accordingly. @Chris, to your point, we can even drop the
> >>>> support for dropping records. It came up during the KIP discussion, and I
> >>>> thought it might be a useful feature. Let me know what you think.
> >>>>
> >>>> 3) Lastly, about the partition number check. I wanted to avoid throwing
> >>>> an exception, so I thought adding it might be a useful feature. But as you
> >>>> pointed out, if it can break backwards compatibility, it's better to remove
> >>>> it.
> >>>>
> >>>> Thanks!
> >>>> Sagar.
> >>>>
> >>>>
> >>>> On Tue, Aug 30, 2022 at 6:32 PM Chris Egerton <chr...@aiven.io.invalid> wrote:
> >>>>
> >>>>> +1 to Bruno's concerns about backward compatibility. Do we actually need
> >>>>> support for dropping records in the partitioner? It doesn't seem necessary
> >>>>> based on the motivation for the KIP. If we remove that feature, we could
> >>>>> handle null and/or empty lists by using the default partitioning,
> >>>>> equivalent to how we handle null return values from the existing partition
> >>>>> method today.
> >>>>>
> >>>>> On Tue, Aug 30, 2022 at 8:55 AM Bruno Cadonna <cado...@apache.org> wrote:
> >>>>>
> >>>>>> Hi Sagar,
> >>>>>>
> >>>>>> Thank you for the updates!
> >>>>>>
> >>>>>> I do not intend to prolong this vote thread more than needed, but I
> >>>>>> still have some points.
> >>>>>>
> >>>>>> The deprecated partition method can return null if the default
> >>>>>> partitioning logic of the producer should be used.
> >>>>>> With the new method partitions() it seems that it is no longer possible to use
> >>>>>> the default partitioning logic.
> >>>>>>
> >>>>>> Also, in the default implementation of method partitions(), a record
> >>>>>> that would use the default partitioning logic in method partition()
> >>>>>> would be dropped, which would break backward compatibility since Streams
> >>>>>> would always call the new method partitions() even though the users
> >>>>>> still implement the deprecated method partition().
> >>>>>>
> >>>>>> I have a last point that we should probably discuss on the PR and not on
> >>>>>> the KIP, but since you added the code in the KIP I need to mention it. I
> >>>>>> do not think you should check the validity of the partition number since
> >>>>>> the ProducerRecord does the same check and throws an exception. If
> >>>>>> Streams adds the same check but does not throw, the behavior is not
> >>>>>> backward compatible.
> >>>>>>
> >>>>>> Best,
> >>>>>> Bruno
> >>>>>>
> >>>>>>
> >>>>>> On 30.08.22 12:43, Sagar wrote:
> >>>>>>> Thanks Bruno/Chris,
> >>>>>>>
> >>>>>>> I agree that it might be better to keep it simple, the way Chris
> >>>>>>> suggested. I have updated the KIP accordingly. I made a couple of minor
> >>>>>>> changes to the KIP:
> >>>>>>>
> >>>>>>> 1) One of them is changing the return type of the partitions method from
> >>>>>>> List to Set. This ensures that if the implementation of
> >>>>>>> StreamPartitioner is buggy and ends up returning duplicate
> >>>>>>> partition numbers, we won't try to send to
> >>>>>>> the same partition multiple times.
> >>>>>>> 2) I also added a check to send the record only to valid partition numbers
> >>>>>>> and to log and drop when the partition number is invalid. This is again to
> >>>>>>> prevent errors for cases when the StreamPartitioner implementation has some
> >>>>>>> bugs (since there are no validations as such).
> >>>>>>> 3) I also updated the Test Plan section based on the suggestion from Bruno.
> >>>>>>> 4) I updated the default implementation of the partitions method based on the
> >>>>>>> great catch from Chris!
> >>>>>>>
> >>>>>>> Let me know if it looks fine now.
> >>>>>>>
> >>>>>>> Thanks!
> >>>>>>> Sagar.
> >>>>>>>
> >>>>>>>
> >>>>>>> On Tue, Aug 30, 2022 at 3:00 PM Bruno Cadonna <cado...@apache.org> wrote:
> >>>>>>>
> >>>>>>>> Hi,
> >>>>>>>>
> >>>>>>>> I am in favour of discarding the sugar for broadcasting and leaving the
> >>>>>>>> broadcasting to the implementation as Chris suggests. I think that is
> >>>>>>>> the cleanest option.
> >>>>>>>>
> >>>>>>>> Best,
> >>>>>>>> Bruno
> >>>>>>>>
> >>>>>>>> On 29.08.22 19:50, Chris Egerton wrote:
> >>>>>>>>> Hi all,
> >>>>>>>>>
> >>>>>>>>> I think it'd be useful to be more explicit about broadcasting to all topic
> >>>>>>>>> partitions rather than add implicit behavior for empty cases (empty
> >>>>>>>>> optional, empty list, etc.). The suggested enum approach would address that
> >>>>>>>>> nicely.
> >>>>>>>>>
> >>>>>>>>> It's also worth noting that there's no hard requirement to add sugar for
> >>>>>>>>> broadcasting to all topic partitions since the API already provides the
> >>>>>>>>> number of topic partitions available when calling a stream partitioner.
> >>>>>>>>> If we can't find a clean way to add this support, it might be better to leave
> >>>>>>>>> it out and just let people implement this themselves with a line of Java 8
> >>>>>>>>> streams:
> >>>>>>>>>
> >>>>>>>>> return IntStream.range(0, numPartitions).boxed().collect(Collectors.toList());
> >>>>>>>>>
> >>>>>>>>> Also worth noting that there may be a bug in the default implementation for
> >>>>>>>>> the new StreamPartitioner::partitions method, since it doesn't appear to
> >>>>>>>>> correctly pick up on null return values from the partition method and
> >>>>>>>>> translate them into empty lists.
> >>>>>>>>>
> >>>>>>>>> Cheers,
> >>>>>>>>>
> >>>>>>>>> Chris
> >>>>>>>>>
> >>>>>>>>> On Mon, Aug 29, 2022 at 7:32 AM Bruno Cadonna <cado...@apache.org> wrote:
> >>>>>>>>>
> >>>>>>>>>> Hi Sagar,
> >>>>>>>>>>
> >>>>>>>>>> I do not see an issue with using an empty list together with an empty
> >>>>>>>>>> Optional. A non-empty Optional that contains an empty list would just
> >>>>>>>>>> say that there is no partition to which the record should be sent. If
> >>>>>>>>>> there is no partition, the record is dropped.
> >>>>>>>>>>
> >>>>>>>>>> An empty Optional might be a way to say, broadcast to all partitions.
> >>>>>>>>>>
> >>>>>>>>>> Alternatively -- to make it more explicit -- you could return an object
> >>>>>>>>>> with an enum and a list of partitions. The enum could have values SOME,
> >>>>>>>>>> ALL, and NONE. If the value is SOME, the list of partitions contains the
> >>>>>>>>>> partitions to which to send the record. If the value of the enum is ALL
> >>>>>>>>>> or NONE, the list of partitions is not used and might be even null
> >>>>>>>>>> (since in that case the list should not be used and it would be a bug to
> >>>>>>>>>> use it).
> >>>>>>>>>>
> >>>>>>>>>> Best,
> >>>>>>>>>> Bruno
> >>>>>>>>>>
> >>>>>>>>>> On 24.08.22 20:11, Sagar wrote:
> >>>>>>>>>>> Thank you Bruno/Matthew for your comments.
> >>>>>>>>>>>
> >>>>>>>>>>> I agree using null does seem error-prone. However, I think using a singleton
> >>>>>>>>>>> list of [-1] might be better in terms of usability. I am saying this
> >>>>>>>>>>> because the KIP also has a provision to return an empty list to refer to
> >>>>>>>>>>> dropping the record. So, an empty optional and an empty list have totally
> >>>>>>>>>>> different meanings, which could get confusing.
> >>>>>>>>>>>
> >>>>>>>>>>> Let me know what you think.
> >>>>>>>>>>>
> >>>>>>>>>>> Thanks!
> >>>>>>>>>>> Sagar.
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>> On Wed, Aug 24, 2022 at 7:30 PM Matthew Benedict de Detrich
> >>>>>>>>>>> <matthew.dedetr...@aiven.io.invalid> wrote:
> >>>>>>>>>>>
> >>>>>>>>>>>> I also concur with this; having an Optional in the type makes it very
> >>>>>>>>>>>> clear what’s going on and better signifies an absence of value (or in this
> >>>>>>>>>>>> case the broadcast value).
> >>>>>>>>>>>>
> >>>>>>>>>>>> --
> >>>>>>>>>>>> Matthew de Detrich
> >>>>>>>>>>>> Aiven Deutschland GmbH
> >>>>>>>>>>>> Immanuelkirchstraße 26, 10405 Berlin
> >>>>>>>>>>>> Amtsgericht Charlottenburg, HRB 209739 B
> >>>>>>>>>>>>
> >>>>>>>>>>>> Geschäftsführer: Oskari Saarenmaa & Hannu Valtonen
> >>>>>>>>>>>> m: +491603708037
> >>>>>>>>>>>> w: aiven.io e: matthew.dedetr...@aiven.io
> >>>>>>>>>>>> On 24. Aug 2022, 14:19 +0200, dev@kafka.apache.org, wrote:
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> 2.
> >>>>>>>>>>>>> I would prefer changing the return type of partitions() to
> >>>>>>>>>>>>> Optional<List<Integer>> and using Optional.empty() as the broadcast
> >>>>>>>>>>>>> value. IMO, the chances that an implementation returns null due to a bug
> >>>>>>>>>>>>> are much higher than that an implementation returns an empty Optional due
> >>>>>>>>>>>>> to a bug.
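
For anyone reading this thread later, here is a rough, self-contained Java sketch of the three-way Optional convention from Bruno's Sep 7 message (non-empty list: send to those partitions; empty list: drop; empty Optional: default partitioning), plus Chris's one-line broadcast. It does not use the Kafka Streams classes: PartitionRoutingSketch, LegacyPartitioner, partitions(), allPartitions() and describe() are hypothetical names made up for illustration, and List is used here only because that is the type named in that message.

```java
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

// Illustration only: none of these names are part of the Kafka Streams API.
final class PartitionRoutingSketch {

    // Hypothetical stand-in for the deprecated single-partition method.
    // Returning null means "use the producer's default partitioning".
    interface LegacyPartitioner {
        Integer partition(String key, int numPartitions);
    }

    // Maps a legacy result onto the Optional convention discussed in the thread:
    // null -> empty Optional (default behavior), otherwise a one-element list.
    static Optional<List<Integer>> partitions(LegacyPartitioner legacy, String key, int numPartitions) {
        Integer partition = legacy.partition(key, numPartitions);
        if (partition == null) {
            return Optional.empty();
        }
        return Optional.of(Collections.singletonList(partition));
    }

    // Broadcast to every partition, using the one-liner quoted in the thread.
    static List<Integer> allPartitions(int numPartitions) {
        return IntStream.range(0, numPartitions).boxed().collect(Collectors.toList());
    }

    // Spells out how a caller would interpret each of the three cases.
    static String describe(Optional<List<Integer>> result) {
        if (!result.isPresent()) {
            return "default";
        }
        List<Integer> targets = result.get();
        if (targets.isEmpty()) {
            return "drop";
        }
        return "send to " + targets;
    }

    public static void main(String[] args) {
        System.out.println(describe(partitions((key, n) -> null, "a", 3)));
        System.out.println(describe(partitions((key, n) -> 1, "a", 3)));
        System.out.println(describe(Optional.of(Collections.<Integer>emptyList())));
        System.out.println(describe(Optional.of(allPartitions(3))));
    }
}
```

The point of the sketch is only that "drop" and "default" stay distinguishable without ever returning null, which is the backward-compatibility concern raised in the thread.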