Hello Bruno/Chris,

Since this is the last set of changes (I am assuming, haha), it would be great if you could review the 2 options from my previous email (quoted below) so that we can close the voting. Of course, I am happy to incorporate any other requisite changes.
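To make the two options concrete, here is a rough, self-contained sketch of what the default implementation of partitions() could look like in each case. It deliberately does not use the real Kafka Streams classes: LegacyPartitioner, OptionOnePartitioner and OptionTwoPartitioner are hypothetical stand-ins, with partition() only mirroring the shape of the deprecated method. It uses Set as the return type (per the earlier KIP update), although the options are phrased in terms of lists. Please treat it as an illustration of the idea and not as the actual KIP code.

```java
import java.util.Collections;
import java.util.Set;

public class PartitionerOptionsSketch {

    /** Hypothetical stand-in for a partitioner with the deprecated single-partition method. */
    interface LegacyPartitioner<K, V> {
        // Returning null asks for the producer's default partitioning logic.
        Integer partition(String topic, K key, V value, int numPartitions);
    }

    /** Option 1 (assumed shape): an empty set signals "use the default partitioner". */
    interface OptionOnePartitioner<K, V> extends LegacyPartitioner<K, V> {
        default Set<Integer> partitions(String topic, K key, V value, int numPartitions) {
            Integer partition = partition(topic, key, value, numPartitions);
            return partition == null
                ? Collections.<Integer>emptySet()
                : Collections.singleton(partition);
        }
    }

    /** Option 2 (assumed shape): a null return signals "use the default partitioner". */
    interface OptionTwoPartitioner<K, V> extends LegacyPartitioner<K, V> {
        default Set<Integer> partitions(String topic, K key, V value, int numPartitions) {
            Integer partition = partition(topic, key, value, numPartitions);
            return partition == null ? null : Collections.singleton(partition);
        }
    }

    public static void main(String[] args) {
        // Existing user code that only implements partition() and returns null.
        OptionOnePartitioner<String, String> one = (topic, key, value, n) -> null;
        OptionTwoPartitioner<String, String> two = (topic, key, value, n) -> null;

        System.out.println("Option 1: " + one.partitions("topic", "k", "v", 4));
        System.out.println("Option 2: " + two.partitions("topic", "k", "v", 4));
    }
}
```

In both variants, a user who still implements only the deprecated partition() and returns null ends up with the default partitioning instead of a dropped record. The difference is only in which return value carries that meaning.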
Thanks!
Sagar.

On Wed, Aug 31, 2022 at 10:07 PM Sagar <sagarmeansoc...@gmail.com> wrote:

> Thanks Bruno for the great points.
>
> I see 2 options here:
>
> 1) As Chris suggested, drop the support for dropping records in the
> partitioner. That way, an empty list could signify the usage of the
> default partitioner. Also, if the deprecated partition() method returns
> null, thereby signifying the default partitioner, partitions() can
> return an empty list, i.e. the default partitioner.
>
> 2) OR we treat a null return value of the partitions() method as
> signifying the usage of the default partitioner. In the default
> implementation of the partitions() method, if partition() returns null,
> then partitions() can also return null (instead of an empty list). The
> RecordCollectorImpl code can also be modified accordingly. @Chris, to
> your point, we can even drop the support for dropping records. It came
> up during the KIP discussion, and I thought it might be a useful
> feature. Let me know what you think.
>
> 3) Lastly, about the partition number check: I wanted to avoid throwing
> an exception, so I thought adding it might be a useful feature. But as
> you pointed out, if it can break backwards compatibility, it's better
> to remove it.
>
> Thanks!
> Sagar.
>
>
> On Tue, Aug 30, 2022 at 6:32 PM Chris Egerton <chr...@aiven.io.invalid>
> wrote:
>
>> +1 to Bruno's concerns about backward compatibility. Do we actually need
>> support for dropping records in the partitioner? It doesn't seem necessary
>> based on the motivation for the KIP. If we remove that feature, we could
>> handle null and/or empty lists by using the default partitioning,
>> equivalent to how we handle null return values from the existing partition
>> method today.
>>
>> On Tue, Aug 30, 2022 at 8:55 AM Bruno Cadonna <cado...@apache.org> wrote:
>>
>> > Hi Sagar,
>> >
>> > Thank you for the updates!
>> >
>> > I do not intend to prolong this vote thread more than needed, but I
>> > still have some points.
>> >
>> > The deprecated partition method can return null if the default
>> > partitioning logic of the producer should be used.
>> > With the new method partitions() it seems that it is no longer
>> > possible to use the default partitioning logic.
>> >
>> > Also, in the default implementation of method partitions(), a record
>> > that would use the default partitioning logic in method partition()
>> > would be dropped, which would break backward compatibility since Streams
>> > would always call the new method partitions() even though the users
>> > still implement the deprecated method partition().
>> >
>> > I have a last point that we should probably discuss on the PR and not on
>> > the KIP, but since you added the code in the KIP I need to mention it. I
>> > do not think you should check the validity of the partition number since
>> > the ProducerRecord does the same check and throws an exception. If
>> > Streams adds the same check but does not throw, the behavior is not
>> > backward compatible.
>> >
>> > Best,
>> > Bruno
>> >
>> >
>> > On 30.08.22 12:43, Sagar wrote:
>> > > Thanks Bruno/Chris,
>> > >
>> > > I also agree that it might be better to keep it simple, the way Chris
>> > > suggested. I have updated the KIP accordingly. I made a couple of
>> > > minor changes to the KIP:
>> > >
>> > > 1) One of them is the change of the return type of the partitions
>> > > method from List to Set. This is to ensure that if the implementation
>> > > of StreamPartitioner is buggy and ends up returning duplicate
>> > > partition numbers, we won't have duplicates and so won't try to send
>> > > to the same partition multiple times.
>> > > 2) I also added a check to send the record only to valid partition
>> > > numbers, and to log and drop when the partition number is invalid.
>> > > This is again to prevent errors in cases where the StreamPartitioner
>> > > implementation has some bugs (since there are no validations as such).
>> > > 3) I also updated the Test Plan section based on the suggestion from
>> > > Bruno.
>> > > 4) I updated the default implementation of the partitions method
>> > > based on the great catch from Chris!
>> > >
>> > > Let me know if it looks fine now.
>> > >
>> > > Thanks!
>> > > Sagar.
>> > >
>> > >
>> > > On Tue, Aug 30, 2022 at 3:00 PM Bruno Cadonna <cado...@apache.org>
>> > > wrote:
>> > >
>> > >> Hi,
>> > >>
>> > >> I am in favour of discarding the sugar for broadcasting and leaving
>> > >> the broadcasting to the implementation, as Chris suggests. I think
>> > >> that is the cleanest option.
>> > >>
>> > >> Best,
>> > >> Bruno
>> > >>
>> > >> On 29.08.22 19:50, Chris Egerton wrote:
>> > >>> Hi all,
>> > >>>
>> > >>> I think it'd be useful to be more explicit about broadcasting to all
>> > >>> topic partitions rather than add implicit behavior for empty cases
>> > >>> (empty optional, empty list, etc.). The suggested enum approach
>> > >>> would address that nicely.
>> > >>>
>> > >>> It's also worth noting that there's no hard requirement to add sugar
>> > >>> for broadcasting to all topic partitions since the API already
>> > >>> provides the number of topic partitions available when calling a
>> > >>> stream partitioner. If we can't find a clean way to add this
>> > >>> support, it might be better to leave it out and just let people
>> > >>> implement this themselves with a line of Java 8 streams:
>> > >>>
>> > >>> return IntStream.range(0, numPartitions).boxed().collect(Collectors.toList());
>> > >>>
>> > >>> Also worth noting that there may be a bug in the default
>> > >>> implementation for the new StreamPartitioner::partitions method,
>> > >>> since it doesn't appear to correctly pick up on null return values
>> > >>> from the partition method and translate them into empty lists.
>> > >>>
>> > >>> Cheers,
>> > >>>
>> > >>> Chris
>> > >>>
>> > >>> On Mon, Aug 29, 2022 at 7:32 AM Bruno Cadonna <cado...@apache.org>
>> > >>> wrote:
>> > >>>
>> > >>>> Hi Sagar,
>> > >>>>
>> > >>>> I do not see an issue with using an empty list together with an
>> > >>>> empty Optional. A non-empty Optional that contains an empty list
>> > >>>> would just say that there is no partition to which the record
>> > >>>> should be sent. If there is no partition, the record is dropped.
>> > >>>>
>> > >>>> An empty Optional might be a way to say, broadcast to all
>> > >>>> partitions.
>> > >>>>
>> > >>>> Alternatively -- to make it more explicit -- you could return an
>> > >>>> object with an enum and a list of partitions. The enum could have
>> > >>>> values SOME, ALL, and NONE. If the value is SOME, the list of
>> > >>>> partitions contains the partitions to which to send the record. If
>> > >>>> the value of the enum is ALL or NONE, the list of partitions is not
>> > >>>> used and might even be null (since in that case the list should not
>> > >>>> be used and it would be a bug to use it).
>> > >>>>
>> > >>>> Best,
>> > >>>> Bruno
>> > >>>>
>> > >>>> On 24.08.22 20:11, Sagar wrote:
>> > >>>>> Thank you Bruno/Matthew for your comments.
>> > >>>>>
>> > >>>>> I agree using null does seem error prone. However, I think using a
>> > >>>>> singleton list of [-1] might be better in terms of usability. I am
>> > >>>>> saying this because the KIP also has a provision to return an
>> > >>>>> empty list to refer to dropping the record. So, an empty optional
>> > >>>>> and an empty list have totally different meanings, which could get
>> > >>>>> confusing.
>> > >>>>>
>> > >>>>> Let me know what you think.
>> > >>>>>
>> > >>>>> Thanks!
>> > >>>>> Sagar.
>> > >>>>>
>> > >>>>>
>> > >>>>> On Wed, Aug 24, 2022 at 7:30 PM Matthew Benedict de Detrich
>> > >>>>> <matthew.dedetr...@aiven.io.invalid> wrote:
>> > >>>>>
>> > >>>>>> I also concur with this, having an Optional in the type makes it
>> > >>>>>> very clear what’s going on and better signifies an absence of
>> > >>>>>> value (or in this case the broadcast value).
>> > >>>>>>
>> > >>>>>> --
>> > >>>>>> Matthew de Detrich
>> > >>>>>> Aiven Deutschland GmbH
>> > >>>>>> Immanuelkirchstraße 26, 10405 Berlin
>> > >>>>>> Amtsgericht Charlottenburg, HRB 209739 B
>> > >>>>>>
>> > >>>>>> Geschäftsführer: Oskari Saarenmaa & Hannu Valtonen
>> > >>>>>> m: +491603708037
>> > >>>>>> w: aiven.io e: matthew.dedetr...@aiven.io
>> > >>>>>> On 24. Aug 2022, 14:19 +0200, dev@kafka.apache.org, wrote:
>> > >>>>>>>
>> > >>>>>>> 2.
>> > >>>>>>> I would prefer changing the return type of partitions() to
>> > >>>>>>> Optional<List<Integer>> and using Optional.empty() as the
>> > >>>>>>> broadcast value. IMO, The chances that an implementation returns
>> > >>>>>>> null due to a bug is much higher than that an implementation
>> > >>>>>>> returns an empty Optional due to a bug.