What is the status of this KIP? -Matthias
On 7/19/18 5:17 PM, Guozhang Wang wrote:
> Hello Florian,
>
> Sorry for being late... I find myself apologizing for late replies a lot
> these days. But I do want to push this KIP forward, as I see it as a very
> important and helpful feature for extensibility.
>
> About the exceptions, I've gone through them and hopefully this is an
> exhaustive list:
>
> 1. KTable#toStream()
> 2. KStream#merge(KStream)
> 3. KStream#process() / transform() / transformValues()
> 4. KGroupedTable / KGroupedStream#count()
>
> Here's my reasoning:
>
> * It is okay not to let users override the name for 1/2, since they are
>   too trivial to be useful for debugging, plus their processor names would
>   not determine any related topic / store names.
> * For 3, I'd vote for adding overloaded functions with Named.
> * For 4, if users really want to name the processor they can call
>   aggregate() instead, so I think it is okay to skip this case.
>
> Guozhang
>
> On Fri, Jul 6, 2018 at 3:06 PM, Florian Hussonnois <fhussonn...@gmail.com>
> wrote:
>
>> Hi,
>>
>> Option #3 seems to be a good alternative and I find the API more
>> elegant (thanks John).
>>
>> But we still need to overload some methods, either because they do
>> not accept an action instance or because they are translated to multiple
>> processors.
>>
>> For example, this is the case for the methods branch() and merge(). We could
>> introduce a new interface Named (or maybe a different name?) with a method
>> name(). All action interfaces could extend this one to implement option 3).
>> This would result in the following overloads:
>>
>> KStream<K, V> merge(final Named name, final KStream<K, V> stream);
>> KStream<K, V>[] branch(final Named name, final Predicate<? super K, ? super V>... predicates)
>>
>> N.B.: The list above is not exhaustive.
>>
>> ---------
>> User code would become:
>>
>> KStream<String, Integer> stream = builder.stream("test");
>> KStream<String, Integer>[] branches =
>>     stream.branch(Named.with("BRANCH-STREAM-ON-VALUE"),
>>         Predicate.named("STREAM-PAIR-VALUE", (k, v) -> v % 2 == 0),
>>         Predicate.named("STREAM-IMPAIR-VALUE", (k, v) -> v % 2 != 0));
>>
>> branches[0].to("pair");
>> branches[1].to("impair");
>> ---------
>>
>> This is a mix of options 3) and 1).
>>
>> On Fri, Jul 6, 2018 at 22:58, Guozhang Wang <wangg...@gmail.com> wrote:
>>
>>> Hi folks, just to summarize the options we have so far:
>>>
>>> 1) Add a new "as" for KTable / KStream, plus adding new fields for
>>> operators-returns-void control objects (the current wiki's proposal).
>>>
>>> Pros: no more overloads.
>>> Cons: a bit of a departure from the current high-level API design of the DSL,
>>> plus the inconsistency between operators-returns-void and
>>> operators-not-return-voids.
>>>
>>> 2) Add overloaded functions for all operators that accept a new control
>>> object "Described".
>>>
>>> Pros: consistent with current APIs.
>>> Cons: lots of overloaded functions to add.
>>>
>>> 3) Add another default function in the interface (thank you J8!) as John
>>> proposed.
>>>
>>> Pros: no overloaded functions, no "Described".
>>> Cons: do we really lose lambda functions (seems not if we provide a "named"
>>> for each func)? Plus "Described" may be more extensible than a single
>>> `String`.
>>>
>>> My principle for deciding which one is better depends primarily on "how
>>> to make advanced users easily use the additional API, while keeping it
>>> hidden from normal users who do not care at all". For that purpose I think
>>> 3) > 1) > 2).
>>>
>>> One caveat, though, is that changing the interface would not be
>>> binary-compatible though source-compatible, right? I.e. users need to
>>> recompile their code though no changes are needed.
>>>
>>> Another note: for 3), if we really want to keep the extensibility of Described
>>> we could do something like:
>>>
>>> ---------
>>>
>>> public interface Predicate<K, V> {
>>>     // existing method
>>>     boolean test(final K key, final V value);
>>>
>>>     // new default method adds the ability to name the predicate
>>>     default Described described() {
>>>         return new Described(null);
>>>     }
>>> }
>>>
>>> ----------
>>>
>>> where user's code becomes:
>>>
>>> // note `named` now just sets a Described("key") in "described()".
>>> stream.filter(named("key", (k, v) -> true));
>>>
>>> stream.filter(described(Described.as("key", /* any other fancy parameters in the future */), (k, v) -> true));
>>> ----------
>>>
>>> I feel it is not very likely that we'd need to extend it further in the
>>> future, so just a `String` would be good enough. But just listing all
>>> possibilities here.
>>>
>>> Guozhang
>>>
>>> On Fri, Jul 6, 2018 at 8:19 AM, John Roesler <j...@confluent.io> wrote:
>>>
>>>> Hi Florian,
>>>>
>>>> Sorry I'm late to the party, but I missed the message originally.
>>>>
>>>> Regarding the names, it's probably a good idea to stick to the same
>>>> character set we're currently using: letters, numbers, and hyphens. The
>>>> names are used in Kafka topics, files and folders, and RocksDB databases,
>>>> and we also need them to work with the file systems of Windows, Linux, and
>>>> MacOS. My opinion is that with a situation like that, it's better to be
>>>> conservative. It might also be a good idea to impose an upper limit on name
>>>> length to avoid running afoul of any of those systems.
>>>>
>>>> ---
>>>>
>>>> It seems like there's a small debate between 1) adding a new method to
>>>> KStream (and maybe KTable) to modify its name after the fact, or 2)
>>>> piggy-backing on the config objects where they exist and adding one where
>>>> they don't.
>>>> To me, #2 is the better alternative even though it produces
>>>> more overloads and may be a bit awkward in places.
>>>>
>>>> The reason is simply that #1 is a high-level departure from the
>>>> graph-building paradigm we're using in the DSL. Consider:
>>>>
>>>> Graph.node1(config).node2(config)
>>>>
>>>> vs
>>>>
>>>> Graph.node1().config().node2().config()
>>>>
>>>> We could have done either, but we picked the former. I think it's probably
>>>> a good goal to try and stick to it so that developers can develop and rely
>>>> on their instincts for how the DSL will behave.
>>>>
>>>> I do want to present one alternative to adding new config objects: we can
>>>> just add a "name()" method to all our "action" interfaces. For example,
>>>> I'll demonstrate how we can add a "name" to Predicate and then use it to
>>>> name a "KStream#filter" DSL operator:
>>>>
>>>> public interface Predicate<K, V> {
>>>>     // existing method
>>>>     boolean test(final K key, final V value);
>>>>
>>>>     // new default method adds the ability to name the predicate
>>>>     default String name() {
>>>>         return null;
>>>>     }
>>>>
>>>>     // new static factory method adds the ability to wrap lambda predicates
>>>>     // with a named predicate
>>>>     static <K, V> Predicate<K, V> named(final String name, final Predicate<K, V> predicate) {
>>>>         return new Predicate<K, V>() {
>>>>             @Override
>>>>             public boolean test(final K key, final V value) {
>>>>                 return predicate.test(key, value);
>>>>             }
>>>>
>>>>             @Override
>>>>             public String name() {
>>>>                 return name;
>>>>             }
>>>>         };
>>>>     }
>>>> }
>>>>
>>>> Then, here's how it would look to use it:
>>>>
>>>> // Anonymous predicates continue to work just fine
>>>> stream.filter((k, v) -> true);
>>>>
>>>> // Devs can swap in a Predicate that implements the name() method.
>>>> stream.filter(new Predicate<Object, Object>() {
>>>>     @Override
>>>>     public boolean test(final Object key, final Object value) {
>>>>         return true;
>>>>     }
>>>>
>>>>     @Override
>>>>     public String name() {
>>>>         return "hey";
>>>>     }
>>>> });
>>>>
>>>> // Or they can wrap their existing lambda using the static factory method
>>>> stream.filter(named("key", (k, v) -> true));
>>>>
>>>> Just a thought.
>>>>
>>>> Overall, I think it's really valuable to be able to name the processors,
>>>> for all the reasons you mentioned in the KIP. So thank you for introducing
>>>> this!
>>>>
>>>> Thanks,
>>>> -John
>>>>
>>>> On Thu, Jul 5, 2018 at 4:53 PM Florian Hussonnois <fhussonn...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hi, thank you very much for all your suggestions. I've started to update the
>>>>> KIP (
>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-307%3A+Allow+to+define+custom+processor+names+with+KStreams+DSL
>>>>> ).
>>>>> Also, I propose to rename the Processed class to Described; this will be
>>>>> more meaningful (but this is just a detail).
>>>>>
>>>>> I'm OK with not enforcing uppercase for specific names, but should we allow
>>>>> arbitrary names, with whitespace for example? Currently, I can't tell if
>>>>> this can lead to side effects.
>>>>>
>>>>> On Mon, Jun 11, 2018 at 01:31, Matthias J. Sax <matth...@confluent.io> wrote:
>>>>>
>>>>>> Just catching up on this thread.
>>>>>>
>>>>>> I like the general idea. Couple of comments:
>>>>>>
>>>>>> - I think that adding `Processed` (or maybe a different name?) is a
>>>>>> valid proposal for stateless operators that only have a single overload
>>>>>> atm. It would align with the overall API design.
>>>>>>
>>>>>> - for all methods with multiple existing overloads, we can consider
>>>>>> extending `Consumed`, `Produced`, `Materialized` etc. to take an additional
>>>>>> processor name (not sure atm how elegant this is; we would need to
>>>>>> "play" with the API a little bit; the advantage would be that we do not
>>>>>> add more overloads, which seems to be key for this KIP)
>>>>>>
>>>>>> - operators that return void: while I agree that the "name first" chaining
>>>>>> idea is not very intuitive, it might still work if we name the method
>>>>>> correctly (again, we would need to "play" with the API a little bit to see)
>>>>>>
>>>>>> - for DSL operators that are translated to multiple nodes: it might
>>>>>> make sense to use the specified operator name as a prefix and add
>>>>>> reasonable suffixes. For example, a join translates into 5 operators
>>>>>> that could be named "name-left-store-processor",
>>>>>> "name-left-join-processor", "name-right-store-processor",
>>>>>> "name-right-join-processor", and "name-join-merge-processor" (or
>>>>>> similar). Maybe just using numbers might also work.
>>>>>>
>>>>>> - I think we should strip the number suffixes if a user provides names
>>>>>>
>>>>>> - enforcing upper case seems to be tricky: for example, we do not
>>>>>> enforce upper case for store names and we cannot easily change that, as it
>>>>>> would break compatibility -- thus, for consistency reasons we might not
>>>>>> want to do this
>>>>>>
>>>>>> - for a better understanding of the impact of the KIP, it would be quite
>>>>>> helpful if you would list all method names that are affected in the KIP
>>>>>> (i.e., list all newly added overloads)
>>>>>>
>>>>>> -Matthias
>>>>>>
>>>>>> On 5/31/18 6:40 PM, Guozhang Wang wrote:
>>>>>>> Hi Florian,
>>>>>>>
>>>>>>> Re 1: I think changing the KStreamImpl / KTableImpl to allow modifying the
>>>>>>> processor name after the operator is fine as long as we do the check again
>>>>>>> when modifying that. In fact, we have some topology optimization
>>>>>>> going on which may modify processor names in the final topology anyway (
>>>>>>> https://github.com/apache/kafka/pull/4983). Semantically I think it is
>>>>>>> easier for developers to understand than "deciding the processor name for
>>>>>>> the next operator".
>>>>>>>
>>>>>>> Re 2: Yeah, I'm thinking that for operators that translate to multiple
>>>>>>> processor names, we can still use the provided "hint" to name the processors,
>>>>>>> e.g. for joins we can name them `join-foo-this` and
>>>>>>> `join-foo-that` etc. if the user calls `as("foo")`.
>>>>>>>
>>>>>>> Re 3: The motivation I had for removing the suffix is that it puts huge
>>>>>>> restrictions on topology compatibility: consider if user code added a new
>>>>>>> operator, or the library does some optimization to remove some operators; the
>>>>>>> suffix indexing may then change for a large number of the processor names.
>>>>>>> This will in turn change the internal state store names, as well as the
>>>>>>> internal topic names, making the new application topology
>>>>>>> incompatible with the previous ones. One rationale I had about this KIP is
>>>>>>> that, aligned with this effort, moving forward we can allow users to customize
>>>>>>> internal names so that they can still be reused even with topology changes
>>>>>>> (e.g. KIP-230), so I think removing the suffix index would be more
>>>>>>> applicable in the long run.
>>>>>>>
>>>>>>> Guozhang
>>>>>>>
>>>>>>> On Thu, May 31, 2018 at 3:08 PM, Florian Hussonnois <fhussonn...@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Hi,
>>>>>>>> Thank you very much for your feedback.
>>>>>>>>
>>>>>>>> 1/
>>>>>>>> I agree that overloading most of the methods with a Processed is not ideal.
>>>>>>>> I've started modifying the KStream API and I came to the same conclusion.
>>>>>>>> Also, adding a new method directly to the KStreamImpl and KTableImpl classes
>>>>>>>> seems to be a better option.
>>>>>>>>
>>>>>>>> However, a processor name cannot be redefined after calling an operator (or
>>>>>>>> maybe I missed something in the code).
>>>>>>>> From my understanding, this would only set the KStream name property, not the
>>>>>>>> processor name previously added to the topology builder, leading to an
>>>>>>>> InvalidTopology exception.
>>>>>>>>
>>>>>>>> So the new method should actually define the name of the next processor.
>>>>>>>> Below is an example:
>>>>>>>>
>>>>>>>> stream.as(Processed.name("MAPPE_TO_UPPERCASE"))
>>>>>>>>     .map((k, v) -> KeyValue.pair(k, v.toUpperCase()))
>>>>>>>>
>>>>>>>> I think this approach could solve the cases for methods returning void?
>>>>>>>>
>>>>>>>> Regarding this new method we have two possible implementations:
>>>>>>>>
>>>>>>>> 1. Adding a method like: withName(String processorName)
>>>>>>>> 2. or adding a method accepting a Processed object: as(Processed).
>>>>>>>>
>>>>>>>> I think solution 2 is preferable as the Processed class could be enriched
>>>>>>>> further (in the future).
>>>>>>>>
>>>>>>>> 2/
>>>>>>>> As Guozhang said, some operators add internal processors.
>>>>>>>> For example, the branch() method creates one KStreamBranch processor to route
>>>>>>>> records and one KStreamPassThrough processor for each branch.
>>>>>>>> In that situation only the parent processor can be named. For child
>>>>>>>> processors we could keep the current behaviour that adds a suffix (i.e.
>>>>>>>> KSTREAM-BRANCHCHILD-).
>>>>>>>>
>>>>>>>> This is also the case for the join() method, which results in adding multiple
>>>>>>>> processors to the topology (windowing, left/right joins and a merge
>>>>>>>> processor).
>>>>>>>> I think, as for the branch method, users could only define a processor
>>>>>>>> name prefix.
>>>>>>>>
>>>>>>>> 3/
>>>>>>>> I think we should still add a suffix like "-0000000000" to the processor
>>>>>>>> name and enforce uppercase, as this will keep some consistency with the
>>>>>>>> ones generated by the API.
>>>>>>>>
>>>>>>>> 4/
>>>>>>>> Yes, the KTable interface should be modified like KStream to allow custom
>>>>>>>> processor name definitions.
>>>>>>>>
>>>>>>>> Thanks,
>>>>>>>>
>>>>>>>> On Thu, May 31, 2018 at 19:18, Damian Guy <damian....@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Hi Florian,
>>>>>>>>>
>>>>>>>>> Thanks for the KIP. What about KTable and other DSL interfaces? Will they
>>>>>>>>> not want to be able to do the same thing?
>>>>>>>>> It would be good to see a complete set of the public API changes.
>>>>>>>>>
>>>>>>>>> Cheers,
>>>>>>>>> Damian
>>>>>>>>>
>>>>>>>>> On Wed, 30 May 2018 at 19:45 Guozhang Wang <wangg...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> Hello Florian,
>>>>>>>>>>
>>>>>>>>>> Thanks for the KIP. I have some meta feedback on the proposal:
>>>>>>>>>>
>>>>>>>>>> 1. You mentioned that this `Processed` object will be added to a new
>>>>>>>>>> overloaded variant of all the stateless operators; what about the stateful
>>>>>>>>>> operators? I would like to hear your opinion if you have thought about that.
>>>>>>>>>> Note that stateful operators will usually be mapped to multiple
>>>>>>>>>> processor node names, so we probably need to come up with some way to
>>>>>>>>>> define all their names.
>>>>>>>>>>
>>>>>>>>>> 2. I share Bill's concern about adding lots of new overloaded
>>>>>>>>>> functions to the stateless operators, as we have just spent quite some
>>>>>>>>>> effort trimming them since the 1.0.0 release. If the goal is just to provide
>>>>>>>>>> some "hints" on the generated processor node names, not strictly enforcing
>>>>>>>>>> the exact names to be generated, then how about we just add a new
>>>>>>>>>> function to the `KStream` and `KTable` classes like "as(Processed)", with the
>>>>>>>>>> semantics "the latest operators that generate this KStream / KTable will
>>>>>>>>>> be named according to this hint".
>>>>>>>>>>
>>>>>>>>>> The only caveat is that for operators like `KStream#to` and
>>>>>>>>>> `KStream#print` that return void, this alternative would not work. But for
>>>>>>>>>> the current operators:
>>>>>>>>>>
>>>>>>>>>> a. KStream#print,
>>>>>>>>>> b. KStream#foreach,
>>>>>>>>>> c. KStream#to,
>>>>>>>>>> d. KStream#process
>>>>>>>>>>
>>>>>>>>>> I personally feel that, except for `KStream#process`, users would not usually
>>>>>>>>>> bother to override their names, and for `KStream#process` we could add an
>>>>>>>>>> overloaded variant with the additional Processed object.
>>>>>>>>>>
>>>>>>>>>> 3. In your example, the processor names still get a suffix like
>>>>>>>>>> "-0000000000"; is this intentional? If yes, why? (I thought that with
>>>>>>>>>> user-specified processor name hints we would no longer add a suffix to
>>>>>>>>>> distinguish different nodes of the same type.)
>>>>>>>>>>
>>>>>>>>>> Guozhang
>>>>>>>>>>
>>>>>>>>>> On Tue, May 29, 2018 at 6:47 AM, Bill Bejeck <bbej...@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hi Florian,
>>>>>>>>>>>
>>>>>>>>>>> Thanks for the KIP. I think being able to add more context to the
>>>>>>>>>>> processor names would be useful.
>>>>>>>>>>>
>>>>>>>>>>> I like the idea of adding a "withProcessorName" to Produced, Consumed
>>>>>>>>>>> and Joined.
>>>>>>>>>>>
>>>>>>>>>>> But instead of adding the "Processed" parameter to a large percentage of
>>>>>>>>>>> the methods, which would result in overloaded methods (which we removed
>>>>>>>>>>> quite a few of with KIP-182), what do you think of adding a method
>>>>>>>>>>> "withName(String processorName)" to the AbstractStream class? BTW I'm not
>>>>>>>>>>> married to the method name, it's the best I can do off the top of my
>>>>>>>>>>> head.
>>>>>>>>>>>
>>>>>>>>>>> For the methods that return void, we'd have to add a parameter, but that
>>>>>>>>>>> would at least cut down on the number of overloaded methods in the API.
>>>>>>>>>>>
>>>>>>>>>>> Just my 2 cents.
>>>>>>>>>>>
>>>>>>>>>>> Thanks,
>>>>>>>>>>> Bill
>>>>>>>>>>>
>>>>>>>>>>> On Sun, May 27, 2018 at 4:13 PM, Florian Hussonnois <fhussonn...@gmail.com>
>>>>>>>>>>> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Hi,
>>>>>>>>>>>>
>>>>>>>>>>>> I would like to start a new discussion on the following KIP:
>>>>>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-307%3A+Allow+to+define+custom+processor+names+with+KStreams+DSL
>>>>>>>>>>>>
>>>>>>>>>>>> This is still a draft.
>>>>>>>>>>>>
>>>>>>>>>>>> Looking forward to your feedback.
>>>>>>>>>>>> --
>>>>>>>>>>>> Florian HUSSONNOIS
>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> --
>>>>>>>>>> -- Guozhang
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>> --
>>>>>>>> Florian HUSSONNOIS
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>> --
>>>>> Florian HUSSONNOIS
>>>>>
>>>>
>>>
>>> --
>>> -- Guozhang
>>>
>>
>> --
>> Florian HUSSONNOIS
>>
>
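
For anyone picking the thread back up, here is a minimal, self-contained sketch of John's "option 3" (a default name() on the action interface plus a static named() wrapper). It is only an illustration under stated assumptions: NamedPredicate is a stand-in for the real Kafka Streams Predicate, and ProcessorNames.nameFor plus the generated fallback names are hypothetical helpers invented here to show how a DSL operator might pick a processor name. None of this is the KIP's actual API.

```java
import java.util.Objects;

public class NamedPredicateDemo {
    public static void main(String[] args) {
        // A plain lambda still satisfies the interface; it simply has no name.
        NamedPredicate<String, Integer> anonymous = (k, v) -> v % 2 == 0;

        // The same lambda wrapped with a user-supplied name.
        NamedPredicate<String, Integer> even =
            NamedPredicate.named("STREAM-PAIR-VALUE", (k, v) -> v % 2 == 0);

        // prints "KSTREAM-FILTER-0000000001" (falls back to the generated name)
        System.out.println(ProcessorNames.nameFor(anonymous, "KSTREAM-FILTER-0000000001"));
        // prints "STREAM-PAIR-VALUE" (the user-supplied name wins)
        System.out.println(ProcessorNames.nameFor(even, "KSTREAM-FILTER-0000000002"));
    }
}

/** Hypothetical stand-in for the Kafka Streams Predicate interface. */
interface NamedPredicate<K, V> {
    // the one abstract method, so lambdas keep working
    boolean test(K key, V value);

    // default method: unnamed unless an implementation overrides it
    default String name() {
        return null;
    }

    // static factory that wraps an existing predicate with a name
    static <K, V> NamedPredicate<K, V> named(final String name,
                                             final NamedPredicate<K, V> predicate) {
        Objects.requireNonNull(name, "name");
        Objects.requireNonNull(predicate, "predicate");
        return new NamedPredicate<K, V>() {
            @Override
            public boolean test(final K key, final V value) {
                return predicate.test(key, value);
            }

            @Override
            public String name() {
                return name;
            }
        };
    }
}

/** Hypothetical helper: how an operator might choose its processor name. */
final class ProcessorNames {
    private ProcessorNames() { }

    static String nameFor(final NamedPredicate<?, ?> predicate, final String generated) {
        final String userName = predicate.name();
        return userName != null ? userName : generated;
    }
}
```

The point of the sketch is only that lambdas keep compiling unchanged while advanced users opt in through named(); whether the real interfaces should carry a String or a richer Described object is exactly the question left open above.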