I had one meta comment on the PR: https://github.com/apache/kafka/pull/5909#discussion_r240447153
On Mon, Dec 10, 2018 at 5:22 PM John Roesler <j...@confluent.io> wrote:

> Hi Florian,
>
> I hope it's ok if I ask a few questions at this late stage...
>
> Comment 1
> ======
>
> It seems like the proposal is to add a new "Named" interface that is intended to be mixed in with the existing API objects at various points.
>
> Just to preface some of my comments, it looks like your KIP was created quite a while ago, so the API may have changed somewhat since you started.
>
> As I see the API, there are a few different kinds of DSL method arguments:
> * functions: things like Initializer, Aggregator, ValueJoiner, ForeachAction... All of these are essentially Streams-flavored Function interfaces with different arities, type bounds, and semantics.
> * config objects: things like Produced, Consumed, Joined, Grouped... These are containers for configurations, where the target of the configuration is the operation itself.
> * raw configurations: things like a raw topic-name string and Materialized. These are configurations for operations that have no config object, and for various reasons, we didn't make one. The distinguishing feature is that the target of the configuration is not the operation itself, but some aspect of it. For example, in Materialized, we are not setting the caching behavior of, say, an aggregation; we're setting the caching behavior of a materialized state store attached to the aggregation.
>
> It seems like choosing to mix the Named interface in with the functions has a couple of unfortunate side-effects:
> * Aggregator is not the only function passed to any of the relevant aggregate methods, so it seems a little arbitrary to pick that function over Initializer or Merger.
> * As you noted, branch() takes an array of Predicate, so we just ignore the provided name(s), even though Predicate names are used elsewhere.
> * Not all things that we want to name have function arguments, notably source and sink, so we'd switch paradigms and use the config object instead.
> * Adding an extra method to the function interfaces means that those are no longer SAM interfaces. You proposed to add a default implementation, so we could still pass a lambda if we don't want to set the name, but if we *do* want to set the name, we can no longer use lambdas.
>
> I think the obvious other choice would be to mix Named in with the config objects instead, but this has one main downside of its own...
> * not every operator we wish to name has a config object. I don't know if everyone involved is comfortable with adding a config object to every operator that's missing one.
>
> Personally, I favor moving toward a more consistent state that's forward compatible with any further changes we wish to make. I *think* that giving every operator two forms (one with no config and one with a config object) would be such an API.
>
> Comment 2
> =========
>
> Finally, just a minor comment: the static method in Named wouldn't work properly as defined. Assuming that we mix Named in with Produced, for example, we'd need to be able to use it like:
>
> kStream.to("out", Produced.with("myOut"))
>
> This doesn't work because with() returns a Named, but we need a Produced.
>
> We can pull off a builder method in the interface, but not a static method. To define a builder method in the interface that returns an instance of the concrete subtype, you have to use the "curiously recurring generic" pattern.
>
> It would look like:
>
> public interface Named<N extends Named<N>> {
>   String name();
>   N withName(String name);
> }
>
> You can see where the name of the pattern comes from ;)
> An implementation would then look like:
>
> public class Produced implements Named<Produced> {
>   private String name;
>
>   @Override
>   public String name() { return name; }
>
>   @Override
>   public Produced withName(final String name) { this.name = name; return this; }
> }
>
> Note that the generic parameter gets filled in properly in the implementing class, so that you get the right return type out.
>
> It doesn't work at all with a static factory method at the interface level, so it would be up to Produced to define a static factory if it wants to present one.
>
> ======
>
> Those are my two pieces of feedback!
>
> I hope you find this helpful, rather than frustrating. I'm sorry I didn't get a chance to comment sooner.
>
> Thanks for the KIP, I think it will be much nicer to be able to name the processor nodes.
>
> -John
>
> On Tue, Nov 27, 2018 at 6:34 PM Guozhang Wang <wangg...@gmail.com> wrote:
>
> > Hi Florian,
> >
> > I've made a pass over the PR. There are some comments that are related to the function names which may be affecting the KIP wiki page, but overall I think it looks good already.
> >
> > Guozhang
> >
> > On Fri, Nov 16, 2018 at 4:21 PM Guozhang Wang <wangg...@gmail.com> wrote:
> >
> > > Thanks Florian! I will take a look at the PR.
> > >
> > > On Mon, Nov 12, 2018 at 2:44 PM Florian Hussonnois <fhussonn...@gmail.com> wrote:
> > >
> > >> Hi Matthias,
> > >>
> > >> Sorry I was absent for a while. I have started a new PR for this KIP. It is still in progress for now. I'm working on it.
> > >> https://github.com/apache/kafka/pull/5909
> > >>
> > >> On Fri, Oct 19, 2018 at 8:13 PM, Matthias J. Sax <matth...@confluent.io> wrote:
> > >>
> > >> > What is the status of this KIP?
> > >> >
> > >> > -Matthias
> > >> >
> > >> > On 7/19/18 5:17 PM, Guozhang Wang wrote:
> > >> >
> > >> > > Hello Florian,
> > >> > >
> > >> > > Sorry for being late... I find myself apologizing for late replies a lot these days. But I do want to push this KIP forward, as I see it as a very important and helpful feature for extensibility.
> > >> > >
> > >> > > About the exceptions, I've gone through them and hopefully this is an exhaustive list:
> > >> > >
> > >> > > 1. KTable#toStream()
> > >> > > 2. KStream#merge(KStream)
> > >> > > 3. KStream#process() / transform() / transformValues()
> > >> > > 4. KGroupedTable / KGroupedStream#count()
> > >> > >
> > >> > > Here's my reasoning:
> > >> > >
> > >> > > * It is okay not to let users override the name for 1/2, since they are too trivial to be useful for debugging, plus their processor names would not determine any related topic / store names.
> > >> > > * For 3, I'd vote for adding overloaded functions with Named.
> > >> > > * For 4, if a user really wants to name the processor she can call aggregate() instead, so I think it is okay to skip this case.
> > >> > >
> > >> > > Guozhang
> > >> > >
> > >> > > On Fri, Jul 6, 2018 at 3:06 PM, Florian Hussonnois <fhussonn...@gmail.com> wrote:
> > >> > >
> > >> > >> Hi,
> > >> > >>
> > >> > >> Option #3 seems to be a good alternative and I find the API more elegant (thanks John).
> > >> > >>
> > >> > >> But we still need to overload some methods, either because they do not accept an action instance or because they are translated to multiple processors.
> > >> > >>
> > >> > >> For example, this is the case for the methods branch() and merge(). We could introduce a new interface Named (or maybe a different name?) with a method name(). All action interfaces could extend this one to implement option 3).
> > >> > >> This would result in the following overloads:
> > >> > >>
> > >> > >> KStream<K, V> merge(final Named name, final KStream<K, V> stream);
> > >> > >> KStream<K, V>[] branch(final Named name, final Predicate<? super K, ? super V>... predicates)
> > >> > >>
> > >> > >> N.B.: The list above is not exhaustive
> > >> > >>
> > >> > >> ---------
> > >> > >> user's code will become:
> > >> > >>
> > >> > >> KStream<String, Integer> stream = builder.stream("test");
> > >> > >> KStream<String, Integer>[] branches =
> > >> > >>     stream.branch(Named.with("BRANCH-STREAM-ON-VALUE"),
> > >> > >>         Predicate.named("STREAM-PAIR-VALUE", (k, v) -> v % 2 == 0),
> > >> > >>         Predicate.named("STREAM-IMPAIR-VALUE", (k, v) -> v % 2 != 0));
> > >> > >>
> > >> > >> branches[0].to("pair");
> > >> > >> branches[1].to("impair");
> > >> > >> ---------
> > >> > >>
> > >> > >> This is a mix of options 3) and 1)
> > >> > >>
> > >> > >> On Fri, Jul 6, 2018 at 10:58 PM, Guozhang Wang <wangg...@gmail.com> wrote:
> > >> > >>
> > >> > >>> Hi folks, just to summarize the options we have so far:
> > >> > >>>
> > >> > >>> 1) Add a new "as" for KTable / KStream, plus adding new fields for operators-returns-void control objects (the current wiki's proposal).
> > >> > >>>
> > >> > >>> Pros: no more overloads.
> > >> > >>> Cons: a bit of a departure from the current high-level API design of the DSL, plus the inconsistency between operators-returns-void and operators-not-return-voids.
> > >> > >>>
> > >> > >>> 2) Add overloaded functions for all operators that accept a new control object "Described".
> > >> > >>>
> > >> > >>> Pros: consistent with current APIs.
> > >> > >>> Cons: lots of overloaded functions to add.
> > >> > >>>
> > >> > >>> 3) Add another default function in the interface (thank you J8!) as John proposed.
> > >> > >>>
> > >> > >>> Pros: no overloaded functions, no "Described".
> > >> > >>> Cons: do we really lose lambda functions (seems not if we provide a "named" for each func)? Plus "Described" may be more extensible than a single `String`.
> > >> > >>>
> > >> > >>> My principle for considering which one is better depends primarily on "how to make advanced users easily use the additional API, while keeping it hidden from normal users who do not care at all". For that purpose I think 3) > 1) > 2).
> > >> > >>>
> > >> > >>> One caveat though, is that changing the interface would not be binary-compatible though source-compatible, right? I.e. users need to recompile their code though no changes are needed.
> > >> > >>>
> > >> > >>> Another note: for 3), if we really want to keep the extensibility of Described we could do sth. like:
> > >> > >>>
> > >> > >>> ---------
> > >> > >>>
> > >> > >>> public interface Predicate<K, V> {
> > >> > >>>     // existing method
> > >> > >>>     boolean test(final K key, final V value);
> > >> > >>>
> > >> > >>>     // new default method adds the ability to name the predicate
> > >> > >>>     default Described described() {
> > >> > >>>         return new Described(null);
> > >> > >>>     }
> > >> > >>> }
> > >> > >>>
> > >> > >>> ----------
> > >> > >>>
> > >> > >>> where user's code becomes:
> > >> > >>>
> > >> > >>> stream.filter(named("key", (k, v) -> true)); // note `named` now just sets a Described("key") in "described()".
> > >> > >>>
> > >> > >>> stream.filter(described(Described.as("key", /* any other fancy parameters in the future*/), (k, v) -> true));
> > >> > >>> ----------
> > >> > >>>
> > >> > >>> I feel it is not very likely that we'd need to extend it further in the future, so just a `String` would be good enough. But just listing all possibilities here.
> > >> > >>>
> > >> > >>> Guozhang
> > >> > >>>
> > >> > >>> On Fri, Jul 6, 2018 at 8:19 AM, John Roesler <j...@confluent.io> wrote:
> > >> > >>>
> > >> > >>>> Hi Florian,
> > >> > >>>>
> > >> > >>>> Sorry I'm late to the party, but I missed the message originally.
> > >> > >>>>
> > >> > >>>> Regarding the names, it's probably a good idea to stick to the same character set we're currently using: letters, numbers, and hyphens. The names are used in Kafka topics, files and folders, and RocksDB databases, and we also need them to work with the file systems of Windows, Linux, and MacOS. My opinion is that with a situation like that, it's better to be conservative. It might also be a good idea to impose an upper limit on name length to avoid running afoul of any of those systems.
> > >> > >>>>
> > >> > >>>> ---
> > >> > >>>>
> > >> > >>>> It seems like there's a small debate between 1) adding a new method to KStream (and maybe KTable) to modify its name after the fact, or 2) piggy-backing on the config objects where they exist and adding one where they don't. To me, #2 is the better alternative even though it produces more overloads and may be a bit awkward in places.
> > >> > >>>>
> > >> > >>>> The reason is simply that #1 is a high-level departure from the graph-building paradigm we're using in the DSL. Consider:
> > >> > >>>>
> > >> > >>>> Graph.node1(config).node2(config)
> > >> > >>>>
> > >> > >>>> vs
> > >> > >>>>
> > >> > >>>> Graph.node1().config().node2().config()
> > >> > >>>>
> > >> > >>>> We could have done either, but we picked the former. I think it's probably a good goal to try and stick to it so that developers can develop and rely on their instincts for how the DSL will behave.
> > >> > >>>>
> > >> > >>>> I do want to present one alternative to adding new config objects: we can just add a "name()" method to all our "action" interfaces. For example, I'll demonstrate how we can add a "name" to Predicate and then use it to name a "KStream#filter" DSL operator:
> > >> > >>>>
> > >> > >>>> public interface Predicate<K, V> {
> > >> > >>>>     // existing method
> > >> > >>>>     boolean test(final K key, final V value);
> > >> > >>>>
> > >> > >>>>     // new default method adds the ability to name the predicate
> > >> > >>>>     default String name() {
> > >> > >>>>         return null;
> > >> > >>>>     }
> > >> > >>>>
> > >> > >>>>     // new static factory method adds the ability to wrap lambda predicates
> > >> > >>>>     // with a named predicate
> > >> > >>>>     static <K, V> Predicate<K, V> named(final String name, final Predicate<K, V> predicate) {
> > >> > >>>>         return new Predicate<K, V>() {
> > >> > >>>>             @Override
> > >> > >>>>             public boolean test(final K key, final V value) {
> > >> > >>>>                 return predicate.test(key, value);
> > >> > >>>>             }
> > >> > >>>>
> > >> > >>>>             @Override
> > >> > >>>>             public String name() {
> > >> > >>>>                 return name;
> > >> > >>>>             }
> > >> > >>>>         };
> > >> > >>>>     }
> > >> > >>>> }
> > >> > >>>>
> > >> > >>>> Then, here's how it would look to use it:
> > >> > >>>>
> > >> > >>>> // Anonymous predicates continue to work just fine
> > >> > >>>> stream.filter((k, v) -> true);
> > >> > >>>>
> > >> > >>>> // Devs can swap in a Predicate that implements the name() method.
> > >> > >>>> stream.filter(new Predicate<Object, Object>() {
> > >> > >>>>     @Override
> > >> > >>>>     public boolean test(final Object key, final Object value) {
> > >> > >>>>         return true;
> > >> > >>>>     }
> > >> > >>>>
> > >> > >>>>     @Override
> > >> > >>>>     public String name() {
> > >> > >>>>         return "hey";
> > >> > >>>>     }
> > >> > >>>> });
> > >> > >>>>
> > >> > >>>> // Or they can wrap their existing lambda using the static factory method
> > >> > >>>> stream.filter(named("key", (k, v) -> true));
> > >> > >>>>
> > >> > >>>> Just a thought.
> > >> > >>>>
> > >> > >>>> Overall, I think it's really valuable to be able to name the processors, for all the reasons you mentioned in the KIP. So thank you for introducing this!
> > >> > >>>>
> > >> > >>>> Thanks,
> > >> > >>>> -John
> > >> > >>>>
> > >> > >>>> On Thu, Jul 5, 2018 at 4:53 PM Florian Hussonnois <fhussonn...@gmail.com> wrote:
> > >> > >>>>
> > >> > >>>>> Hi, thank you very much for all your suggestions. I've started to update the KIP (
> > >> > >>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-307%3A+Allow+to+define+custom+processor+names+with+KStreams+DSL
> > >> > >>>>> ).
> > >> > >>>>> Also, I propose to rename the Processed class to Described - this will be more meaningful (but this is just a detail).
> > >> > >>>>>
> > >> > >>>>> I'm OK with not enforcing uppercase for specific names, but should we allow arbitrary names, with whitespace for example?
> > >> > >>>>> Currently, I can't tell whether this could lead to side effects.
> > >> > >>>>>
> > >> > >>>>> On Mon, Jun 11, 2018 at 1:31 AM, Matthias J. Sax <matth...@confluent.io> wrote:
> > >> > >>>>>
> > >> > >>>>>> Just catching up on this thread.
> > >> > >>>>>>
> > >> > >>>>>> I like the general idea. Couple of comments:
> > >> > >>>>>>
> > >> > >>>>>> - I think that adding `Processed` (or maybe a different name?) is a valid proposal for stateless operators that only have a single overload atm. It would align with the overall API design.
> > >> > >>>>>>
> > >> > >>>>>> - for all methods with multiple existing overloads, we can consider extending `Consumed`, `Produced`, `Materialized` etc to take an additional processor name (not sure atm how elegant this is; we would need to "play" with the API a little bit; the advantage would be that we do not add more overloads, which seems to be key for this KIP)
> > >> > >>>>>>
> > >> > >>>>>> - operators that return void: while I agree that the "name first" chaining idea is not very intuitive, it might still work if we name the method correctly (again, we would need to "play" with the API a little bit to see)
> > >> > >>>>>>
> > >> > >>>>>> - for DSL operators that are translated to multiple nodes: it might make sense to use the specified operator name as a prefix and add reasonable suffixes. For example, a join translates into 5 operators that could be named "name-left-store-processor", "name-left-join-processor", "name-right-store-processor", "name-right-join-processor", and "name-join-merge-processor" (or similar). Maybe just using numbers might also work.
> > >> > >>>>>>
> > >> > >>>>>> - I think we should strip the number suffixes if a user provides names
> > >> > >>>>>>
> > >> > >>>>>> - enforcing upper case seems to be tricky: for example, we do not enforce upper case for store names and we cannot easily change it as it would break compatibility -- thus, for consistency reasons we might not want to do this
> > >> > >>>>>>
> > >> > >>>>>> - for a better understanding of the impact of the KIP, it would be quite helpful if you would list all method names that are affected in the KIP (ie, list all newly added overloads)
> > >> > >>>>>>
> > >> > >>>>>> -Matthias
> > >> > >>>>>>
> > >> > >>>>>> On 5/31/18 6:40 PM, Guozhang Wang wrote:
> > >> > >>>>>>
> > >> > >>>>>>> Hi Florian,
> > >> > >>>>>>>
> > >> > >>>>>>> Re 1: I think changing the KStreamImpl / KTableImpl to allow modifying the processor name after the operator is fine as long as we do the check again when modifying it. In fact, we have some topology optimization going on which may modify processor names in the final topology anyway (https://github.com/apache/kafka/pull/4983). Semantically I think it is easier for developers to understand than "deciding the processor name for the next operator".
> > >> > >>>>>>>
> > >> > >>>>>>> Re 2: Yeah, I'm thinking that for operators that translate to multiple processor names, we can still use the provided "hint" to name the processors, e.g. for Joins we can name them `join-foo-this` and `join-foo-that` etc if the user calls `as("foo")`.
> > >> > >>>>>>>
> > >> > >>>>>>> Re 3: The motivation I had for removing the suffix is that it puts huge restrictions on topology compatibility: if user code adds a new operator, or the library does some optimization to remove some operators, the suffix indexing may change for a large number of the processor names. This will in turn change the internal state store names, as well as internal topic names, making the new application topology incompatible with the existing ones. One rationale I had for this KIP is that, aligned with this effort, moving forward we can allow users to customize internal names so that they can still be reused even with topology changes (e.g. KIP-230), so I think removing the suffix index would be more applicable in the long run.
> > >> > >>>>>>>
> > >> > >>>>>>> Guozhang
> > >> > >>>>>>>
> > >> > >>>>>>> On Thu, May 31, 2018 at 3:08 PM, Florian Hussonnois <fhussonn...@gmail.com> wrote:
> > >> > >>>>>>>
> > >> > >>>>>>>> Hi,
> > >> > >>>>>>>> Thank you very much for your feedback.
> > >> > >>>>>>>>
> > >> > >>>>>>>> 1/
> > >> > >>>>>>>> I agree that overloading most of the methods with a Processed is not ideal. I've started modifying the KStream API and I came to the same conclusion. Also, adding a new method directly to the KStreamImpl and KTableImpl classes seems to be a better option.
> > >> > >>>>>>>>
> > >> > >>>>>>>> However, a processor name cannot be redefined after calling an operator (or maybe I'm missing something in the code). From my understanding, this would only set the KStream name property, not the processor name previously added to the topology builder - leading to an InvalidTopology exception.
> > >> > >>>>>>>>
> > >> > >>>>>>>> So the new method should actually define the name of the next processor. Below is an example:
> > >> > >>>>>>>>
> > >> > >>>>>>>> stream.as(Processed.name("MAPPE_TO_UPPERCASE"))
> > >> > >>>>>>>>     .map((k, v) -> KeyValue.pair(k, v.toUpperCase()))
> > >> > >>>>>>>>
> > >> > >>>>>>>> I think this approach could solve the cases for methods returning void?
> > >> > >>>>>>>>
> > >> > >>>>>>>> Regarding this new method we have two possible implementations:
> > >> > >>>>>>>>
> > >> > >>>>>>>> 1. Adding a method like: withName(String processorName)
> > >> > >>>>>>>> 2. or adding a method accepting a Processed object: as(Processed).
> > >> > >>>>>>>>
> > >> > >>>>>>>> I think solution 2 is preferable as the Processed class could be enriched further (in the future).
> > >> > >>>>>>>>
> > >> > >>>>>>>> 2/
> > >> > >>>>>>>> As Guozhang said, some operators add internal processors. For example, the branch() method creates one KStreamBranch processor to route records and one KStreamPassThrough processor for each branch. In that situation only the parent processor can be named. For child processors we could keep the current behaviour that adds a suffix (i.e. KSTREAM-BRANCHCHILD-).
> > >> > >>>>>>>>
> > >> > >>>>>>>> This is also the case for the join() method, which results in adding multiple processors to the topology (windowing, left/right joins and a merge processor). I think, as for the branch method, users could only define a processor name prefix.
> > >> > >>>>>>>>
> > >> > >>>>>>>> 3/
> > >> > >>>>>>>> I think we should still add a suffix like "-0000000000" to the processor name and enforce uppercase, as this will keep some consistency with the names generated by the API.
> > >> > >>>>>>>>
> > >> > >>>>>>>> 4/
> > >> > >>>>>>>> Yes, the KTable interface should be modified like KStream to allow custom processor name definitions.
> > >> > >>>>>>>>
> > >> > >>>>>>>> Thanks,
> > >> > >>>>>>>>
> > >> > >>>>>>>> On Thu, May 31, 2018 at 7:18 PM, Damian Guy <damian....@gmail.com> wrote:
> > >> > >>>>>>>>
> > >> > >>>>>>>>> Hi Florian,
> > >> > >>>>>>>>>
> > >> > >>>>>>>>> Thanks for the KIP. What about KTable and other DSL interfaces? Will they not want to be able to do the same thing?
> > >> > >>>>>>>>> It would be good to see a complete set of the public API changes.
> > >> > >>>>>>>>>
> > >> > >>>>>>>>> Cheers,
> > >> > >>>>>>>>> Damian
> > >> > >>>>>>>>>
> > >> > >>>>>>>>> On Wed, 30 May 2018 at 19:45 Guozhang Wang <wangg...@gmail.com> wrote:
> > >> > >>>>>>>>>
> > >> > >>>>>>>>>> Hello Florian,
> > >> > >>>>>>>>>>
> > >> > >>>>>>>>>> Thanks for the KIP. I have some meta feedback on the proposal:
> > >> > >>>>>>>>>>
> > >> > >>>>>>>>>> 1. You mentioned that this `Processed` object will be added to a new overloaded variant of all the stateless operators; what about the stateful operators? I would like to hear your opinions if you have thought about that: note that stateful operators will usually be mapped to multiple processor node names, so we probably need to come up with some way to define all their names.
> > >> > >>>>>>>>>>
> > >> > >>>>>>>>>> 2. I share Bill's concern about adding lots of new overloaded functions to the stateless operators, as we have just spent quite some effort trimming them since the 1.0.0 release.
> > >> > >>>>>>>>>> If the goal is just to provide some "hints" on the generated processor node names, not to strictly enforce the exact names to be generated, then how about we just add a new function to the `KStream` and `KTable` classes like "as(Processed)", with the semantics "the latest operators that generate this KStream / KTable will be named according to this hint".
> > >> > >>>>>>>>>>
> > >> > >>>>>>>>>> The only caveat is that for all operators like `KStream#to` and `KStream#print` that return void, this alternative would not work. But for the current operators:
> > >> > >>>>>>>>>>
> > >> > >>>>>>>>>> a. KStream#print,
> > >> > >>>>>>>>>> b. KStream#foreach,
> > >> > >>>>>>>>>> c. KStream#to,
> > >> > >>>>>>>>>> d. KStream#process
> > >> > >>>>>>>>>>
> > >> > >>>>>>>>>> I personally feel that, except for `KStream#process`, users would not usually bother to override their names, and for `KStream#process` we could add an overloaded variant with the additional Processed object.
> > >> > >>>>>>>>>>
> > >> > >>>>>>>>>> 3. In your example, the processor names still get a suffix like "-0000000000"; is this intentional? If yes, why (I thought that with user-specified processor name hints we would no longer add a suffix to distinguish different nodes of the same type)?
> > >> > >>>>>>>>>>
> > >> > >>>>>>>>>> Guozhang
> > >> > >>>>>>>>>>
> > >> > >>>>>>>>>> On Tue, May 29, 2018 at 6:47 AM, Bill Bejeck <bbej...@gmail.com> wrote:
> > >> > >>>>>>>>>>
> > >> > >>>>>>>>>>> Hi Florian,
> > >> > >>>>>>>>>>>
> > >> > >>>>>>>>>>> Thanks for the KIP. I think being able to add more context to the processor names would be useful.
> > >> > >>>>>>>>>>>
> > >> > >>>>>>>>>>> I like the idea of adding a "withProcessorName" to Produced, Consumed and Joined.
> > >> > >>>>>>>>>>>
> > >> > >>>>>>>>>>> But instead of adding the "Processed" parameter to a large percentage of the methods, which would result in overloaded methods (which we removed quite a bit of with KIP-182), what do you think of adding a method "withName(String processorName)" to the AbstractStream class? BTW I'm not married to the method name, it's the best I can do off the top of my head.
> > >> > >>>>>>>>>>>
> > >> > >>>>>>>>>>> For the methods that return void, we'd have to add a parameter, but that would at least cut down on the number of overloaded methods in the API.
> > >> > >>>>>>>>>>>
> > >> > >>>>>>>>>>> Just my 2 cents.
> > >> > >>>>>>>>>>>
> > >> > >>>>>>>>>>> Thanks,
> > >> > >>>>>>>>>>> Bill
> > >> > >>>>>>>>>>>
> > >> > >>>>>>>>>>> On Sun, May 27, 2018 at 4:13 PM, Florian Hussonnois <fhussonn...@gmail.com> wrote:
> > >> > >>>>>>>>>>>
> > >> > >>>>>>>>>>>> Hi,
> > >> > >>>>>>>>>>>>
> > >> > >>>>>>>>>>>> I would like to start a new discussion on the following KIP:
> > >> > >>>>>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-307%3A+Allow+to+define+custom+processor+names+with+KStreams+DSL
> > >> > >>>>>>>>>>>>
> > >> > >>>>>>>>>>>> This is still a draft.
> > >> > >>>>>>>>>>>>
> > >> > >>>>>>>>>>>> Looking forward to your feedback.
> > >> > >>>>>>>>>>>> --
> > >> > >>>>>>>>>>>> Florian HUSSONNOIS
> > >> > >>>>>>>>>>
> > >> > >>>>>>>>>> --
> > >> > >>>>>>>>>> -- Guozhang
> > >> > >>>>>>>>
> > >> > >>>>>>>> --
> > >> > >>>>>>>> Florian HUSSONNOIS
> > >> > >>>>>
> > >> > >>>>> --
> > >> > >>>>> Florian HUSSONNOIS
> > >> > >>>
> > >> > >>> --
> > >> > >>> -- Guozhang
> > >> > >>
> > >> > >> --
> > >> > >> Florian HUSSONNOIS
> > >>
> > >> --
> > >> Florian HUSSONNOIS
> > >
> > > --
> > > -- Guozhang
> >
> > --
> > -- Guozhang

--
-- Guozhang
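
For reference, here is a compilable sketch of the self-typed Named interface from John's Comment 2 in the quoted thread. It is only an illustration under assumptions: ProducedSketch, as(), withKeySerdeHint() and NamedSketchDemo are made-up names, and none of these types are the Kafka Streams classes or the final KIP API. The point is only that withName() returns the concrete type, so methods defined on the subtype still chain, and that the static factory has to live on the concrete class.

```java
// Hypothetical sketch: these types only mirror the shape discussed in the
// thread; they are NOT the Kafka Streams classes of the same name.
interface Named<N extends Named<N>> {
    String name();

    // Returns the concrete subtype so that chained calls keep their type.
    N withName(String name);
}

final class ProducedSketch implements Named<ProducedSketch> {
    private String name;
    private String keySerdeHint; // made-up extra setting, for illustration only

    // A static factory has to be declared on the concrete class, not on Named.
    static ProducedSketch as(final String name) {
        return new ProducedSketch().withName(name);
    }

    @Override
    public String name() {
        return name;
    }

    @Override
    public ProducedSketch withName(final String name) {
        this.name = name;
        return this;
    }

    // Subtype-only builder method; reachable after withName() thanks to the self type.
    ProducedSketch withKeySerdeHint(final String hint) {
        this.keySerdeHint = hint;
        return this;
    }

    String keySerdeHint() {
        return keySerdeHint;
    }
}

class NamedSketchDemo {
    public static void main(final String[] args) {
        final ProducedSketch produced =
            new ProducedSketch().withName("myOut").withKeySerdeHint("string");
        System.out.println(produced.name() + " / " + produced.keySerdeHint());
    }
}
```

If Named were declared without the type parameter, withName() would have to return Named, and the withKeySerdeHint() call in main would not compile without a cast.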