Hello Florian,

Sorry for being late... I find myself apologizing for late replies a lot these days. But I do want to push this KIP forward, as I see it as a very important and helpful feature for extensibility.
About the exceptions, I've gone through them and hopefully this is an exhaustive list:

1. KTable#toStream()
2. KStream#merge(KStream)
3. KStream#process() / transform() / transformValues()
4. KGroupedTable / KGroupedStream#count()

Here's my reasoning:

* It is okay not to let users override the name for 1 and 2. They are too trivial to be useful for debugging, and their processor names do not determine any related topic or store names.
* For 3, I'd vote for adding overloaded functions with Named.
* For 4, users who really want to name the processor can call aggregate() instead, so I think it is okay to skip this case.

Guozhang

On Fri, Jul 6, 2018 at 3:06 PM, Florian Hussonnois <fhussonn...@gmail.com> wrote:

> Hi,
>
> Option #3 seems to be a good alternative and I find the API more elegant (thanks John).
>
> But we still need to overload some methods, either because they do not accept an action instance or because they are translated to multiple processors.
>
> For example, this is the case for the methods branch() and merge(). We could introduce a new interface Named (or maybe a different name?) with a method name(). All action interfaces could extend this one to implement option 3).
> This would result in the following overloads:
>
> KStream<K, V> merge(final Named name, final KStream<K, V> stream);
> KStream<K, V>[] branch(final Named name, final Predicate<? super K, ? super V>... predicates);
>
> N.B.: The list above is not exhaustive.
>
> ---------
> The user's code would become:
>
> KStream<String, Integer> stream = builder.stream("test");
> KStream<String, Integer>[] branches = stream.branch(Named.with("BRANCH-STREAM-ON-VALUE"),
>         Predicate.named("STREAM-PAIR-VALUE", (k, v) -> v % 2 == 0),
>         Predicate.named("STREAM-IMPAIR-VALUE", (k, v) -> v % 2 != 0));
>
> branches[0].to("pair");
> branches[1].to("impair");
> ---------
>
> This is a mix of options 3) and 1).
>
> On Fri, Jul 6, 2018 at 10:58 PM, Guozhang Wang <wangg...@gmail.com> wrote:
>
> > Hi folks, just to summarize the options we have so far:
> >
> > 1) Add a new "as" for KTable / KStream, plus add new fields to the control objects of operators that return void (the current wiki's proposal).
> >
> > Pros: no more overloads.
> > Cons: a bit of a departure from the current high-level API design of the DSL, plus the inconsistency between operators that return void and operators that do not.
> >
> > 2) Add overloaded functions for all operators that accept a new control object "Described".
> >
> > Pros: consistent with current APIs.
> > Cons: lots of overloaded functions to add.
> >
> > 3) Add another default function in the interface (thank you, Java 8!) as John proposed.
> >
> > Pros: no overloaded functions, no "Described".
> > Cons: do we really lose lambda functions (it seems not, if we provide a "named" for each func)? Plus, "Described" may be more extensible than a single `String`.
> >
> > My main criterion for deciding which one is better is "how to make the additional API easy to use for advanced users, while keeping it hidden from normal users who do not care at all". For that purpose I think 3) > 1) > 2).
> >
> > One caveat, though: changing the interface would not be binary-compatible, though it is source-compatible, right? I.e., users need to recompile their code, though no code changes are needed.
> >
> > Another note: for 3), if we really want to keep the extensibility of Described, we could do something like:
> >
> > ---------
> >
> > public interface Predicate<K, V> {
> >     // existing method
> >     boolean test(final K key, final V value);
> >
> >     // new default method adds the ability to name the predicate
> >     default Described described() {
> >         return new Described(null);
> >     }
> > }
> >
> > ----------
> >
> > where the user's code becomes:
> >
> > stream.filter(named("key", (k, v) -> true)); // note `named` now just sets a Described("key") in "described()".
> >
> > stream.filter(described(Described.as("key" /* any other fancy parameters in the future */), (k, v) -> true));
> > ----------
> >
> > I feel it is not very likely that we'd need to extend it further in the future, so just a `String` would be good enough. But I'm just listing all the possibilities here.
> >
> > Guozhang
> >
> > On Fri, Jul 6, 2018 at 8:19 AM, John Roesler <j...@confluent.io> wrote:
> >
> > > Hi Florian,
> > >
> > > Sorry I'm late to the party, but I missed the message originally.
> > >
> > > Regarding the names, it's probably a good idea to stick to the same character set we're currently using: letters, numbers, and hyphens. The names are used in Kafka topics, files and folders, and RocksDB databases, and we also need them to work with the file systems of Windows, Linux, and MacOS. My opinion is that in a situation like that, it's better to be conservative. It might also be a good idea to impose an upper limit on name length to avoid running afoul of any of those systems.
> > >
> > > ---
> > >
> > > It seems like there's a small debate between 1) adding a new method to KStream (and maybe KTable) to modify its name after the fact, or 2) piggy-backing on the config objects where they exist and adding one where they don't. To me, #2 is the better alternative even though it produces more overloads and may be a bit awkward in places.
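John's suggested naming rule (letters, numbers and hyphens only, plus an upper bound on length) is straightforward to enforce whenever a user supplies a name. The sketch below shows one way to do it. It is not Kafka Streams code: the `ProcessorNameValidator` class is hypothetical, and the 200-character limit is an arbitrary placeholder because the thread does not pick a value.

```java
import java.util.Arrays;
import java.util.regex.Pattern;

// Hypothetical validator for user-supplied processor names (not part of Kafka Streams).
final class ProcessorNameValidator {
    // Placeholder bound: the thread only suggests "an upper limit", not a value.
    static final int MAX_NAME_LENGTH = 200;

    // Letters, digits and hyphens only, as John proposes.
    private static final Pattern LEGAL_NAME = Pattern.compile("[A-Za-z0-9-]+");

    private ProcessorNameValidator() { }

    // Returns the name unchanged if it is acceptable; throws otherwise.
    static String validate(final String name) {
        if (name == null || name.isEmpty()) {
            throw new IllegalArgumentException("Processor name must not be null or empty");
        }
        if (name.length() > MAX_NAME_LENGTH) {
            throw new IllegalArgumentException(
                "Processor name must be at most " + MAX_NAME_LENGTH + " characters: " + name);
        }
        if (!LEGAL_NAME.matcher(name).matches()) {
            throw new IllegalArgumentException(
                "Processor name may only contain letters, digits and '-': " + name);
        }
        return name;
    }

    static boolean isValid(final String name) {
        try {
            validate(name);
            return true;
        } catch (final IllegalArgumentException e) {
            return false;
        }
    }

    public static void main(final String[] args) {
        final char[] tooLong = new char[MAX_NAME_LENGTH + 1];
        Arrays.fill(tooLong, 'a');

        System.out.println(isValid("MAPPED-TO-UPPERCASE")); // true
        System.out.println(isValid("my processor"));        // false: whitespace
        System.out.println(isValid("MAPPE_TO_UPPERCASE"));  // false: underscore
        System.out.println(isValid(new String(tooLong)));   // false: too long
    }
}
```

This strict rule rejects names containing whitespace, which was Florian's open question. It also rejects the underscore in the `MAPPE_TO_UPPERCASE` name from Florian's May 31 example. Kafka topic names also allow `.` and `_`, so widening the character class to `[A-Za-z0-9._-]` would be one possible alternative.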
> > >
> > > The reason is simply that #1 is a high-level departure from the graph-building paradigm we're using in the DSL. Consider:
> > >
> > > Graph.node1(config).node2(config)
> > >
> > > vs
> > >
> > > Graph.node1().config().node2().config()
> > >
> > > We could have done either, but we picked the former. I think it's probably a good goal to try and stick to it, so that developers can develop and rely on their instincts for how the DSL will behave.
> > >
> > > I do want to present one alternative to adding new config objects: we can just add a "name()" method to all our "action" interfaces. For example, I'll demonstrate how we can add a "name" to Predicate and then use it to name a "KStream#filter" DSL operator:
> > >
> > > public interface Predicate<K, V> {
> > >     // existing method
> > >     boolean test(final K key, final V value);
> > >
> > >     // new default method adds the ability to name the predicate
> > >     default String name() {
> > >         return null;
> > >     }
> > >
> > >     // new static factory method adds the ability to wrap lambda predicates with a named predicate
> > >     static <K, V> Predicate<K, V> named(final String name, final Predicate<K, V> predicate) {
> > >         return new Predicate<K, V>() {
> > >             @Override
> > >             public boolean test(final K key, final V value) {
> > >                 return predicate.test(key, value);
> > >             }
> > >
> > >             @Override
> > >             public String name() {
> > >                 return name;
> > >             }
> > >         };
> > >     }
> > > }
> > >
> > > Then, here's how it would look to use it:
> > >
> > > // Anonymous predicates continue to work just fine
> > > stream.filter((k, v) -> true);
> > >
> > > // Devs can swap in a Predicate that implements the name() method.
> > > stream.filter(new Predicate<Object, Object>() {
> > >     @Override
> > >     public boolean test(final Object key, final Object value) {
> > >         return true;
> > >     }
> > >
> > >     @Override
> > >     public String name() {
> > >         return "hey";
> > >     }
> > > });
> > >
> > > // Or they can wrap their existing lambda using the static factory method
> > > stream.filter(named("key", (k, v) -> true));
> > >
> > > Just a thought.
> > >
> > > Overall, I think it's really valuable to be able to name the processors, for all the reasons you mentioned in the KIP. So thank you for introducing this!
> > >
> > > Thanks,
> > > -John
> > >
> > > On Thu, Jul 5, 2018 at 4:53 PM Florian Hussonnois <fhussonn...@gmail.com> wrote:
> > >
> > > > Hi, thank you very much for all your suggestions. I've started to update the KIP (https://cwiki.apache.org/confluence/display/KAFKA/KIP-307%3A+Allow+to+define+custom+processor+names+with+KStreams+DSL).
> > > > Also, I propose renaming the Processed class to Described, which would be more meaningful (but this is just a detail).
> > > >
> > > > I'm OK with not enforcing uppercase for specific names, but should we allow arbitrary names, for example names containing whitespace? At the moment, I can't tell whether this could lead to side effects.
> > > >
> > > > On Mon, Jun 11, 2018 at 1:31 AM, Matthias J. Sax <matth...@confluent.io> wrote:
> > > >
> > > > > Just catching up on this thread.
> > > > >
> > > > > I like the general idea. A couple of comments:
> > > > >
> > > > > - I think that adding `Processed` (or maybe a different name?) is a valid proposal for stateless operators that only have a single overload atm. It would align with the overall API design.
> > > > >
> > > > > - for all methods with multiple existing overloads, we can consider extending `Consumed`, `Produced`, `Materialized` etc. to take an additional processor name (not sure atm how elegant this is; we would need to "play" with the API a little bit; the advantage would be that we do not add more overloads, which seems to be key for this KIP)
> > > > >
> > > > > - operators that return void: while I agree that the "name first" chaining idea is not very intuitive, it might still work if we name the method correctly (again, we would need to "play" with the API a little bit to see)
> > > > >
> > > > > - for DSL operators that are translated to multiple nodes: it might make sense to use the specified operator name as a prefix and add reasonable suffixes. For example, a join translates into 5 operators that could be named "name-left-store-processor", "name-left-join-processor", "name-right-store-processor", "name-right-join-processor", and "name-join-merge-processor" (or similar). Maybe just using numbers might also work.
> > > > >
> > > > > - I think we should strip the number suffixes if a user provides names
> > > > >
> > > > > - enforcing upper case seems to be tricky: for example, we do not enforce upper case for store names and we cannot easily change that, as it would break compatibility. Thus, for consistency reasons, we might not want to do this
> > > > >
> > > > > - for a better understanding of the impact of the KIP, it would be quite helpful if you would list all method names that are affected in the KIP (i.e., list all newly added overloads)
> > > > >
> > > > > -Matthias
> > > > >
> > > > > On 5/31/18 6:40 PM, Guozhang Wang wrote:
> > > > > > Hi Florian,
> > > > > >
> > > > > > Re 1: I think changing KStreamImpl / KTableImpl to allow modifying the processor name after the operator has been added is fine, as long as we run the check again when modifying it. In fact, we have some topology optimization work going on which may modify processor names in the final topology anyway (https://github.com/apache/kafka/pull/4983). Semantically, I think it is easier for developers to understand than "deciding the processor name for the next operator".
> > > > > >
> > > > > > Re 2: Yeah, I'm thinking that for operators that translate to multiple processors, we can still use the provided "hint" to name them, e.g. for joins we can name them `join-foo-this`, `join-foo-that`, etc. if the user calls `as("foo")`.
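Matthias's and Guozhang's replies both suggest deriving every internal node name of a multi-node operator, such as a join, from a single user-supplied name. Here is a minimal sketch of that idea, using the five suffixes from Matthias's example. `JoinNodeNames` and `forJoin` are hypothetical helpers, not Kafka Streams API.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical helper: expands one user-supplied base name into the five
// processor names Matthias lists for a join.
final class JoinNodeNames {
    private JoinNodeNames() { }

    static List<String> forJoin(final String baseName) {
        return Collections.unmodifiableList(Arrays.asList(
            baseName + "-left-store-processor",
            baseName + "-left-join-processor",
            baseName + "-right-store-processor",
            baseName + "-right-join-processor",
            baseName + "-join-merge-processor"));
    }

    public static void main(final String[] args) {
        // Prints one derived node name per line, e.g. "orders-customers-left-store-processor".
        for (final String name : forJoin("orders-customers")) {
            System.out.println(name);
        }
    }
}
```

Each derived name depends only on the base name. Adding or removing unrelated operators elsewhere in the topology therefore leaves these names unchanged, which is the stability property Guozhang argues for in his "Re 3" point.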
> > > > > >
> > > > > > Re 3: My motivation for removing the suffix is that it places huge restrictions on topology compatibility. If user code adds a new operator, or the library applies an optimization that removes some operators, the suffix indexes may change for a large number of processor names. This in turn changes the internal state store names and the internal topic names, making the new application topology incompatible with the old one. One rationale I had for this KIP is that, in line with this effort, we can eventually allow users to customize internal names so that they can still be reused across topology changes (e.g. KIP-230). So I think removing the suffix index would be more applicable in the long run.
> > > > > >
> > > > > > Guozhang
> > > > > >
> > > > > > On Thu, May 31, 2018 at 3:08 PM, Florian Hussonnois <fhussonn...@gmail.com> wrote:
> > > > > >
> > > > > >> Hi,
> > > > > >> Thank you very much for your feedback.
> > > > > >>
> > > > > >> 1/
> > > > > >> I agree that overloading most of the methods with a Processed is not ideal. I've started modifying the KStream API and I came to the same conclusion. Also, adding a new method directly to the KStreamImpl and KTableImpl classes seems to be a better option.
> > > > > >>
> > > > > >> However, a processor name cannot be redefined after calling an operator (or maybe I'm missing something in the code).
> > > > > >> From my understanding, this will only set the KStream name property, not the name of the processor previously added to the topology builder, leading to an InvalidTopology exception.
> > > > > >>
> > > > > >> So the new method should actually define the name of the next processor. Below is an example:
> > > > > >>
> > > > > >> stream.as(Processed.name("MAPPE_TO_UPPERCASE"))
> > > > > >>       .map((k, v) -> KeyValue.pair(k, v.toUpperCase()));
> > > > > >>
> > > > > >> I think this approach could also solve the cases of methods returning void?
> > > > > >>
> > > > > >> For this new method, we have two possible implementations:
> > > > > >>
> > > > > >> 1. adding a method like withName(String processorName), or
> > > > > >> 2. adding a method accepting a Processed object: as(Processed).
> > > > > >>
> > > > > >> I think solution 2. is preferable, as the Processed class could be enriched further in the future.
> > > > > >>
> > > > > >> 2/
> > > > > >> As Guozhang said, some operators add internal processors. For example, the branch() method creates one KStreamBranch processor to route records and one KStreamPassThrough processor for each branch. In that situation, only the parent processor can be named. For the child processors, we could keep the current behaviour of adding a suffix (i.e. KSTREAM-BRANCHCHILD-).
> > > > > >>
> > > > > >> This is also the case for the join() method, which adds multiple processors to the topology (windowing, left/right joins and a merge processor). I think that, as with the branch method, users could only define a processor name prefix.
> > > > > >>
> > > > > >> 3/
> > > > > >> I think we should still add a suffix like "-0000000000" to the processor name and enforce uppercase, as this will keep some consistency with the names generated by the API.
> > > > > >>
> > > > > >> 4/
> > > > > >> Yes, the KTable interface should be modified like KStream to allow custom processor names to be defined.
> > > > > >>
> > > > > >> Thanks,
> > > > > >>
> > > > > >> On Thu, May 31, 2018 at 7:18 PM, Damian Guy <damian....@gmail.com> wrote:
> > > > > >>
> > > > > >>> Hi Florian,
> > > > > >>>
> > > > > >>> Thanks for the KIP. What about KTable and other DSL interfaces? Will they not want to be able to do the same thing?
> > > > > >>> It would be good to see a complete set of the public API changes.
> > > > > >>>
> > > > > >>> Cheers,
> > > > > >>> Damian
> > > > > >>>
> > > > > >>> On Wed, 30 May 2018 at 19:45 Guozhang Wang <wangg...@gmail.com> wrote:
> > > > > >>>
> > > > > >>>> Hello Florian,
> > > > > >>>>
> > > > > >>>> Thanks for the KIP. I have some meta feedback on the proposal:
> > > > > >>>>
> > > > > >>>> 1. You mentioned that this `Processed` object will be added to a new overloaded variant of all the stateless operators. What about the stateful operators? I would like to hear your opinion if you have thought about that. Note that stateful operators will usually be mapped to multiple processor node names, so we probably need to come up with some way to define all their names.
> > > > > >>>>
> > > > > >>>> 2. I share Bill's concern about adding lots of new overloaded functions to the stateless operators, as we have just spent quite some effort trimming them since the 1.0.0 release. The goal may be only to provide some "hints" for the generated processor node names, rather than strictly enforcing the exact names to be generated. If so, how about we just add a new function to the `KStream` and `KTable` classes like "as(Processed)", with the semantics "the latest operator that generated this KStream / KTable will be named according to this hint"?
> > > > > >>>>
> > > > > >>>> The only caveat is that this alternative would not work for operators like `KStream#to` and `KStream#print` that return void. But for the current operators:
> > > > > >>>>
> > > > > >>>> a. KStream#print,
> > > > > >>>> b. KStream#foreach,
> > > > > >>>> c. KStream#to,
> > > > > >>>> d. KStream#process
> > > > > >>>>
> > > > > >>>> I personally feel that, except for `KStream#process`, users would not usually bother to override their names, and for `KStream#process` we could add an overloaded variant with the additional Processed object.
> > > > > >>>>
> > > > > >>>> 3. In your example, the processor names still get a suffix like "-0000000000". Is this intentional? If yes, why? I thought that with user-specified processor name hints, we would no longer add a suffix to distinguish different nodes of the same type.
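Guozhang's point 3, and his "Re 3" reply, turn on how index suffixes behave when a topology changes. The toy model below illustrates this. `SuffixNamingDemo` and its `NameGenerator` are hypothetical, and only the `PREFIX-0000000000` shape is taken from the names quoted in the thread. Adding one operator upstream shifts the index of every auto-named node after it, while a user-supplied name without a suffix stays the same.

```java
import java.util.ArrayList;
import java.util.List;

final class SuffixNamingDemo {
    // Hypothetical stand-in for a DSL's internal name generator.
    static final class NameGenerator {
        private int index = 0;
        private final List<String> names = new ArrayList<>();

        // Auto-generated name: prefix plus a zero-padded, ever-increasing index.
        NameGenerator auto(final String prefix) {
            names.add(prefix + String.format("%010d", index++));
            return this;
        }

        // User-supplied name: used verbatim, without an index suffix.
        NameGenerator named(final String name) {
            names.add(name);
            return this;
        }

        List<String> names() {
            return names;
        }
    }

    static List<String> version1() {
        return new NameGenerator()
            .auto("KSTREAM-SOURCE-")
            .auto("KSTREAM-MAPVALUES-")
            .named("even-values-filter")
            .names();
    }

    // The same topology with one extra operator (a peek) added after the source.
    static List<String> version2() {
        return new NameGenerator()
            .auto("KSTREAM-SOURCE-")
            .auto("KSTREAM-PEEK-")
            .auto("KSTREAM-MAPVALUES-")
            .named("even-values-filter")
            .names();
    }

    public static void main(final String[] args) {
        System.out.println(version1());
        // [KSTREAM-SOURCE-0000000000, KSTREAM-MAPVALUES-0000000001, even-values-filter]
        System.out.println(version2());
        // [KSTREAM-SOURCE-0000000000, KSTREAM-PEEK-0000000001, KSTREAM-MAPVALUES-0000000002, even-values-filter]
    }
}
```

Guozhang notes that state store and internal topic names are derived from these processor names. A shifted index can therefore also rename stores and topics, which is what breaks compatibility.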
> > > > > >>>>
> > > > > >>>> Guozhang
> > > > > >>>>
> > > > > >>>> On Tue, May 29, 2018 at 6:47 AM, Bill Bejeck <bbej...@gmail.com> wrote:
> > > > > >>>>
> > > > > >>>>> Hi Florian,
> > > > > >>>>>
> > > > > >>>>> Thanks for the KIP. I think being able to add more context to the processor names would be useful.
> > > > > >>>>>
> > > > > >>>>> I like the idea of adding a "withProcessorName" to Produced, Consumed and Joined.
> > > > > >>>>>
> > > > > >>>>> But instead of adding the "Processed" parameter to a large percentage of the methods, which would result in more overloaded methods (many of which we removed with KIP-182), what do you think of adding a method "withName(String processorName)" to the AbstractStream class? BTW, I'm not married to the method name; it's the best I can do off the top of my head.
> > > > > >>>>>
> > > > > >>>>> For the methods that return void, we'd have to add a parameter, but that would at least cut down on the number of overloaded methods in the API.
> > > > > >>>>>
> > > > > >>>>> Just my 2 cents.
> > > > > >>>>>
> > > > > >>>>> Thanks,
> > > > > >>>>> Bill
> > > > > >>>>>
> > > > > >>>>> On Sun, May 27, 2018 at 4:13 PM, Florian Hussonnois <fhussonn...@gmail.com> wrote:
> > > > > >>>>>
> > > > > >>>>>> Hi,
> > > > > >>>>>>
> > > > > >>>>>> I would like to start a new discussion on the following KIP:
> > > > > >>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-307%3A+Allow+to+define+custom+processor+names+with+KStreams+DSL
> > > > > >>>>>>
> > > > > >>>>>> This is still a draft.
> > > > > >>>>>>
> > > > > >>>>>> Looking forward to your feedback.
> > > > > >>>>>> --
> > > > > >>>>>> Florian HUSSONNOIS
> > > > > >>>>
> > > > > >>>> --
> > > > > >>>> -- Guozhang
> > > > > >>
> > > > > >> --
> > > > > >> Florian HUSSONNOIS
> > > >
> > > > --
> > > > Florian HUSSONNOIS
> >
> > --
> > -- Guozhang
>
> --
> Florian HUSSONNOIS
--
-- Guozhang
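Option 3 from the thread, as John proposed it, adds a default `name()` to the action interfaces. Here is a minimal, self-contained sketch of that idea. The nested `Predicate` mirrors John's snippet rather than the real `org.apache.kafka.streams.kstream.Predicate`. `ToyBuilder` is a hypothetical stand-in for the DSL that only records which processor name a `filter()` would register. The fallback name format is an assumption modeled on the `-0000000000` suffixes discussed in the thread.

```java
import java.util.ArrayList;
import java.util.List;

final class OptionThreeSketch {

    // Hypothetical mirror of the proposal: one abstract method plus a default
    // name(), so the interface stays functional and plain lambdas keep working.
    @FunctionalInterface
    interface Predicate<K, V> {
        boolean test(K key, V value);

        default String name() {
            return null; // null means "let the DSL generate a name"
        }

        // Wraps any predicate (including a lambda) and attaches a name to it.
        static <K, V> Predicate<K, V> named(final String name, final Predicate<K, V> predicate) {
            return new Predicate<K, V>() {
                @Override
                public boolean test(final K key, final V value) {
                    return predicate.test(key, value);
                }

                @Override
                public String name() {
                    return name;
                }
            };
        }
    }

    // Toy builder that only records which processor name a filter() would get.
    static final class ToyBuilder {
        private int index = 0;
        final List<String> processorNames = new ArrayList<>();

        <K, V> void filter(final Predicate<K, V> predicate) {
            final String userName = predicate.name();
            processorNames.add(userName != null
                ? userName
                : "KSTREAM-FILTER-" + String.format("%010d", index++));
        }
    }

    public static void main(final String[] args) {
        final ToyBuilder builder = new ToyBuilder();
        builder.filter((String k, Integer v) -> v % 2 == 0); // plain lambda, auto-named
        builder.filter(Predicate.named("STREAM-PAIR-VALUE", (String k, Integer v) -> v % 2 == 0));
        System.out.println(builder.processorNames);
        // [KSTREAM-FILTER-0000000000, STREAM-PAIR-VALUE]
    }
}
```

Because `test` remains the only abstract method, the interface is still a functional interface, and existing lambdas compile unchanged. This is why option 3 need not lose lambdas, provided a `named` factory exists for each action type.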