Thanks Shay! You and Matthias have convinced me; I'm happy with the current proposal. I think once you make the minor updates to the KIP document, this will be ready for voting again.

Cheers,
Sophie

On Mon, Jul 24, 2023 at 8:26 AM Shay Lin <lqxs...@gmail.com> wrote:

> Hi Sophie and Matthias, thanks for your comments and replies.
>
> 1. Scope of change: KStreams only or KStreams/KTable
> I took some time to digest your points, looking through how KStreams
> triggers repartitions today. I noticed that `repartitionRequired` is a
> flag in KStreamImpl etc. and not in KTableImpl etc. Looking further, in
> the case of KTable, instead of passing in a boolean flag, a repartition
> node `TableRepartitionMapNode` is created directly. I went back and
> referenced the two issue tickets KAFKA-10844 and KAFKA-4835; both
> requests were focused on KStreams, i.e. not changing the partitioning
> when the input streams are already correctly keyed. Is it possible that
> in the case of KTable, users always intend to repartition (change key)
> when they call aggregate? -- (this was written before I saw Matthias's
> comment)
>
> Overall, based on the tickets, I see the benefit of doing a contained
> change focusing on KStreams, i.e. repartitionRequired, which would solve
> the pain points nicely. If we run into similar complaints/optimization
> requests for KTable down the line, we can address them on top of this
> (let me know if we have these requests already, I might just be
> negligent).
>
> 2. API: markAsPartitioned() vs config
> If we go with the KStreams-only scope, markAsPartitioned() is more
> adequate, i.e. it maps nicely to repartitionRequired. There is a list of
> NamedOperations that may or may not trigger a repartition based on their
> context (KStreams or KTable), which would make the implementation more
> confusing.
>
> 3. KIP documentation: Thanks for providing the links to previous KIPs. I
> will be adding the three use cases and javadoc. I will also document the
> risks as they relate to IQ and joins.
>
> Best,
> Shay
>
> On Fri, Jul 21, 2023 at 5:55 PM Matthias J. Sax <mj...@apache.org> wrote:
>
> > I agree that it could easily be misused.
> > There are a few Jira tickets for cases when people want to "cancel" a
> > repartition step. I would hope those tickets are linked to the KIP (if
> > not, we should do this, and maybe even c&p those cases as motivation
> > into the KIP itself)?
> >
> > It's always a tricky question to what extent we want to guide users,
> > and to what extent we need to give levers for advanced cases (and how
> > to design those levers...). It's for sure a good idea to call out "use
> > with care" in the JavaDocs for the new method.
> >
> >
> > -Matthias
> >
> > On 7/21/23 3:34 PM, Sophie Blee-Goldman wrote:
> > > I guess I felt a bit uneasy about how this could be used/abused while
> > > reading the KIP, but if we truly believe this is an advanced feature,
> > > I'm fine with the way things currently are. It doesn't feel like the
> > > best API, but it does seem to be the best *possible* API given the way
> > > things are.
> > >
> > > W.r.t. the KTable notes, that all makes sense to me. I just wanted to
> > > lay out all the potential cases to make sure we had our bases covered.
> > >
> > > I still think an example or two would help, but the only thing I will
> > > actually wait on before feeling comfortable enough to vote on this
> > > would be a clear method signature (and maybe sample javadocs) in the
> > > "Public Interfaces" section.
> > >
> > > Thanks again for the KIP Shay! Hope I haven't dragged it out too much
> > >
> > > On Fri, Jul 21, 2023 at 3:19 PM Matthias J. Sax <mj...@apache.org>
> > > wrote:
> > >
> > >> Some thoughts about the API question.
> > >>
> > >>
> > >>>> A. kstream.groupBy(...).aggregate(...)
> > >>
> > >> This can be rewritten as
> > >>
> > >> kstream.selectKey(...)
> > >>        .markAsRepartitioned()
> > >>        .groupByKey()
> > >>        .aggregate()
> > >>
> > >> Given that `markAsRepartitioned` is an advanced feature, I think it
> > >> would be ok?
> > >>
> > >>
> > >>>> B. ktable.groupBy(...).aggregate(...)
> > >>
> > >> For KTable aggregation, not sure how useful it would be? In the end,
> > >> a table aggregation only makes sense if we pick something from the
> > >> value, i.e., we indeed change the key?
> > >>
> > >>
> > >>>> C. kstream.selectKey(...).join(ktable)
> > >>
> > >> We can just insert a `markAsRepartitioned()` after `selectKey` to
> > >> avoid repartitioning of the left input KStream.
> > >>
> > >>
> > >>> KStream.selectKey(...).toTable().join(...)
> > >>
> > >> Not sure if I understand what you are trying to say with this example?
> > >> In the end, `selectKey(...).toTable()` would repartition. If I know
> > >> that one can upsert directly, one inserts a `markAsRepartitioned()` in
> > >> between.
> > >>
> > >>
> > >> In general, the use case seems to be that the key is not in the right
> > >> "format", or there is no key, but data was partitioned by a
> > >> value-attribute upstream and we just want to extract this
> > >> value-attribute into the key. Both seem to be KStream cases?
> > >>
> > >>
> > >> -Matthias
> > >>
> > >>
> > >>
> > >> On 7/15/23 1:43 PM, Sophie Blee-Goldman wrote:
> > >>> Hey Shay, while I don't have any specific concerns about the new
> > >>> public API in this KIP, I'd like to better understand how this
> > >>> feature will work before I vote. We should document the behavior of
> > >>> this new operator clearly in the KIP as well -- you don't necessarily
> > >>> need to write the complete javadocs up front, but it should be
> > >>> possible for a user to read the KIP and then understand how this
> > >>> feature will work and how they would need to apply it.
> > >>>
> > >>> To that end, I recommend framing this proposal with a few examples to
> > >>> help clarify the semantics. When and where can you apply the
> > >>> markAsPartitioned() operator? Some suggestions below.
> > >>>
> > >>> Specific notes:
> > >>>
> > >>> 1. The KIP opens with "Each key changing operation in Kafka Streams
> > >>> (selectKey, map, transform, etc.) now leads to automatic repartition
> > >>> before an aggregation." We should change "aggregation" to "stateful
> > >>> operation" as this is true for things like joins as well as
> > >>> aggregations
> > >>> 2. The callout on IQ makes me a bit uncomfortable -- basically it says
> > >>> this should not be a concern "if we use markAsPartitioned correctly".
> > >>> Does this mean if we, the devs implementing this, write the feature
> > >>> correctly? Or is it saying that this won't be a problem as long as
> > >>> "we", the users of this feature, use it correctly? Just wondering if
> > >>> you've put any thought into how this would work yet (I personally
> > >>> have not)
> > >>> 3. The KIP should lay out the proposed API exactly, even if there's
> > >>> only one new method. Check out this KIP
> > >>> <https://cwiki.apache.org/confluence/display/KAFKA/KIP-450%3A+Sliding+Window+Aggregations+in+the+DSL>
> > >>> (or this KIP
> > >>> <https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=128651808>)
> > >>> for a good reference on what the Public Interfaces section should
> > >>> include
> > >>> 4. Regarding the proposed API itself, I wonder if KStream is really
> > >>> the most appropriate interface for the new operator. A repartition
> > >>> can be triggered on just a KTable. Here's where some examples would
> > >>> help. Perhaps we could focus on these three cases:
> > >>>
> > >>> A. kstream.groupBy(...).aggregate(...)
> > >>> B. ktable.groupBy(...).aggregate(...)
> > >>> C. kstream.selectKey(...).join(ktable)
> > >>>
> > >>> I'm sure someone will correct me if I'm missing any additional vital
> > >>> examples, but at the very least, these are the three to consider:
> > >>> either a KStream or KTable followed by a groupBy/aggregation, or a
> > >>> KStream with key-changing operator followed by a join. Note that you
> > >>> could have something like KStream.selectKey(...).toTable().join(...)
> > >>> as well, but since there are no pure key-changing operators (like
> > >>> #selectKey) on KTables, only groupBy() which must always be followed
> > >>> by aggregation, this 4th case can be reduced to an example like C of
> > >>> a KStream with key-changing operation and downstream join -- i.e.
> > >>> there's no way to do this without #toTable, which is more like
> > >>> syntactic sugar for the purposes of this repartitioning discussion.
> > >>>
> > >>> I worry that making this a DSL operator on KStream is too generic,
> > >>> and we would also need to add it to KTable for example B, despite
> > >>> KTables not having any true pure key-changing operators outside of
> > >>> #groupBy. Would we throw an exception if you invoked
> > >>> #markAsPartitioned on a KTable that wasn't followed by a groupBy? If
> > >>> you have multiple key-changing operators, would you need to add
> > >>> markAsPartitioned after each one? If not, what are the semantics of
> > >>> that? These are the main questions that got me thinking here, and
> > >>> will definitely need to be clarified in the KIP if we do go with the
> > >>> current proposal.
> > >>> But I wanted to throw out another idea for an API I think would help
> > >>> with some of this awkwardness by having clearly defined semantics:
> > >>>
> > >>> Fundamentally it seems to me that these issues arise from the fact
> > >>> that "being partitioned" is conceptually a property of other
> > >>> operations applied to a KStream/KTable, rather than an operation
> > >>> itself. So rather than making this a DSL operator itself, what if we
> > >>> added it to the Grouped and various Joined configuration classes? It
> > >>> would allow us to more carefully hit only the relevant parts of the
> > >>> DSL, so there are no questions about whether/when to throw errors
> > >>> when the operator is incorrectly applied -- there would be no way to
> > >>> apply it incorrectly. The main drawback I can think of is simply that
> > >>> this touches on a larger surface area of the API. I personally don't
> > >>> believe this is a good enough reason to make it a DSL operator, as
> > >>> one could make that argument for nearly any kind of KStream or KTable
> > >>> operator configuration going forward, and would explode the
> > >>> KStream/KTable API surface area instead. Perhaps this was discussed
> > >>> during the previous iteration of this KIP, or I'm missing something
> > >>> here, so I just wanted to put this out there and see what people
> > >>> think
> > >>>
> > >>> Either way, thanks for picking up this KIP. It's been a long time
> > >>> coming :)
> > >>>
> > >>> -Sophie
> > >>>
> > >>>
> > >>> On Mon, Jul 10, 2023 at 2:05 PM Shay Lin <lqxs...@gmail.com> wrote:
> > >>>
> > >>>> Hi all,
> > >>>>
> > >>>> It's been a few days so I went ahead with editing the KIP, the main
> > >>>> change is on the method name
> > >>>>
> > >>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-759%3A+Unneeded+repartition+canceling
> > >>>> .
> > >>>>
> > >>>> I will follow up with a VOTE separately.
> > >>>>
> > >>>> Best,
> > >>>> Shay
> > >>>>
> > >>>> On Thu, Jun 29, 2023 at 4:52 PM Matthias J. Sax <mj...@apache.org>
> > >>>> wrote:
> > >>>>
> > >>>>> Shay,
> > >>>>>
> > >>>>> thanks for picking up this KIP. It's a pity that the discussion
> > >>>>> stalled for such a long time.
> > >>>>>
> > >>>>> As expressed previously, I am happy with the name
> > >>>>> `markAsPartitioned()` and also believe it's ok to just document the
> > >>>>> impact and leave it to the user to do the right thing.
> > >>>>>
> > >>>>> If we really get a lot of users that ask about it, because they did
> > >>>>> not do the right thing, we could still add something (eg, a
> > >>>>> reverse-mapper function) in a follow-up KIP. But we don't know if
> > >>>>> it's necessary; thus, making a small incremental step sounds like a
> > >>>>> good approach to me.
> > >>>>>
> > >>>>> Let's see if others agree or not.
> > >>>>>
> > >>>>>
> > >>>>> -Matthias
> > >>>>>
> > >>>>> On 6/28/23 5:29 PM, Shay Lin wrote:
> > >>>>>> Hi all,
> > >>>>>>
> > >>>>>> Great discussion thread. May I take this KIP up? If it’s alright
> > >>>>>> my plan is to update the KIP with the operator
> > >>>>>> `markAsPartitioned()`.
> > >>>>>>
> > >>>>>> As you have discussed and pointed out, there are implications to
> > >>>>>> downstream joins or aggregation operations. Still, the operator is
> > >>>>>> intended for advanced users so my two cents is it would be a
> > >>>>>> valuable addition nonetheless. We could add this as a
> > >>>>>> caution/consideration as part of the java doc.
> > >>>>>>
> > >>>>>> Let me know, thanks.
> > >>>>>> Shay
> > >>>>>>
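
Illustrative sketch (not part of any message in the thread): the discussion centers on a boolean `repartitionRequired` flag that key-changing operators such as `selectKey` set, and that the proposed `markAsPartitioned()` operator would clear before a `groupByKey`. The plain-Java toy model below only mimics that flag logic. It is not Kafka Streams code: `ToyStream`, its no-argument methods, and the recorded plan strings are hypothetical names invented here; only `selectKey`, `groupByKey`, `markAsPartitioned`, and `repartitionRequired` echo names used in the thread. How the real operator would behave is for the KIP to define.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Toy model of the repartitionRequired flag; hypothetical, not the Kafka Streams API. */
final class ToyStream {
    private final List<String> plan;            // operators recorded so far
    private final boolean repartitionRequired;  // set by key-changing operators

    private ToyStream(List<String> plan, boolean repartitionRequired) {
        this.plan = plan;
        this.repartitionRequired = repartitionRequired;
    }

    static ToyStream source() {
        return new ToyStream(List.of("source"), false);
    }

    // Appends one operator to a copy of the plan and carries the given flag forward.
    private ToyStream next(String op, boolean flag) {
        List<String> copy = new ArrayList<>(plan);
        copy.add(op);
        return new ToyStream(Collections.unmodifiableList(copy), flag);
    }

    // A key-changing operator: the data may no longer be partitioned by key.
    ToyStream selectKey() {
        return next("selectKey", true);
    }

    // The user asserts the data is already partitioned correctly, so the flag is cleared.
    ToyStream markAsPartitioned() {
        return next("markAsPartitioned", false);
    }

    // A repartition step is added in front of the grouping only if the flag is set.
    ToyStream groupByKey() {
        ToyStream upstream = repartitionRequired ? next("repartition", false) : this;
        return upstream.next("groupByKey", false);
    }

    List<String> plan() {
        return plan;
    }

    public static void main(String[] args) {
        System.out.println(source().selectKey().groupByKey().plan());
        System.out.println(source().selectKey().markAsPartitioned().groupByKey().plan());
    }
}
```

In this toy model the first chain records a `repartition` step before `groupByKey`, while the second chain, with `markAsPartitioned()` inserted after `selectKey()`, does not. Nothing in the model checks that the data really is partitioned correctly, which mirrors the "use with care" caveat raised in the thread.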