Thanks Matthias,

Well, as things stand, we did have internal discussions on this, and it
seemed OK to open it up for IQ but not OK to open it up for FK-Join. More
importantly, the PR for this is already merged, and some of these points
came up during its review. Here's the PR link:
https://github.com/apache/kafka/pull/12803.

Thanks!
Sagar.


On Fri, Dec 9, 2022 at 5:15 AM Matthias J. Sax <mj...@apache.org> wrote:

> Ah. Missed it as it does not have a nice "code block" similar to
> `StreamPartitioner` changes.
>
> I understand the motivation, but I am wondering if we might head into a
> tricky direction? State stores (at least the built-in ones) and IQ are
> kind of built with the idea that data is sharded and that multicasting
> keys is an anti-pattern?
>
> Maybe it's fine, but I also don't want to open Pandora's Box. Are we
> sure that generalizing the concepts does not cause issues in the future?
>
> I.e., should we say that the multicast feature should be used for
> KStreams only, but not for KTables?
>
> Just want to double check that we are not doing something we regret later.
>
>
> -Matthias
>
>
> On 12/7/22 6:45 PM, Sagar wrote:
> > Hi Matthias,
> >
> > I did save it. The changes are under Public Interfaces (point #2, about
> > enhancing KeyQueryMetadata with a partitions() method) and the change to
> > throw an IllegalArgumentException when StreamPartitioner#partitions
> > returns multiple partitions for FK-Join only, instead of for both FK-Join
> > and IQ as decided earlier.
> >
> > The background is that for IQ, if users have multicast records to
> > multiple partitions during ingestion but the fetch returns only a single
> > partition, the result would be wrong. That's why the restriction was
> > lifted for IQ, and that's why KeyQueryMetadata now has an additional
> > partitions() method to report all of them.
> >
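To make the IQ point concrete, here is a toy Java sketch, assuming a simplified model in which each partition owns its own key-value map. `MulticastIqSketch`, `MulticastPartitioner`, `partitionsFor` and `query` are hypothetical names, not the Kafka Streams or IQ API; the sketch only shows that once a key is multicast, both the metadata and the lookup have to cover every partition the partitioner returned.

```java
import java.util.*;

// Toy model (not the Kafka Streams API): one key-value store per partition and
// a partitioner that may multicast a key to several partitions.
class MulticastIqSketch {

    // Hypothetical functional interface standing in for StreamPartitioner#partitions.
    interface MulticastPartitioner {
        Set<Integer> partitions(String key, int numPartitions);
    }

    private final List<Map<String, String>> stores = new ArrayList<>();
    private final MulticastPartitioner partitioner;

    MulticastIqSketch(int numPartitions, MulticastPartitioner partitioner) {
        for (int i = 0; i < numPartitions; i++) {
            stores.add(new HashMap<>());
        }
        this.partitioner = partitioner;
    }

    // Ingestion: the record is materialized in every partition it was sent to.
    void put(String key, String value) {
        for (int p : partitioner.partitions(key, stores.size())) {
            stores.get(p).put(key, value);
        }
    }

    // Metadata for a key: every partition that may hold it, in the spirit of
    // the proposed KeyQueryMetadata#partitions(), rather than a single partition.
    Set<Integer> partitionsFor(String key) {
        return partitioner.partitions(key, stores.size());
    }

    // Query: consult each partition reported by the metadata.
    Map<Integer, String> query(String key) {
        Map<Integer, String> hits = new TreeMap<>();
        for (int p : partitionsFor(key)) {
            String value = stores.get(p).get(key);
            if (value != null) {
                hits.put(p, value);
            }
        }
        return hits;
    }
}
```

For example, with a partitioner that always returns partitions 0 and 2, `partitionsFor` reports both and `query` finds the value in both stores, whereas metadata reporting a single partition would describe only one of the copies.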
> > FK-Join also has a similar case, but during review it was felt that
> > FK-Join on its own is fairly complicated and we don't need this feature
> > right away, so the restriction still exists.
> >
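A minimal sketch of the FK-Join restriction, assuming the Optional<Set<Integer>> return type that the thread converges on. `FkJoinPartitionCheck` and its error message are hypothetical and not taken from the merged PR; treating an empty set as invalid is also an assumption of this sketch.

```java
import java.util.*;

// Hypothetical helper, not the Kafka Streams code: expresses the rule that a
// partitioner used inside a foreign-key join must resolve to one partition.
final class FkJoinPartitionCheck {

    private FkJoinPartitionCheck() {
    }

    // Empty Optional: defer to default partitioning (no partition chosen here).
    // Singleton set: that partition. Anything else is rejected; rejecting an
    // empty set (a "drop") is an assumption of this sketch.
    static Optional<Integer> requireSinglePartition(Optional<Set<Integer>> partitions) {
        if (!partitions.isPresent()) {
            return Optional.empty();
        }
        Set<Integer> chosen = partitions.get();
        if (chosen.size() != 1) {
            throw new IllegalArgumentException(
                "FK-Join requires exactly one partition per record, but the partitioner returned "
                    + chosen);
        }
        return Optional.of(chosen.iterator().next());
    }
}
```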
> > Thanks!
> > Sagar.
> >
> >
> > On Wed, Dec 7, 2022 at 9:42 PM Matthias J. Sax <mj...@apache.org> wrote:
> >
> >> I don't see any update on the wiki about it. Did you forget to hit "save"?
> >>
> >> Can you also provide some background? I am not sure right now if I
> >> understand the proposed changes?
> >>
> >>
> >> -Matthias
> >>
> >> On 12/6/22 6:36 PM, Sophie Blee-Goldman wrote:
> >>> Thanks Sagar, this makes sense to me -- we clearly need additional changes
> >>> to avoid breaking IQ when using this feature, but I agree with continuing to
> >>> restrict FKJ since they wouldn't stop working without it, and would become
> >>> much harder to reason about (than they already are) if we did enable them
> >>> to use it.
> >>>
> >>> And of course, they can still multicast the final results of a FKJ, they
> >>> just can't mess with the internal workings of it in this way.
> >>>
> >>> On Tue, Dec 6, 2022 at 9:48 AM Sagar <sagarmeansoc...@gmail.com> wrote:
> >>>
> >>>> Hi All,
> >>>>
> >>>> I made a couple of edits to the KIP which came up during the code review.
> >>>> Changes at a high level are:
> >>>>
> >>>> 1) KeyQueryMetadata enhanced to have a new method called partitions().
> >>>> 2) Lifting the restriction of a single partition for IQ. Now the
> >>>> restriction holds only for FK Join.
> >>>>
> >>>> Updated KIP:
> >>>>
> >>>> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=211883356
> >>>>
> >>>> Thanks!
> >>>> Sagar.
> >>>>
> >>>> On Mon, Sep 12, 2022 at 6:43 PM Sagar <sagarmeansoc...@gmail.com> wrote:
> >>>>
> >>>>> Thanks Bruno,
> >>>>>
> >>>>> Marking this as accepted.
> >>>>>
> >>>>> Thanks everyone for their comments/feedback.
> >>>>>
> >>>>> Thanks!
> >>>>> Sagar.
> >>>>>
> >>>>> On Mon, Sep 12, 2022 at 1:53 PM Bruno Cadonna <cado...@apache.org> wrote:
> >>>>>
> >>>>>> Hi Sagar,
> >>>>>>
> >>>>>> Thanks for the update and the PR!
> >>>>>>
> >>>>>> +1 (binding)
> >>>>>>
> >>>>>> Best,
> >>>>>> Bruno
> >>>>>>
> >>>>>> On 10.09.22 18:57, Sagar wrote:
> >>>>>>> Hi Bruno,
> >>>>>>>
> >>>>>>> Thanks, I think these changes make sense to me. I have updated the KIP
> >>>>>>> accordingly.
> >>>>>>>
> >>>>>>> Thanks!
> >>>>>>> Sagar.
> >>>>>>>
> >>>>>>> On Wed, Sep 7, 2022 at 2:16 PM Bruno Cadonna <cado...@apache.org> wrote:
> >>>>>>>
> >>>>>>>> Hi Sagar,
> >>>>>>>>
> >>>>>>>> I would not drop the support for dropping records. I would also not
> >>>>>>>> return null from partitions(). Maybe an Optional can help here. An empty
> >>>>>>>> Optional would mean to use the default partitioning behavior of the
> >>>>>>>> producer. So we would have:
> >>>>>>>>
> >>>>>>>> - non-empty Optional, non-empty list of integers: partitions to send the record to
> >>>>>>>> - non-empty Optional, empty list of integers: drop the record
> >>>>>>>> - empty Optional: use default behavior
> >>>>>>>>
> >>>>>>>> What do others think?
> >>>>>>>>
> >>>>>>>> Best,
> >>>>>>>> Bruno
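A minimal Java sketch of how the three cases could be told apart, assuming partitions() returns Optional<Set<Integer>> (the thread later switches from List to Set). `PartitionsResultSketch` and `Outcome` are hypothetical names used only for illustration.

```java
import java.util.*;

// Hypothetical interpretation of the three cases listed above; DEFAULT stands
// for "let the producer's default partitioning decide".
final class PartitionsResultSketch {

    enum Outcome { SEND, DROP, DEFAULT }

    private PartitionsResultSketch() {
    }

    static Outcome classify(Optional<Set<Integer>> result) {
        if (!result.isPresent()) {
            return Outcome.DEFAULT;   // empty Optional
        }
        return result.get().isEmpty()
            ? Outcome.DROP            // non-empty Optional, empty set
            : Outcome.SEND;           // non-empty Optional, non-empty set
    }
}
```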
> >>>>>>>>
> >>>>>>>> On 02.09.22 13:53, Sagar wrote:
> >>>>>>>>> Hello Bruno/Chris,
> >>>>>>>>>
> >>>>>>>>> Since these are the last set of changes (I am assuming haha), it would be
> >>>>>>>>> great if you could review the 2 options from above so that we can close the
> >>>>>>>>> voting. Of course, I am happy to incorporate any other requisite changes.
> >>>>>>>>>
> >>>>>>>>> Thanks!
> >>>>>>>>> Sagar.
> >>>>>>>>>
> >>>>>>>>> On Wed, Aug 31, 2022 at 10:07 PM Sagar <sagarmeansoc...@gmail.com> wrote:
> >>>>>>>>>
> >>>>>>>>>> Thanks Bruno for the great points.
> >>>>>>>>>>
> >>>>>>>>>> I see 2 options here =>
> >>>>>>>>>>
> >>>>>>>>>> 1) As Chris suggested, drop the support for dropping records in the
> >>>>>>>>>> partitioner. That way, an empty list could signify the usage of the
> >>>>>>>>>> default partitioner. Also, if the deprecated partition() method returns
> >>>>>>>>>> null, thereby signifying the default partitioner, partitions() can return
> >>>>>>>>>> an empty list, i.e., the default partitioner.
> >>>>>>>>>>
> >>>>>>>>>> 2) OR we treat a null return value from the partitions() method as
> >>>>>>>>>> signifying the usage of the default partitioner. In the default
> >>>>>>>>>> implementation of the partitions() method, if partition() returns null,
> >>>>>>>>>> then partitions() can return null as well (instead of an empty list). The
> >>>>>>>>>> RecordCollectorImpl code can also be modified accordingly. @Chris, to your
> >>>>>>>>>> point, we can even drop the support for dropping records. It came up
> >>>>>>>>>> during the KIP discussion, and I thought it might be a useful feature. Let
> >>>>>>>>>> me know what you think.
> >>>>>>>>>>
> >>>>>>>>>> 3) Lastly, about the partition number check: I wanted to avoid throwing
> >>>>>>>>>> an exception, so I thought adding it might be a useful feature. But as
> >>>>>>>>>> you pointed out, if it can break backward compatibility, it's better to
> >>>>>>>>>> remove it.
> >>>>>>>>>>
> >>>>>>>>>> Thanks!
> >>>>>>>>>> Sagar.
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>> On Tue, Aug 30, 2022 at 6:32 PM Chris Egerton <chr...@aiven.io.invalid> wrote:
> >>>>>>>>>>
> >>>>>>>>>>> +1 to Bruno's concerns about backward compatibility. Do we actually need
> >>>>>>>>>>> support for dropping records in the partitioner? It doesn't seem necessary
> >>>>>>>>>>> based on the motivation for the KIP. If we remove that feature, we could
> >>>>>>>>>>> handle null and/or empty lists by using the default partitioning,
> >>>>>>>>>>> equivalent to how we handle null return values from the existing partition
> >>>>>>>>>>> method today.
> >>>>>>>>>>>
> >>>>>>>>>>> On Tue, Aug 30, 2022 at 8:55 AM Bruno Cadonna <cado...@apache.org> wrote:
> >>>>>>>>>>>
> >>>>>>>>>>>> Hi Sagar,
> >>>>>>>>>>>>
> >>>>>>>>>>>> Thank you for the updates!
> >>>>>>>>>>>>
> >>>>>>>>>>>> I do not intend to prolong this vote thread more than needed, but I
> >>>>>>>>>>>> still have some points.
> >>>>>>>>>>>>
> >>>>>>>>>>>> The deprecated partition method can return null if the default
> >>>>>>>>>>>> partitioning logic of the producer should be used.
> >>>>>>>>>>>> With the new method partitions(), it seems that it is no longer possible
> >>>>>>>>>>>> to use the default partitioning logic.
> >>>>>>>>>>>>
> >>>>>>>>>>>> Also, in the default implementation of method partitions(), a record
> >>>>>>>>>>>> that would use the default partitioning logic in method partition()
> >>>>>>>>>>>> would be dropped, which would break backward compatibility since Streams
> >>>>>>>>>>>> would always call the new method partitions() even though the users
> >>>>>>>>>>>> still implement the deprecated method partition().
> >>>>>>>>>>>>
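One way to write the default partitions() so that existing partition() implementations keep their behavior is sketched below. `BackwardCompatiblePartitioner` is a hypothetical stand-in for StreamPartitioner, assuming the Optional<Set<Integer>> return type in which an empty Optional means default partitioning; it is a sketch of the idea, not the code that was merged.

```java
import java.util.*;

// Hypothetical stand-in for StreamPartitioner (shape is illustrative only).
// Old implementations only override partition(); the default partitions()
// must map a null from partition() to "use the default partitioning" rather
// than to "drop the record".
interface BackwardCompatiblePartitioner<K, V> {

    Integer partition(String topic, K key, V value, int numPartitions);

    default Optional<Set<Integer>> partitions(String topic, K key, V value, int numPartitions) {
        Integer partition = partition(topic, key, value, numPartitions);
        return partition == null
            ? Optional.empty()                                // default partitioning
            : Optional.of(Collections.singleton(partition));  // exactly that partition
    }
}
```

A legacy implementation written only against partition() then keeps its old semantics: returning null still means "let the producer decide" instead of silently dropping the record.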
> >>>>>>>>>>>> I have a last point that we should probably discuss on the PR and not on
> >>>>>>>>>>>> the KIP, but since you added the code in the KIP, I need to mention it. I
> >>>>>>>>>>>> do not think you should check the validity of the partition number since
> >>>>>>>>>>>> the ProducerRecord does the same check and throws an exception. If
> >>>>>>>>>>>> Streams adds the same check but does not throw, the behavior is not
> >>>>>>>>>>>> backward compatible.
> >>>>>>>>>>>>
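To illustrate the compatibility concern, here is a hypothetical side-by-side of the two behaviors: the log-and-drop check from the updated KIP versus failing fast with an exception. `PartitionValidationSketch` is illustrative only; it does not reproduce the exact checks or exception types of the producer.

```java
import java.util.*;

// Hypothetical comparison of the two behaviours discussed above (not Kafka code).
final class PartitionValidationSketch {

    private PartitionValidationSketch() {
    }

    // Log-and-drop idea: keep only partition numbers inside [0, numPartitions).
    static Set<Integer> lenient(Set<Integer> requested, int numPartitions) {
        Set<Integer> valid = new TreeSet<>();
        for (int p : requested) {
            if (p >= 0 && p < numPartitions) {
                valid.add(p);
            }
            // else: the proposal was to log and skip this partition
        }
        return valid;
    }

    // Fail-fast idea: any invalid partition number is an error.
    static Set<Integer> strict(Set<Integer> requested, int numPartitions) {
        for (int p : requested) {
            if (p < 0 || p >= numPartitions) {
                throw new IllegalArgumentException(
                    "Invalid partition " + p + " for a topic with " + numPartitions + " partitions");
            }
        }
        return requested;
    }
}
```

With a requested set of {-1, 1, 5} on a 3-partition topic, the lenient variant quietly keeps only partition 1 while the strict one throws, so a partitioner that used to fail loudly would start losing records silently under the lenient rule.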
> >>>>>>>>>>>> Best,
> >>>>>>>>>>>> Bruno
> >>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>>> On 30.08.22 12:43, Sagar wrote:
> >>>>>>>>>>>>> Thanks Bruno/Chris,
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> I also agree that it might be better to keep it simple, the way Chris
> >>>>>>>>>>>>> suggested. I have updated the KIP accordingly. I made a couple of minor
> >>>>>>>>>>>>> changes to the KIP:
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> 1) One of them is changing the return type of the partitions method from
> >>>>>>>>>>>>> List to Set. This ensures that if the StreamPartitioner implementation
> >>>>>>>>>>>>> is buggy and ends up returning duplicate partition numbers, we won't
> >>>>>>>>>>>>> have duplicates, and thus won't try to send to the same partition
> >>>>>>>>>>>>> multiple times.
> >>>>>>>>>>>>> 2) I also added a check to send the record only to valid partition numbers,
> >>>>>>>>>>>>> and to log and drop it when the partition number is invalid. This is again
> >>>>>>>>>>>>> to prevent errors in cases where the StreamPartitioner implementation has
> >>>>>>>>>>>>> some bugs (since there are no validations as such).
> >>>>>>>>>>>>> 3) I also updated the Test Plan section based on the suggestion from Bruno.
> >>>>>>>>>>>>> 4) I updated the default implementation of the partitions method based on
> >>>>>>>>>>>>> the great catch from Chris!
> >>>>>>>>>>>>>
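A small sketch of point 1, assuming a buggy partitioner that returns partition 2 twice. `DuplicatePartitionSketch` is a hypothetical helper that only counts how many sends each partition would receive.

```java
import java.util.*;

// Hypothetical illustration: counts sends per partition for a collection of
// target partitions, so a List with repeats can be compared with a Set.
final class DuplicatePartitionSketch {

    private DuplicatePartitionSketch() {
    }

    static Map<Integer, Integer> sendsPerPartition(Collection<Integer> targets) {
        Map<Integer, Integer> sends = new TreeMap<>();
        for (int p : targets) {
            sends.merge(p, 1, Integer::sum);
        }
        return sends;
    }
}
```

With the list [2, 0, 2], partition 2 would receive two sends; wrapping the same values in a Set leaves one send per partition.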
> >>>>>>>>>>>>> Let me know if it looks fine now.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Thanks!
> >>>>>>>>>>>>> Sagar.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> On Tue, Aug 30, 2022 at 3:00 PM Bruno Cadonna <cado...@apache.org> wrote:
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>> Hi,
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> I am in favour of discarding the sugar for broadcasting and leaving the
> >>>>>>>>>>>>>> broadcasting to the implementation, as Chris suggests. I think that is
> >>>>>>>>>>>>>> the cleanest option.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> Best,
> >>>>>>>>>>>>>> Bruno
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> On 29.08.22 19:50, Chris Egerton wrote:
> >>>>>>>>>>>>>>> Hi all,
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> I think it'd be useful to be more explicit about broadcasting to all topic
> >>>>>>>>>>>>>>> partitions rather than add implicit behavior for empty cases (empty
> >>>>>>>>>>>>>>> optional, empty list, etc.). The suggested enum approach would address
> >>>>>>>>>>>>>>> that nicely.
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> It's also worth noting that there's no hard requirement to add sugar for
> >>>>>>>>>>>>>>> broadcasting to all topic partitions, since the API already provides the
> >>>>>>>>>>>>>>> number of topic partitions available when calling a stream partitioner.
> >>>>>>>>>>>>>>> If we can't find a clean way to add this support, it might be better to
> >>>>>>>>>>>>>>> leave it out and just let people implement this themselves with a line
> >>>>>>>>>>>>>>> of Java 8 streams:
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>           return IntStream.range(0, numPartitions).boxed().collect(Collectors.toList());
> >>>>>>>>>>>>>>>
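A self-contained sketch built around the one-liner above, assuming a Set<Integer> return type (as the KIP later uses). The `broadcast` flag and the use of `String#hashCode` are hypothetical choices made for illustration; Kafka's own default partitioning works differently.

```java
import java.util.*;
import java.util.stream.*;

// Hypothetical partitioning logic (not the Kafka interface itself): broadcast
// when the caller asks for it, otherwise pick a single partition by hash.
final class BroadcastSketch {

    private BroadcastSketch() {
    }

    static Set<Integer> allPartitions(int numPartitions) {
        return IntStream.range(0, numPartitions).boxed().collect(Collectors.toSet());
    }

    static Set<Integer> partitions(String key, boolean broadcast, int numPartitions) {
        if (broadcast) {
            return allPartitions(numPartitions);
        }
        // floorMod keeps the result in [0, numPartitions) even for negative hash codes
        return Collections.singleton(Math.floorMod(key.hashCode(), numPartitions));
    }
}
```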
> >>>>>>>>>>>>>>> Also worth noting that there may be a bug in the default implementation for
> >>>>>>>>>>>>>>> the new StreamPartitioner::partitions method, since it doesn't appear to
> >>>>>>>>>>>>>>> correctly pick up on null return values from the partition method and
> >>>>>>>>>>>>>>> translate them into empty lists.
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Cheers,
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Chris
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> On Mon, Aug 29, 2022 at 7:32 AM Bruno Cadonna <cado...@apache.org> wrote:
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> Hi Sagar,
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> I do not see an issue with using an empty list together with an empty
> >>>>>>>>>>>>>>>> Optional. A non-empty Optional that contains an empty list would just
> >>>>>>>>>>>>>>>> say that there is no partition to which the record should be sent. If
> >>>>>>>>>>>>>>>> there is no partition, the record is dropped.
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> An empty Optional might be a way to say, broadcast to all partitions.
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> Alternatively -- to make it more explicit -- you could return an object
> >>>>>>>>>>>>>>>> with an enum and a list of partitions. The enum could have values SOME,
> >>>>>>>>>>>>>>>> ALL, and NONE. If the value is SOME, the list of partitions contains the
> >>>>>>>>>>>>>>>> partitions to which to send the record. If the value of the enum is ALL
> >>>>>>>>>>>>>>>> or NONE, the list of partitions is not used and might even be null
> >>>>>>>>>>>>>>>> (since in that case the list should not be used and it would be a bug to
> >>>>>>>>>>>>>>>> use it).
> >>>>>>>>>>>>>>>>
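A possible shape for the enum-based result, as a sketch only: `PartitionChoice`, `Mode` and `resolve` are hypothetical names, and storing a null list for ALL and NONE follows the suggestion above that the list is simply not used in those cases.

```java
import java.util.*;

// Hypothetical result type for the SOME/ALL/NONE idea; not a Kafka API.
final class PartitionChoice {

    enum Mode { SOME, ALL, NONE }

    private final Mode mode;
    private final Set<Integer> partitions; // only set for SOME, null otherwise

    private PartitionChoice(Mode mode, Set<Integer> partitions) {
        this.mode = mode;
        this.partitions = partitions;
    }

    static PartitionChoice some(Collection<Integer> partitions) {
        return new PartitionChoice(Mode.SOME, Collections.unmodifiableSet(new TreeSet<>(partitions)));
    }

    static PartitionChoice all() {
        return new PartitionChoice(Mode.ALL, null);
    }

    static PartitionChoice none() {
        return new PartitionChoice(Mode.NONE, null);
    }

    Mode mode() {
        return mode;
    }

    // Resolve against the topic's partition count; the stored partitions are
    // read only for SOME, so the null field is never dereferenced.
    Set<Integer> resolve(int numPartitions) {
        switch (mode) {
            case SOME:
                return partitions;
            case ALL: {
                Set<Integer> all = new TreeSet<>();
                for (int p = 0; p < numPartitions; p++) {
                    all.add(p);
                }
                return all;
            }
            case NONE:
            default:
                return Collections.emptySet(); // NONE: drop the record
        }
    }
}
```

Because only `resolve` reads the stored partitions, and only for SOME, the null list for ALL and NONE cannot be used by accident.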
> >>>>>>>>>>>>>>>> Best,
> >>>>>>>>>>>>>>>> Bruno
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> On 24.08.22 20:11, Sagar wrote:
> >>>>>>>>>>>>>>>>> Thank you Bruno/Matthew for your comments.
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> I agree using null does seem error prone. However, I think using a
> >>>>>>>>>>>>>>>>> singleton list of [-1] might be better in terms of usability. I am saying
> >>>>>>>>>>>>>>>>> this because the KIP also has a provision to return an empty list to refer
> >>>>>>>>>>>>>>>>> to dropping the record. So, an empty Optional and an empty list have
> >>>>>>>>>>>>>>>>> totally different meanings, which could get confusing.
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> Let me know what you think.
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> Thanks!
> >>>>>>>>>>>>>>>>> Sagar.
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> On Wed, Aug 24, 2022 at 7:30 PM Matthew Benedict de Detrich
> >>>>>>>>>>>>>>>>> <matthew.dedetr...@aiven.io.invalid> wrote:
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> I also concur with this. Having an Optional in the type makes it very
> >>>>>>>>>>>>>>>>>> clear what’s going on and better signifies an absence of value (or in
> >>>>>>>>>>>>>>>>>> this case the broadcast value).
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> --
> >>>>>>>>>>>>>>>>>> Matthew de Detrich
> >>>>>>>>>>>>>>>>>> Aiven Deutschland GmbH
> >>>>>>>>>>>>>>>>>> On 24. Aug 2022, 14:19 +0200, dev@kafka.apache.org, wrote:
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> 2.
> >>>>>>>>>>>>>>>>>>> I would prefer changing the return type of partitions() to
> >>>>>>>>>>>>>>>>>>> Optional<List<Integer>> and using Optional.empty() as the broadcast
> >>>>>>>>>>>>>>>>>>> value. IMO, the chances that an implementation returns null due to a bug
> >>>>>>>>>>>>>>>>>>> are much higher than that an implementation returns an empty Optional due
> >>>>>>>>>>>>>>>>>>> to a bug.
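A minimal sketch of this proposal, assuming Optional<List<Integer>> as the return type, Optional.empty() as the broadcast value and, as the KIP had at that point, an empty list meaning "drop the record". `OptionalBroadcastSketch` is hypothetical; the point is that a buggy null return fails fast instead of being mistaken for a broadcast.

```java
import java.util.*;

// Hypothetical decoder for the Optional-as-broadcast proposal (not Kafka code).
final class OptionalBroadcastSketch {

    private OptionalBroadcastSketch() {
    }

    static List<Integer> targets(Optional<List<Integer>> result, int numPartitions) {
        // A null return is treated as a bug rather than as a sentinel value.
        Objects.requireNonNull(result, "partitions() must not return null");
        if (!result.isPresent()) {
            List<Integer> all = new ArrayList<>();
            for (int p = 0; p < numPartitions; p++) {
                all.add(p);
            }
            return all;          // broadcast to every partition
        }
        return result.get();     // as returned; an empty list would mean "drop"
    }
}
```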
