Just catching up on this discussion. My overall personal take is that I am not a big fan of the interface `Named` being used as a factory. I would rather add a control object parameter to all methods that don't have one yet. This KIP was started a while ago, and we added new naming capabilities in the meantime. Guozhang's example in the PR comment about naming in stream-stream joins shows that we might end up in a confusing situation for users if we use `Named`. Also, in 2.1, users can already name repartition/changelog topics and stores. Thus, KIP-307 boils down to providing non-functional naming (processor names that do not affect any topic or store names)?
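Matthias's alternative, adding a control-object parameter to operators that lack one, can be sketched with toy stand-ins. `NamedConfig`, `ToyStream`, and the generated-name format are hypothetical and are not the Kafka Streams API. The sketch only shows the "two forms per operator" shape: the no-config overload delegates to the config overload, which falls back to a generated name when none is given.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiPredicate;

// Hypothetical control object carrying an optional processor name.
final class NamedConfig {
    private final String name;

    private NamedConfig(final String name) {
        this.name = name;
    }

    static NamedConfig as(final String name) {
        return new NamedConfig(name);
    }

    String name() {
        return name;
    }
}

// Hypothetical stream that only records the processor names it would create;
// the actual filtering is omitted because the sketch is about naming.
final class ToyStream {
    private final List<String> processorNames = new ArrayList<>();

    // Form 1: no config object; delegates to form 2 without a name.
    ToyStream filter(final BiPredicate<String, Integer> predicate) {
        return filter(predicate, null);
    }

    // Form 2: the control object supplies the processor name. The fallback mimics
    // the KSTREAM-FILTER-0000000000 style, but uses a per-stream counter for brevity.
    ToyStream filter(final BiPredicate<String, Integer> predicate, final NamedConfig config) {
        final String name = (config != null && config.name() != null)
            ? config.name()
            : String.format("KSTREAM-FILTER-%010d", processorNames.size());
        processorNames.add(name);
        return this;
    }

    List<String> processorNames() {
        return processorNames;
    }
}
```

For example, `new ToyStream().filter((k, v) -> v > 0).filter((k, v) -> v % 2 == 0, NamedConfig.as("even-values"))` records one generated name and one user-supplied name.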
Hence, for all methods that already allow specifying names, I don't see any reason to change them; instead, we can use the existing API to also name the processor(s) rather than letting users specify a new name.

About the inconsistency in method naming: I agree that `as` is very generic and maybe not the best choice.

I think it might be helpful to have a table overview in the KIP that lists all existing static/non-static methods that allow specifying a name, plus a column with the new suggested naming for those methods.

Thoughts?

-Matthias

On 12/12/18 12:45 AM, Florian Hussonnois wrote:

Thank you very much for your feedback.

Currently, there is still a lot of discussion regarding the Named interface. On the one hand, we should provide consistency across the Streams API, and on the other hand, we should not break the semantics, as John pointed out.

Guozhang, I'm sorry, but I'm a little bit confused; maybe I missed something. In your comment you suggested that:
* Produced/Consumed/Suppressed should extend Named
* Named should have a package-private method to get the specified processor name internally (processorName())
* Finally, we should end up with something like: Named -> XXX -> XXXInternal, or Named -> Produced -> ProducedInternal

The objective behind that is to:
* consolidate the internal method processorName()
* consolidate the method withName that now exists in Produced, Consumed, and Suppressed.

But Named is an interface, so we can't define a package-private method on it. Also, for example, Produced and ProducedInternal are not in the same package, so having a package-private method doesn't really help. In addition, if we add the withName method to the Named interface, it can become confusing for developers because the action interfaces (ValueMapper, Reducer, etc.) extend it.
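Florian's point that a Java interface cannot carry a package-private accessor (interface members are implicitly public in Java 8; Java 9 adds private methods, but never package-private ones) is why the shared state has to move into a base class. A minimal sketch under that assumption, with hypothetical names (`NamedOperation`, `ToyProduced`, `ToyProducedInternal`) rather than Kafka classes: the base class holds the name, a self-typed `withName` keeps the concrete return type, and the internal subclass obtains the name through a protected copy constructor, which also works across packages.

```java
import java.util.Objects;

// Hypothetical base class: unlike an interface, it can hold state and non-public members.
abstract class NamedOperation<T extends NamedOperation<T>> {
    protected String processorName;

    protected NamedOperation(final String processorName) {
        this.processorName = processorName;
    }

    // Copy constructor: the field is read inside the declaring class, so subclasses
    // in other packages can use it without touching another instance's protected field.
    protected NamedOperation(final NamedOperation<?> other) {
        this(Objects.requireNonNull(other, "other can't be null").processorName);
    }

    // Returns the concrete subtype thanks to the self-referencing type parameter.
    @SuppressWarnings("unchecked")
    public T withName(final String processorName) {
        this.processorName = processorName;
        return (T) this;
    }
}

// Hypothetical user-facing config object standing in for Produced.
class ToyProduced extends NamedOperation<ToyProduced> {
    ToyProduced(final String processorName) {
        super(processorName);
    }

    protected ToyProduced(final ToyProduced other) {
        super(other);
    }

    static ToyProduced as(final String processorName) {
        return new ToyProduced(processorName);
    }
}

// Hypothetical internal view standing in for ProducedInternal.
class ToyProducedInternal extends ToyProduced {
    ToyProducedInternal(final ToyProduced produced) {
        super(produced);
    }

    String name() {
        return processorName; // inherited protected field, accessed through `this`
    }
}
```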
The interface would look like:

    public interface Named<T extends Named<T>> {
        default String name() {
            return null;
        }
        default Named<T> withName(final String name) {
            return null;
        }
        ...
    }

So maybe, instead of adding another method to Named, we could create a new package-private class that could be extended by Produced/Consumed/Joined/Suppressed. For example:

    class SettableName<T extends SettableName<T>> implements Named {

        protected String processorName;

        SettableName(final SettableName settable) {
            this(Objects.requireNonNull(settable, "settable can't be null").name());
        }

        SettableName(final String processorName) {
            this.processorName = processorName;
        }

        @Override
        public String name() {
            return processorName;
        }

        public T withName(final String processorName) {
            this.processorName = processorName;
            return (T) this;
        }
    }

That way, we will get: public class Produced extends SettableName<Produced> { ... }

WDYT?

On Tue, Dec 11, 2018 at 2:46 AM, Guozhang Wang <wangg...@gmail.com> wrote:

I had one meta comment on the PR: https://github.com/apache/kafka/pull/5909#discussion_r240447153

On Mon, Dec 10, 2018 at 5:22 PM John Roesler <j...@confluent.io> wrote:

Hi Florian,

I hope it's OK if I ask a few questions at this late stage...

Comment 1 ======

It seems like the proposal is to add a new "Named" interface that is intended to be mixed in with the existing API objects at various points.

Just to preface some of my comments: it looks like your KIP was created quite a while ago, so the API may have changed somewhat since you started.

As I see the API, there are a few different kinds of DSL method arguments:
* functions: things like Initializer, Aggregator, ValueJoiner, ForEachAction... All of these are essentially Streams-flavored Function interfaces with different arities, type bounds, and semantics.
* config objects: things like Produced, Consumed, Joined, Grouped... These are containers for configurations, where the target of the configuration is the operation itself.
* raw configurations: things like a raw topic-name string and Materialized. These are configurations for operations that have no config object, and for various reasons, we didn't make one. The distinguishing feature is that the target of the configuration is not the operation itself, but some aspect of it. For example, in Materialized, we are not setting the caching behavior of, say, an aggregation; we're setting the caching behavior of a materialized state store attached to the aggregation.

It seems like choosing to mix the Named interface in with the functions has a couple of unfortunate side effects:
* Aggregator is not the only function passed to any of the relevant aggregate methods, so it seems a little arbitrary to pick that function over Initializer or Merger.
* As you noted, branch() takes an array of Predicate, so we just ignore the provided name(s), even though Predicate names are used elsewhere.
* Not all things that we want to name have function arguments, notably source and sink, so we'd switch paradigms and use the config object instead.
* Adding an extra method to the function interfaces means that those are no longer SAM interfaces. You proposed to add a default implementation, so we could still pass a lambda if we don't want to set the name, but if we *do* want to set the name, we can no longer use lambdas.

I think the obvious other choice would be to mix Named in with the config objects instead, but this has one main downside of its own...
* Not every operator we wish to name has a config object. I don't know if everyone involved is comfortable with adding a config object to every operator that's missing one.
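John's last bullet about SAM interfaces can be checked directly: an interface with one abstract method plus a default `name()` still compiles as a `@FunctionalInterface`, so unnamed lambdas keep working, but overriding the name requires an anonymous class (or a wrapping factory). `NamedPredicate` and `SamDemo` are hypothetical stand-ins, not Kafka's `Predicate`.

```java
// Hypothetical action interface: one abstract method plus a default name().
// Default methods do not count against the single-abstract-method rule,
// so the annotation below still compiles.
@FunctionalInterface
interface NamedPredicate<K, V> {
    boolean test(K key, V value);

    default String name() {
        return null; // no custom name: the DSL would generate one
    }
}

final class SamDemo {
    private SamDemo() {
    }

    // A lambda can implement test() but has no way to override name().
    static NamedPredicate<String, Integer> unnamed() {
        return (k, v) -> v > 0;
    }

    // Supplying a name needs an anonymous class (or a wrapping factory).
    static NamedPredicate<String, Integer> named(final String name) {
        return new NamedPredicate<String, Integer>() {
            @Override
            public boolean test(final String key, final Integer value) {
                return value > 0;
            }

            @Override
            public String name() {
                return name;
            }
        };
    }
}
```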
Personally, I favor moving toward a more consistent state that's forward compatible with any further changes we wish to make. I *think* that giving every operator two forms (one with no config and one with a config object) would be such an API.

Comment 2 =========

Finally, just a minor comment: the static method in Named wouldn't work properly as defined. Assuming that we mix Named in with Produced, for example, we'd need to be able to use it like:

    kStream.to("out", Produced.with("myOut"))

This doesn't work because with() returns a Named, but we need a Produced.

We can pull off a builder method in the interface, but not a static method. To define a builder method in the interface that returns an instance of the concrete subtype, you have to use the "curiously recurring generic" pattern.

It would look like:

    public interface Named<N extends Named<N>> {
        String name();
        N withName(String name);
    }

You can see where the name of the pattern comes from ;)
An implementation would then look like:

    public class Produced implements Named<Produced> {
        private String name;

        public String name() { return name; }

        public Produced withName(final String name) { this.name = name; return this; }
    }

Note that the generic parameter gets filled in properly in the implementing class, so that you get the right return type out.

It doesn't work at all with a static factory method at the interface level, so it would be up to Produced to define a static factory if it wants to present one.

======

Those are my two pieces of feedback!

I hope you find this helpful rather than frustrating. I'm sorry I didn't get a chance to comment sooner.

Thanks for the KIP; I think it will be much nicer to be able to name the processor nodes.
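John's Comment 2 can be made runnable as a small sketch. `SelfNamed`, `ProducedSketch`, and `SinkSketch` are hypothetical names chosen to avoid confusion with real Kafka classes. The static factory lives on the concrete class, and both direct calls and generic code written against the interface get the concrete type back.

```java
// Self-referencing ("curiously recurring") generic bound: N is the implementing type.
interface SelfNamed<N extends SelfNamed<N>> {
    String name();

    N withName(String name);
}

// Hypothetical stand-in for a config object such as Produced.
final class ProducedSketch implements SelfNamed<ProducedSketch> {
    private String name;

    private ProducedSketch(final String name) {
        this.name = name;
    }

    // The static factory is defined on the concrete class, not on the interface,
    // so it can return ProducedSketch rather than SelfNamed.
    static ProducedSketch with(final String name) {
        return new ProducedSketch(name);
    }

    @Override
    public String name() {
        return name;
    }

    // N is bound to ProducedSketch, so callers keep the concrete type.
    @Override
    public ProducedSketch withName(final String name) {
        this.name = name;
        return this;
    }
}

final class SinkSketch {
    private SinkSketch() {
    }

    // Hypothetical sink method that only accepts the concrete config type.
    static String to(final String topic, final ProducedSketch produced) {
        return topic + " via " + produced.name();
    }

    // Generic code written against the interface still returns the concrete type.
    static <N extends SelfNamed<N>> N renamed(final N config, final String name) {
        return config.withName(name);
    }
}
```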
-John

On Tue, Nov 27, 2018 at 6:34 PM Guozhang Wang <wangg...@gmail.com> wrote:

Hi Florian,

I've made a pass over the PR. There are some comments related to the function names that may affect the KIP wiki page, but overall I think it looks good already.

Guozhang

On Fri, Nov 16, 2018 at 4:21 PM Guozhang Wang <wangg...@gmail.com> wrote:

Thanks Florian! I will take a look at the PR.

On Mon, Nov 12, 2018 at 2:44 PM Florian Hussonnois <fhussonn...@gmail.com> wrote:

Hi Matthias,

Sorry, I was absent for a while. I have started a new PR for this KIP. It is still in progress; I'm working on it.
https://github.com/apache/kafka/pull/5909

On Fri, Oct 19, 2018 at 8:13 PM, Matthias J. Sax <matth...@confluent.io> wrote:

What is the status of this KIP?

-Matthias

On 7/19/18 5:17 PM, Guozhang Wang wrote:

Hello Florian,

Sorry for being late... I keep finding myself apologizing for late replies these days. But I do want to push this KIP's progress forward, as I see it as a very important and helpful feature for extensibility.

About the exceptions, I've gone through them, and hopefully this is an exhaustive list:

1. KTable#toStream()
2. KStream#merge(KStream)
3. KStream#process() / transform() / transformValues()
4. KGroupedTable / KGroupedStream#count()

Here's my reasoning:

* It is okay not to let users override the name for 1/2, since they are too trivial to be useful for debugging, plus their processor names would not determine any related topic / store names.
* For 3, I'd vote for adding overloaded functions with Named.
* For 4, if users really want to name the processor, they can call aggregate() instead, so I think it is okay to skip this case.

Guozhang

On Fri, Jul 6, 2018 at 3:06 PM, Florian Hussonnois <fhussonn...@gmail.com> wrote:

Hi,

Option #3 seems to be a good alternative, and I find the API more elegant (thanks John).

But we still need to overload some methods, either because they do not accept an action instance or because they are translated to multiple processors.

For example, this is the case for the methods branch() and merge(). We could introduce a new interface Named (or maybe a different name?) with a method name(). All action interfaces could extend this one to implement option 3).
This would result in the following overloads:

    KStream<K, V> merge(final Named name, final KStream<K, V> stream);
    KStream<K, V>[] branch(final Named name, final Predicate<? super K, ? super V>...
            predicates);

N.B.: The list above is not exhaustive.

---------
User code would become:

    KStream<String, Integer> stream = builder.stream("test");
    KStream<String, Integer>[] branches = stream.branch(Named.with("BRANCH-STREAM-ON-VALUE"),
        Predicate.named("STREAM-PAIR-VALUE", (k, v) -> v % 2 == 0),
        Predicate.named("STREAM-IMPAIR-VALUE", (k, v) -> v % 2 != 0));

    branches[0].to("pair");
    branches[1].to("impair");
---------

This is a mix of options 3) and 1).

On Fri, Jul 6, 2018 at 10:58 PM, Guozhang Wang <wangg...@gmail.com> wrote:

Hi folks, just to summarize the options we have so far:

1) Add a new "as" for KTable / KStream, plus add new fields to the control objects of operators that return void (the current wiki's proposal).

Pros: no more overloads.
Cons: a bit of a departure from the current high-level API design of the DSL, plus the inconsistency between operators that return void and operators that do not.

2) Add overloaded functions for all operators that accept a new control object, "Described".

Pros: consistent with current APIs.
Cons: lots of overloaded functions to add.

3) Add another default function in the interface (thank you, J8!), as John proposed.

Pros: no overloaded functions, no "Described".
Cons: do we really lose lambda functions (it seems not, if we provide a "named" for each function)? Plus, "Described" may be more extensible than a single `String`.
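For branch(), the thread distinguishes the parent routing processor from one child processor per predicate. A hypothetical sketch of how such names might be derived: the `Named`-style argument names the parent, each predicate's optional name names its child, and unnamed children fall back to a suffix derived from the parent. `BranchNaming`, its nested `NamedPredicate`, and the `-BRANCHCHILD-<i>` fallback are assumptions for illustration, not Kafka's behaviour. The routing helper follows branch()'s first-match rule.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiPredicate;

final class BranchNaming {
    private BranchNaming() {
    }

    // Hypothetical predicate that may carry a name, as in the Predicate.named(...) idea.
    static final class NamedPredicate<K, V> {
        final String name; // may be null
        final BiPredicate<K, V> test;

        NamedPredicate(final String name, final BiPredicate<K, V> test) {
            this.name = name;
            this.test = test;
        }
    }

    // Returns the parent processor name followed by one name per branch child.
    @SafeVarargs
    static <K, V> List<String> processorNames(final String parentName,
                                              final NamedPredicate<K, V>... predicates) {
        final List<String> names = new ArrayList<>();
        names.add(parentName);
        for (int i = 0; i < predicates.length; i++) {
            final String childName = predicates[i].name;
            // Assumed fallback for unnamed children: parent name plus an index.
            names.add(childName != null ? childName : parentName + "-BRANCHCHILD-" + i);
        }
        return names;
    }

    // Routes a record to the first matching branch, or returns -1 if none matches.
    @SafeVarargs
    static <K, V> int route(final K key, final V value, final NamedPredicate<K, V>... predicates) {
        for (int i = 0; i < predicates.length; i++) {
            if (predicates[i].test.test(key, value)) {
                return i;
            }
        }
        return -1;
    }
}
```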
My principle for deciding which one is better depends primarily on "how to make it easy for advanced users to use the additional API, while keeping it hidden from normal users who do not care at all". For that purpose, I think 3) > 1) > 2).

One caveat, though: changing the interface would not be binary-compatible, though it would be source-compatible, right? I.e., users would need to recompile their code, though no code changes would be needed.

Another note: for 3), if we really want to keep the extensibility of Described, we could do something like:

---------

    public interface Predicate<K, V> {
        // existing method
        boolean test(final K key, final V value);

        // new default method adds the ability to name the predicate
        default Described described() {
            return new Described(null);
        }
    }

----------

where user code becomes:

    stream.filter(named("key", (k, v) -> true)); // note `named` now just sets a Described("key") in "described()".

    stream.filter(described(Described.as("key", /* any other fancy parameters in the future */), (k, v) -> true));
----------

I feel it is not very likely that we'd need to extend it further in the future, so just a `String` would be good enough. But I'm listing all possibilities here.
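Guozhang's option 3 variant with an extensible `Described` object can be sketched as follows. `Described`, `DescribedPredicate`, and the `described(...)`/`named(...)` helpers mirror the names in his email but are hypothetical, not Kafka API. The sketch shows that a default `described()` keeps plain lambdas working while the static wrappers attach a description.

```java
import java.util.Objects;

// Hypothetical extensible description: only a name for now; more fields could follow.
final class Described {
    private final String name;

    Described(final String name) {
        this.name = name;
    }

    static Described as(final String name) {
        return new Described(Objects.requireNonNull(name, "name can't be null"));
    }

    String name() {
        return name;
    }
}

@FunctionalInterface
interface DescribedPredicate<K, V> {
    boolean test(K key, V value);

    // Default: no description, so plain lambdas keep working.
    default Described described() {
        return new Described(null);
    }

    // Attaches an arbitrary Described to an existing predicate.
    static <K, V> DescribedPredicate<K, V> described(final Described description,
                                                     final DescribedPredicate<K, V> predicate) {
        return new DescribedPredicate<K, V>() {
            @Override
            public boolean test(final K key, final V value) {
                return predicate.test(key, value);
            }

            @Override
            public Described described() {
                return description;
            }
        };
    }

    // Shorthand that only sets the name.
    static <K, V> DescribedPredicate<K, V> named(final String name,
                                                 final DescribedPredicate<K, V> predicate) {
        return described(Described.as(name), predicate);
    }
}
```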
Guozhang

On Fri, Jul 6, 2018 at 8:19 AM, John Roesler <j...@confluent.io> wrote:

Hi Florian,

Sorry I'm late to the party; I missed the message originally.

Regarding the names, it's probably a good idea to stick to the same character set we're currently using: letters, numbers, and hyphens. The names are used in Kafka topics, files and folders, and RocksDB databases, and we also need them to work with the file systems of Windows, Linux, and macOS. My opinion is that in a situation like that, it's better to be conservative. It might also be a good idea to impose an upper limit on name length to avoid running afoul of any of those systems.

---

It seems like there's a small debate between 1) adding a new method to KStream (and maybe KTable) to modify its name after the fact, or 2) piggy-backing on the config objects where they exist and adding one where they don't. To me, #2 is the better alternative, even though it produces more overloads and may be a bit awkward in places.

The reason is simply that #1 is a high-level departure from the graph-building paradigm we're using in the DSL. Consider:

    Graph.node1(config).node2(config)

vs

    Graph.node1().config().node2().config()

We could have done either, but we picked the former.
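John's suggestion to restrict names to letters, digits, and hyphens and to cap their length could be enforced with a check like the following. The pattern mirrors the character set he lists; `ProcessorNameValidator` is a hypothetical helper, not a Kafka class, and the 200-character cap is a placeholder assumption (Kafka's own topic-name limit is 249 characters, and internal topic names add prefixes and suffixes, so a real limit would need to leave room for those).

```java
import java.util.regex.Pattern;

final class ProcessorNameValidator {
    // Letters, digits, and hyphens only, as suggested in the thread.
    private static final Pattern LEGAL = Pattern.compile("[A-Za-z0-9-]+");

    // Assumed cap; chosen to leave room for internal prefixes and suffixes.
    static final int MAX_LENGTH = 200;

    private ProcessorNameValidator() {
    }

    static String validate(final String name) {
        if (name == null || name.isEmpty()) {
            throw new IllegalArgumentException("Name must not be null or empty");
        }
        if (name.length() > MAX_LENGTH) {
            throw new IllegalArgumentException(
                "Name is longer than " + MAX_LENGTH + " characters: " + name);
        }
        if (!LEGAL.matcher(name).matches()) {
            throw new IllegalArgumentException(
                "Name may only contain letters, digits, and hyphens: " + name);
        }
        return name;
    }
}
```

Such a check would also answer the whitespace question raised later in the thread: a name such as "has space" is rejected.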
I think it's probably a good goal to try and stick to it, so that developers can develop and rely on their instincts for how the DSL will behave.

I do want to present one alternative to adding new config objects: we can just add a "name()" method to all our "action" interfaces. For example, I'll demonstrate how we can add a "name" to Predicate and then use it to name a "KStream#filter" DSL operator:

    public interface Predicate<K, V> {
        // existing method
        boolean test(final K key, final V value);

        // new default method adds the ability to name the predicate
        default String name() {
            return null;
        }

        // new static factory method adds the ability to wrap lambda predicates with a named predicate
        static <K, V> Predicate<K, V> named(final String name, final Predicate<K, V> predicate) {
            return new Predicate<K, V>() {
                @Override
                public boolean test(final K key, final V value) {
                    return predicate.test(key, value);
                }

                @Override
                public String name() {
                    return name;
                }
            };
        }
    }

Then, here's how it would look to use it:

    // Anonymous predicates continue to work just fine
    stream.filter((k, v) -> true);

    // Devs can swap in a Predicate that implements the name() method.
    stream.filter(new Predicate<Object, Object>() {
        @Override
        public boolean test(final Object key, final Object value) {
            return true;
        }

        @Override
        public String name() {
            return "hey";
        }
    });

    // Or they can wrap their existing lambda using the static factory method
    stream.filter(named("key", (k, v) -> true));

Just a thought.

Overall, I think it's really valuable to be able to name the processors, for all the reasons you mentioned in the KIP. So thank you for introducing this!

Thanks,
-John

On Thu, Jul 5, 2018 at 4:53 PM Florian Hussonnois <fhussonn...@gmail.com> wrote:

Hi, thank you very much for all your suggestions. I've started to update the KIP (https://cwiki.apache.org/confluence/display/KAFKA/KIP-307%3A+Allow+to+define+custom+processor+names+with+KStreams+DSL).
Also, I propose to rename the Processed class to Described; this will be more meaningful (but this is just a detail).

I'm OK with not enforcing uppercase for specific names, but should we allow arbitrary names with whitespace, for example? Currently, I can't tell whether this could lead to side effects.

On Mon, Jun 11, 2018 at 1:31 AM, Matthias J. Sax <matth...@confluent.io> wrote:

Just catching up on this thread.

I like the general idea.
Couple of comments:

- I think that adding `Processed` (or maybe a different name?) is a valid proposal for stateless operators that only have a single overload at the moment. It would align with the overall API design.

- For all methods with multiple existing overloads, we can consider extending `Consumed`, `Produced`, `Materialized`, etc. to take an additional processor name (not sure at the moment how elegant this is; we would need to "play" with the API a little bit; the advantage would be that we do not add more overloads, which seems to be key for this KIP).

- Operators that return void: while I agree that the "name first" chaining idea is not very intuitive, it might still work if we name the method correctly (again, we would need to "play" with the API a little bit to see).

- For DSL operators that are translated to multiple nodes: it might make sense to use the specified operator name as a prefix and add reasonable suffixes. For example, a join translates into 5 operators that could be named "name-left-store-processor", "name-left-join-processor", "name-right-store-processor", "name-right-join-processor", and "name-join-merge-processor" (or similar). Maybe just using numbers might also work.
- I think we should strip the number suffixes if a user provides names.

- Enforcing upper case seems to be tricky: for example, we do not enforce upper case for store names, and we cannot easily change that as it would break compatibility; thus, for consistency reasons, we might not want to do this.

- For a better understanding of the impact of the KIP, it would be quite helpful if you listed all method names that are affected in the KIP (i.e., list all newly added overloads).

-Matthias

On 5/31/18 6:40 PM, Guozhang Wang wrote:

Hi Florian,

Re 1: I think changing KStreamImpl / KTableImpl to allow modifying the processor name after the operator is fine, as long as we do the check again when modifying it. In fact, we have some topology optimization going on which may modify processor names in the final topology anyway (https://github.com/apache/kafka/pull/4983). Semantically, I think it is easier for developers to understand than "deciding the processor name for the next operator".

Re 2: Yeah, I'm thinking that for operators that translate to multiple processor names, we can still use the provided "hint" to name the processors, e.g.
for joins we can name them `join-foo-this`, `join-foo-that`, etc. if the user calls `as("foo")`.

Re 3: My motivation for removing the suffix is that it places huge restrictions on topology compatibility. Consider user code that adds a new operator, or a library optimization that removes some operators: the suffix indexing may change for a large number of the processor names. This will in turn change the internal state store names as well as the internal topic names, making the new application topology incompatible with the old one. One rationale I had for this KIP is that, aligned with this effort, moving forward we can allow users to customize internal names so that they can still be reused even with topology changes (e.g. KIP-230), so I think removing the suffix index would be more applicable in the long run.

Guozhang

On Thu, May 31, 2018 at 3:08 PM, Florian Hussonnois <fhussonn...@gmail.com> wrote:

Hi,
Thank you very much for your feedback.

1/
I agree that overloading most of the methods with a Processed is not ideal. I've started modifying the KStream API and I came to the same conclusion.
Also, adding a new method directly to the KStreamImpl and KTableImpl classes seems to be a better option.

However, a processor name cannot be redefined after calling an operator (or maybe I'm missing something in the code). From my understanding, this would only set the KStream name property, not the name of the processor previously added to the topology builder, leading to an InvalidTopology exception.

So the new method should actually define the name of the next processor. Below is an example:

    stream.as(Processed.name("MAPPED_TO_UPPERCASE"))
          .map((k, v) -> KeyValue.pair(k, v.toUpperCase()));

I think this approach could also solve the cases for methods returning void?

Regarding this new method, we have two possible implementations:

1. Adding a method like: withName(String processorName)
2. Or adding a method accepting a Processed object: as(Processed).

I think solution 2 is preferable, as the Processed class could be enriched further in the future.

2/
As Guozhang said, some operators add internal processors. For example, the branch() method creates one KStreamBranch processor to route records and one KStreamPassThrough processor for each branch.
In that situation, only the parent processor can be named.
For child processors, we could keep the current behaviour of adding a suffix (i.e. KSTREAM-BRANCHCHILD-).

This is also the case for the join() method, which results in adding multiple processors to the topology (windowing, left/right joins, and a merge processor).
I think that, as for the branch method, users could only define a processor name prefix.

3/
I think we should still add a suffix like "-0000000000" to the processor name and enforce uppercase, as this will keep some consistency with the names generated by the API.

4/
Yes, the KTable interface should be modified like KStream to allow custom processor names to be defined.

Thanks,

On Thu, May 31, 2018 at 7:18 PM, Damian Guy <damian....@gmail.com> wrote:

Hi Florian,

Thanks for the KIP. What about KTable and other DSL interfaces? Will they not want to be able to do the same thing?
It would be good to see a complete set of the public API changes.

Cheers,
Damian

On Wed, 30 May 2018 at 19:45 Guozhang Wang <wangg...@gmail.com> wrote:

Hello Florian,

Thanks for the KIP. I have some meta feedback on the proposal:

1.
You mentioned that this `Processed` object will be added to a new overloaded variant of all the stateless operators; what about the stateful operators? I would like to hear your opinion if you have thought about that. Note that stateful operators will usually be mapped to multiple processor node names, so we probably need to come up with some way to define all their names.

2. I share Bill's concern about adding lots of new overloaded functions to the stateless operators, as we have just spent quite some effort trimming them since the 1.0.0 release. If the goal is just to provide some "hints" for the generated processor node names, not to strictly enforce the exact names to be generated, then how about we just add a new function to the `KStream` and `KTable` classes like "as(Processed)", with the semantics "the latest operator that generated this KStream / KTable will be named according to this hint"?

The only caveat is that for operators like `KStream#to` and `KStream#print` that return void, this alternative would not work. But for the current operators:

a. KStream#print,
b.
KStream#foreach, >>>>>>>>>>>>>>>>> c. KStream#to, >>>>>>>>>>>>>>>>> d. KStream#process >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> I personally felt that except `KStream#process` users >>> would >>>>>>>>> not >>>>>>>>>>>>> usually >>>>>>>>>>>>>>>>> bother to override their names, and for >> `KStream#process` >>>> we >>>>>>>>>> could >>>>>>>>>>>> add >>>>>>>>>>>>>>> an >>>>>>>>>>>>>>>>> overload variant with the additional Processed object. >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> 3. In your example, the processor names are still added >>>> with >>>>>> a >>>>>>>>>>>> suffix >>>>>>>>>>>>>>>> like >>>>>>>>>>>>>>>>> " >>>>>>>>>>>>>>>>> -0000000000", is this intentional? If yes, why (I >> thought >>>>>> with >>>>>>>>>>> user >>>>>>>>>>>>>>>>> specified processor name hints we will not add suffix >> to >>>>>>>>>>> distinguish >>>>>>>>>>>>>>>>> different nodes of the same type any more)? >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> Guozhang >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> On Tue, May 29, 2018 at 6:47 AM, Bill Bejeck < >>>>>>>>> bbej...@gmail.com >>>>>>>>>>> >>>>>>>>>>>>>>> wrote: >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> Hi Florian, >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> Thanks for the KIP. I think being able to add more >>>> context >>>>>>>>> to >>>>>>>>>>> the >>>>>>>>>>>>>>>>>> processor names would be useful. >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> I like the idea of adding a "withProcessorName" to >>>> Produced, >>>>>>>>>>>> Consumed >>>>>>>>>>>>>>>> and >>>>>>>>>>>>>>>>>> Joined. >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> But instead of adding the "Processed" parameter to a >>> large >>>>>>>>>>>> percentage >>>>>>>>>>>>>>>> of >>>>>>>>>>>>>>>>>> the methods, which would result in overloaded methods >>>> (which >>>>>>>>> we >>>>>>>>>>>>>>> removed >>>>>>>>>>>>>>>>>> quite a bit with KIP-182) what do you think of adding >> a >>>>>>>>> method >>>>>>>>>>>>>>>>>> to the AbstractStream class "withName(String >>>>>> processorName)"? 
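Guozhang's `as(Processed)` hint and Bill's `withName(String)` proposal rely on the same semantics: a call on a stream renames the operator that produced it, while operators that return void need the name passed as a parameter. The following is a minimal, dependency-free sketch of that idea under stated assumptions; `NamingHintSketch`, `Builder`, `ToyStream` and their methods are hypothetical toy classes that only mimic the DSL, not the Kafka Streams API.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical toy model of the "name the latest operator" hint.
// None of these classes belong to Kafka Streams.
final class NamingHintSketch {

    // One processor node in the toy topology.
    static final class Node {
        String name;

        Node(String name) {
            this.name = name;
        }
    }

    // Shared builder state: nodes in creation order plus a global index.
    static final class Builder {
        final List<Node> nodes = new ArrayList<>();
        private int index = 0;

        Node add(String prefix) {
            // Generated names follow the DSL style, e.g. KSTREAM-FILTER-0000000002.
            Node node = new Node(prefix + String.format("%010d", index++));
            nodes.add(node);
            return node;
        }

        List<String> names() {
            List<String> out = new ArrayList<>();
            for (Node n : nodes) {
                out.add(n.name);
            }
            return out;
        }
    }

    // Each operator adds one node and returns a stream that remembers it.
    static final class ToyStream {
        private final Builder builder;
        private final Node producer; // the node that generated this stream

        ToyStream(Builder builder, Node producer) {
            this.builder = builder;
            this.producer = producer;
        }

        static ToyStream source(Builder builder) {
            return new ToyStream(builder, builder.add("KSTREAM-SOURCE-"));
        }

        ToyStream mapValues() {
            return new ToyStream(builder, builder.add("KSTREAM-MAPVALUES-"));
        }

        ToyStream filter() {
            return new ToyStream(builder, builder.add("KSTREAM-FILTER-"));
        }

        // The proposed hint: rename the operator that produced this stream.
        ToyStream withName(String name) {
            producer.name = name;
            return this;
        }

        // A void terminal operation leaves no stream to call withName() on,
        // so the name has to travel as a parameter instead.
        void foreach(String name) {
            Node node = builder.add("KSTREAM-FOREACH-");
            if (name != null) {
                node.name = name;
            }
        }
    }
}
```

Chaining `ToyStream.source(b).mapValues().withName("enrich-orders").filter().foreach("log-orders")` names the nodes `KSTREAM-SOURCE-0000000000`, `enrich-orders`, `KSTREAM-FILTER-0000000002` and `log-orders`: the index still advances for renamed nodes, and only the void `foreach` needs the extra parameter.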
BTW, I'm not married to the method name; it's the best I can do off the top of my head.

For the methods that return void, we'd have to add a parameter, but that would at least cut down on the number of overloaded methods in the API.

Just my 2 cents.

Thanks,
Bill

On Sun, May 27, 2018 at 4:13 PM, Florian Hussonnois <fhussonn...@gmail.com> wrote:

Hi,

I would like to start a new discussion on the following KIP:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-307%3A+Allow+to+define+custom+processor+names+with+KStreams+DSL

This is still a draft.

Looking forward to your feedback.
--
Florian HUSSONNOIS
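The naming rules in Florian's points 2/ and 3/ (upper-case the user-supplied name, keep the `-0000000000`-style index suffix, and treat the name as a prefix for operators such as branch or join that expand into several processors) could be sketched roughly as below. This is one possible reading of the proposal, not the Kafka Streams implementation: `ProcessorNameSketch` and its methods are hypothetical, and the role labels (`WINDOW`, `JOIN-LEFT`, `JOIN-RIGHT`, `MERGE`) are illustrative stand-ins for the windowing, left/right join and merge processors mentioned in the thread.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;

// Hypothetical sketch of the naming rules from points 2/ and 3/;
// not the Kafka Streams implementation.
final class ProcessorNameSketch {
    private int index = 0;

    // Shared zero-padded index, e.g. "0000000003".
    private String nextSuffix() {
        return String.format("%010d", index++);
    }

    // Current behaviour: a fixed prefix per operator type plus the index.
    String generated(String typePrefix) {
        return typePrefix + nextSuffix();
    }

    // Point 3/: a user-supplied name is upper-cased and still gets the suffix.
    String named(String userName) {
        return userName.toUpperCase(Locale.ROOT) + "-" + nextSuffix();
    }

    // Point 2/: an operator that expands into several processors only takes
    // a prefix from the user; each processor appends its own role label.
    List<String> namedWithPrefix(String userPrefix, List<String> roles) {
        String prefix = userPrefix.toUpperCase(Locale.ROOT);
        List<String> names = new ArrayList<>();
        for (String role : roles) {
            names.add(prefix + "-" + role + "-" + nextSuffix());
        }
        return names;
    }
}
```

On a fresh instance, `generated("KSTREAM-SOURCE-")` returns `KSTREAM-SOURCE-0000000000` and a following `named("enrich-orders")` returns `ENRICH-ORDERS-0000000001`. Keeping the shared index, which is what Guozhang's question 3 asks about, means two operators given the same user name still get distinct processor names; since processor names must be unique within a topology, dropping the suffix would push that uniqueness check onto the user.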