Just catching up on this thread. I like the general idea. Couple of comments:
- I think that adding `Processed` (or maybe a different name?) is a valid
  proposal for stateless operators that only have a single overload atm. It
  would align with the overall API design.

- for all methods with multiple existing overloads, we can consider
  extending `Consumed`, `Produced`, `Materialized` etc. to take an additional
  processor name (not sure atm how elegant this is; we would need to "play"
  with the API a little bit; the advantage would be that we do not add more
  overloads, which seems to be key for this KIP)

- operators that return void: while I agree that the "name first" chaining
  idea is not very intuitive, it might still work if we name the method
  correctly (again, we would need to "play" with the API a little bit to see)

- for DSL operators that are translated to multiple nodes: it might make
  sense to use the specified operator name as a prefix and add reasonable
  suffixes. For example, a join translates into 5 operators that could be
  named "name-left-store-processor", "name-left-join-processor",
  "name-right-store-processor", "name-right-join-processor", and
  "name-join-merge-processor" (or similar). Maybe just using numbers might
  also work.

- I think we should strip the number suffixes if a user provides names

- enforcing upper case seems to be tricky: for example, we do not enforce
  upper case for store names and we cannot easily change that as it would
  break compatibility -- thus, for consistency reasons we might not want to
  do this

- for a better understanding of the impact of the KIP, it would be quite
  helpful if you would list all method names that are affected in the KIP
  (i.e., list all newly added overloads)


-Matthias

On 5/31/18 6:40 PM, Guozhang Wang wrote:
> Hi Florian,
>
> Re 1: I think changing the KStreamImpl / KTableImpl to allow modifying the
> processor name after the operator is fine as long as we do the check again
> when modifying that.
> In fact, we have some topology optimization work going on which may modify
> processor names in the final topology anyway
> (https://github.com/apache/kafka/pull/4983). Semantically I think it is
> easier for developers to understand than "deciding the processor name for
> the next operator".
>
> Re 2: Yeah, I'm thinking that for operators that translate to multiple
> processor names, we can still use the provided "hint" to name the
> processors, e.g. for joins we can name them `join-foo-this` and
> `join-foo-that` etc. if the user calls `as("foo")`.
>
> Re 3: The motivation I had for removing the suffix is that it puts huge
> restrictions on topology compatibility: if user code adds a new operator,
> or the library does some optimization that removes some operators, the
> suffix indexing may change for a large number of the processor names. This
> will in turn change the internal state store names as well as the internal
> topic names, making the new application topology incompatible with the
> old one. One rationale I had for this KIP is that, aligned with this
> effort, moving forward we can allow users to customize internal names so
> that they can still be reused even with topology changes (e.g. KIP-230),
> so I think removing the suffix index would be more applicable in the long
> run.
>
>
> Guozhang
>
>
> On Thu, May 31, 2018 at 3:08 PM, Florian Hussonnois <fhussonn...@gmail.com>
> wrote:
>
>> Hi,
>> Thank you very much for your feedback.
>>
>> 1/
>> I agree that overloading most of the methods with a Processed is not ideal.
>> I've started modifying the KStream API and I came to the same conclusion.
>> Also, adding a new method directly to the KStreamImpl and KTableImpl classes
>> seems to be a better option.
>>
>> However, a processor name cannot be redefined after calling an operator (or
>> maybe I missed something in the code).
>> From my understanding, this will only set the KStream name property, not the
>> processor name previously added to the topology builder, leading to an
>> InvalidTopology exception.
>>
>> So the new method should actually define the name of the next processor.
>> Below is an example:
>>
>>     stream.as(Processed.name("MAPPE_TO_UPPERCASE"))
>>           .map((k, v) -> KeyValue.pair(k, v.toUpperCase()))
>>
>> I think this approach could solve the cases for methods returning void?
>>
>> Regarding this new method, we have two possible implementations:
>>
>> 1. Adding a method like: withName(String processorName)
>> 2. or adding a method accepting a Processed object: as(Processed).
>>
>> I think solution 2 is preferable as the Processed class could be enriched
>> further (in the future).
>>
>> 2/
>> As Guozhang said, some operators add internal processors.
>> For example, the branch() method creates one KStreamBranch processor to route
>> records and one KStreamPassThrough processor for each branch.
>> In that situation only the parent processor can be named. For child
>> processors we could keep the current behaviour, which adds a suffix (i.e.
>> KSTREAM-BRANCHCHILD-).
>>
>> This is also the case for the join() method, which results in adding multiple
>> processors to the topology (windowing, left/right joins and a merge
>> processor).
>> I think that, as for the branch method, users could only define a processor
>> name prefix.
>>
>> 3/
>> I think we should still add a suffix like "-0000000000" to the processor
>> name and enforce upper case, as this will keep some consistency with the
>> names generated by the API.
>>
>> 4/
>> Yes, the KTable interface should be modified like KStream to allow custom
>> processor name definitions.
>>
>> Thanks,
>>
>>
>> On Thu, 31 May 2018 at 19:18, Damian Guy <damian....@gmail.com> wrote:
>>
>>> Hi Florian,
>>>
>>> Thanks for the KIP. What about KTable and other DSL interfaces? Will they
>>> not want to be able to do the same thing?
>>> It would be good to see a complete set of the public API changes.
>>>
>>> Cheers,
>>> Damian
>>>
>>> On Wed, 30 May 2018 at 19:45 Guozhang Wang <wangg...@gmail.com> wrote:
>>>
>>>> Hello Florian,
>>>>
>>>> Thanks for the KIP. I have some meta feedback on the proposal:
>>>>
>>>> 1. You mentioned that this `Processed` object will be added to a new
>>>> overloaded variant of all the stateless operators; what about the
>>>> stateful operators? I would like to hear your opinion if you have
>>>> thought about that: note that stateful operators will usually be mapped
>>>> to multiple processor node names, so we probably need to come up with
>>>> some way to define all their names.
>>>>
>>>> 2. I share Bill's concern about adding lots of new overloaded functions
>>>> to the stateless operators, as we have just spent quite some effort
>>>> trimming them since the 1.0.0 release. If the goal is just to provide
>>>> some "hints" on the generated processor node names, not to strictly
>>>> enforce the exact names to be generated, then how about we just add a
>>>> new function to the `KStream` and `KTable` classes like "as(Processed)",
>>>> with the semantics "the latest operators that generate this KStream /
>>>> KTable will be named according to this hint".
>>>>
>>>> The only caveat is that for all operators like `KStream#to` and
>>>> `KStream#print` that return void, this alternative would not work. But
>>>> for the current operators:
>>>>
>>>> a. KStream#print,
>>>> b. KStream#foreach,
>>>> c. KStream#to,
>>>> d. KStream#process
>>>>
>>>> I personally feel that, except for `KStream#process`, users would not
>>>> usually bother to override their names, and for `KStream#process` we
>>>> could add an overloaded variant with the additional Processed object.
>>>>
>>>>
>>>> 3. In your example, the processor names are still given a suffix like
>>>> "-0000000000"; is this intentional?
>>>> If yes, why (I thought that with user-specified processor name hints we
>>>> would no longer add a suffix to distinguish different nodes of the same
>>>> type)?
>>>>
>>>>
>>>> Guozhang
>>>>
>>>>
>>>> On Tue, May 29, 2018 at 6:47 AM, Bill Bejeck <bbej...@gmail.com> wrote:
>>>>
>>>>> Hi Florian,
>>>>>
>>>>> Thanks for the KIP. I think being able to add more context to the
>>>>> processor names would be useful.
>>>>>
>>>>> I like the idea of adding a "withProcessorName" to Produced, Consumed
>>>>> and Joined.
>>>>>
>>>>> But instead of adding the "Processed" parameter to a large percentage
>>>>> of the methods, which would result in overloaded methods (which we
>>>>> removed quite a few of with KIP-182), what do you think of adding a
>>>>> method "withName(String processorName)" to the AbstractStream class?
>>>>> BTW I'm not married to the method name, it's the best I can do off the
>>>>> top of my head.
>>>>>
>>>>> For the methods that return void, we'd have to add a parameter, but
>>>>> that would at least cut down on the number of overloaded methods in the
>>>>> API.
>>>>>
>>>>> Just my 2 cents.
>>>>>
>>>>> Thanks,
>>>>> Bill
>>>>>
>>>>> On Sun, May 27, 2018 at 4:13 PM, Florian Hussonnois
>>>>> <fhussonn...@gmail.com> wrote:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> I would like to start a new discussion on the following KIP:
>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-307%3A+Allow+to+define+custom+processor+names+with+KStreams+DSL
>>>>>>
>>>>>> This is still a draft.
>>>>>>
>>>>>> Looking forward to your feedback.
>>>>>> --
>>>>>> Florian HUSSONNOIS
>>>>
>>>> --
>>>> -- Guozhang
>>
>> --
>> Florian HUSSONNOIS
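
To make the naming comments at the top of this message a bit more concrete, here is a rough plain-Java sketch of the two ideas: using a user-supplied name as a prefix for every node a join expands into, and dropping the numeric suffix once a user provides a name. This is only an illustration, not Kafka Streams code: `NodeNames`, `joinNodeNames` and `nodeName` are made-up names, the suffix list simply mirrors the join example given earlier in this message, and the zero-padded index imitates the "-0000000000" style suffix discussed in the thread.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, NOT part of Kafka Streams: it only illustrates the naming idea.
class NodeNames {

    // Suffixes copied from the join example in this thread; the final set is still open.
    private static final String[] JOIN_SUFFIXES = {
        "left-store-processor",
        "left-join-processor",
        "right-store-processor",
        "right-join-processor",
        "join-merge-processor"
    };

    // Use the user-supplied name as a prefix for every node a join expands into.
    static List<String> joinNodeNames(String userName) {
        List<String> names = new ArrayList<>();
        for (String suffix : JOIN_SUFFIXES) {
            names.add(userName + "-" + suffix);
        }
        return names;
    }

    // If the user provided a name, use it as-is (no numeric suffix);
    // otherwise fall back to a generated "<prefix><10-digit index>" name.
    static String nodeName(String userName, String generatedPrefix, int index) {
        if (userName != null && !userName.isEmpty()) {
            return userName;
        }
        return String.format("%s%010d", generatedPrefix, index);
    }

    public static void main(String[] args) {
        joinNodeNames("name").forEach(System.out::println);
        System.out.println(nodeName(null, "KSTREAM-BRANCHCHILD-", 3));
        System.out.println(nodeName("my-branch", "KSTREAM-BRANCHCHILD-", 3));
    }
}
```

With this sketch, a user-provided name stays stable when operators are added or removed, while only unnamed nodes keep depending on the running index, which is the compatibility concern raised under "Re 3".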