I have concerns about the latest proposal from Guozhang. However, as John said, it's beyond the scope of this KIP, and thus I won't go into details. I agree, though, that the current "transformer APIs" are not ideal and could be improved.
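Guozhang's proposal (quoted further down in this thread) is to let `KStream#transform` take a `ProcessorSupplier`, check that `process()` calls `context.forward()` at most once, and use that call as the result. For `transformValues`, the forwarded key would be ignored and the original key kept. A minimal sketch of that rule follows. `TypedProcessor`, `ForwardingContext`, `KeyValue`, and `SingleForwardCapture` are hypothetical stand-ins, not the real Kafka Streams types.

```java
// Hypothetical sketch, NOT the Kafka Streams API: drives transform()/transformValues()
// with a typed processor whose single context.forward() call is captured as the result.
public class TransformViaProcessorSketch {

    /** Hypothetical stand-in for a typed ProcessorContext that can only forward. */
    interface ForwardingContext<KOut, VOut> {
        void forward(KOut key, VOut value);
    }

    /** Hypothetical stand-in for Processor<KIn, VIn, KOut, VOut>. */
    interface TypedProcessor<KIn, VIn, KOut, VOut> {
        void process(KIn key, VIn value, ForwardingContext<KOut, VOut> context);
    }

    record KeyValue<K, V>(K key, V value) {}

    /** Records the first forwarded pair and rejects any further forward() calls. */
    static final class SingleForwardCapture<K, V> implements ForwardingContext<K, V> {
        private KeyValue<K, V> captured;

        @Override
        public void forward(K key, V value) {
            if (captured != null) {
                throw new IllegalStateException("at most one forward() is allowed per input record");
            }
            captured = new KeyValue<>(key, value);
        }

        KeyValue<K, V> result() {
            return captured; // null when the processor forwarded nothing
        }
    }

    /** transform(): the single forwarded record, if any, becomes the return value. */
    static <KIn, VIn, KOut, VOut> KeyValue<KOut, VOut> transform(
            TypedProcessor<KIn, VIn, KOut, VOut> processor, KIn key, VIn value) {
        SingleForwardCapture<KOut, VOut> capture = new SingleForwardCapture<>();
        processor.process(key, value, capture);
        return capture.result();
    }

    /** transformValues(): same rule, but the forwarded key is ignored and the input key kept. */
    static <K, KIgnored, VIn, VOut> KeyValue<K, VOut> transformValues(
            TypedProcessor<K, VIn, KIgnored, VOut> processor, K key, VIn value) {
        KeyValue<KIgnored, VOut> out = transform(processor, key, value);
        if (out == null) {
            return null;
        }
        return new KeyValue<>(key, out.value());
    }

    public static void main(String[] args) {
        TypedProcessor<String, Integer, String, Integer> doubler =
            (k, v, ctx) -> ctx.forward(k.toUpperCase(), v * 2);
        System.out.println(transform(doubler, "a", 21));       // KeyValue[key=A, value=42]
        System.out.println(transformValues(doubler, "a", 21)); // KeyValue[key=a, value=42]

        TypedProcessor<String, Integer, String, Integer> twice = (k, v, ctx) -> {
            ctx.forward(k, v);
            ctx.forward(k, v); // violates the "at most one forward" rule
        };
        try {
            transform(twice, "a", 1);
        } catch (IllegalStateException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

The design choice being illustrated is that users would have a single way to emit output (`forward()`), with the "one result per input" contract enforced at runtime by the capturing context.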
An orthogonal thought is that we should split the current `ProcessorContext` into multiple interfaces. At the moment, the context can be used to:
 - access metadata
 - schedule punctuations
 - get state stores
 - register state stores
 - forward output data

(1) Registering state stores is only required if one implements a custom store, but not for a regular `Processor` implementation; hence, it's a leaky abstraction.

(2) For `ValueTransformer` and `flatValueTransformer`, we don't want to allow forwarding key-value pairs, and hence we currently need to throw a runtime exception (RTE) for this case.

(3) Why do we expose `keySerde()`, `valueSerde()`, and `stateDir()` explicitly? We already have `appConfigs()` to allow users to access the configuration.

Overall, `ProcessorContext` seems rather convoluted. Because we are adding a new `Processor` abstraction, it seems like a good opportunity to improve the interface: instead of passing `ProcessorContext` into the new `Processor#init()` method, we could pass an improved interface. Thoughts?

One more nit about the KIP: I think we should clearly state that this change does not provide type safety for PAPI users. The following example would compile without any errors or warnings, even if the types don't match:

> Topology t = new Topology();
> t.addSource("s", ...);
> t.addProcessor("p1", new ProcessorSupplier<KIn, VIn, FooKey, BarValue>()..., "s");
> t.addProcessor("p2", new ProcessorSupplier<NotFooKey, NotBarValue, KOut, VOut>()..., "p1");

I just want to make sure users understand the impact and scope of the change, especially what is _not_ achieved.

About `addGlobalStore()`: should the processor's output types be `Void`, similar to `KStream#process()`?

-Matthias

On 7/24/19 9:11 AM, Guozhang Wang wrote:
> Sounds good to me, thanks John!
>
>
> Guozhang
>
> On Wed, Jul 24, 2019 at 7:40 AM John Roesler <j...@confluent.io> wrote:
>
>> Hey Guozhang,
>>
>> Thanks for the thought!
It sounds related to what I was thinking in >> https://issues.apache.org/jira/browse/KAFKA-8396 , but a little "extra"... >> >> I proposed to eliminate ValueTransformer, but I believe you're right; we >> could eliminate Transformer also and just use Processor in the transform() >> methods. >> >> To your first bullet, regarding transform/flatTransform... I'd argue that >> the difference isn't material and if we switch to just using >> context.forward instead of returns, then we just need one and people can >> call forward as much as they want. It certainly warrants further >> discussion, though... >> >> To the second point, yes, I'm thinking that we can eschew the >> ValueTransformer and instead do something like ignore the forwarded key or >> check the key for serial identity, etc. >> >> The ultimate advantage of these ideas is that we reduce the number of >> interface variants and we also give people just one way to pass values >> forward instead of two. >> >> Of course, it's beyond the scope of this KIP, but this KIP is a >> precondition for these further improvements. >> >> I'm copying your comment onto the ticket for posterity. >> >> Thanks! >> -John >> >> On Tue, Jul 23, 2019 at 5:38 PM Guozhang Wang <wangg...@gmail.com> wrote: >> >>> Hi John, >>> >>> Just a wild thought about Transformer: now with the new Processor<KIn, >>> KOut, VIn, VOut>#init(ProcessorContext<KOut, VOut>), do we still need a >>> Transformer (and even ValueTransformer / ValueTransformerWithKey)? >>> >>> What if: >>> >>> * We just make KStream#transform to get a ProcessorSupplier as well, and >>> inside `process()` we check that at most one `context.forward()` is >> called, >>> and then take it as the return value. >>> * We would still use ValueTransformer for KStream#transformValue, or we >> can >>> also use a `ProcessorSupplier where we allow at most one >>> `context.forward()` AND we ignore whatever passed in as key but just use >>> the original key. 
>>> >>> >>> Guozhang >>> >>> >>> On Tue, Jul 16, 2019 at 9:03 AM John Roesler <j...@confluent.io> wrote: >>> >>>> Hi again, all, >>>> >>>> I have started the voting thread. Please cast your votes (or voice >>>> your objections)! The vote will remain open at least 72 hours. Once it >>>> closes, I can send the PR pretty quickly. >>>> >>>> Thanks for all you help ironing out the details on this feature. >>>> -John >>>> >>>> On Mon, Jul 15, 2019 at 5:09 PM John Roesler <j...@confluent.io> >> wrote: >>>>> >>>>> Hey all, >>>>> >>>>> It sounds like there's general agreement now on this KIP, so I >> updated >>>>> the KIP to fit in with Guozhang's overall proposed package structure. >>>>> Specifically, the proposed name for the new Processor interface is >>>>> "org.apache.kafka.streams.processor.api.Processor". >>>>> >>>>> If there are no objections, then I plan to start the vote tomorrow! >>>>> >>>>> Thanks, all, for your contributions. >>>>> -John >>>>> >>>>> On Thu, Jul 11, 2019 at 1:50 PM Matthias J. Sax < >> matth...@confluent.io >>>> >>>> wrote: >>>>>> >>>>>> Side remark: >>>>>> >>>>>>> Now that "flat transform" is a specific >>>>>>>> part of the API it seems okay to steer folks in that direction >> (to >>>> never >>>>>>>> use context.process in a transformer), but it should be called >> out >>>>>>>> explicitly in javadocs. Currently Transformer (which is used >> for >>>> both >>>>>>>> transform() and flatTransform() ) doesn't really call out the >>>> ambiguity: >>>>>> >>>>>> Would you want to do a PR for address this? We are always eager to >>>>>> improve the JavaDocs! >>>>>> >>>>>> >>>>>> -Matthias >>>>>> >>>>>> On 7/7/19 11:26 AM, Paul Whalen wrote: >>>>>>> First of all, +1 on the whole idea, my team has run into >>> (admittedly >>>> minor, >>>>>>> but definitely annoying) issues because of the weaker typing. 
>>> We're >>>> heavy >>>>>>> users of the PAPI and have Processors that, while not hundreds of >>>> lines >>>>>>> long, are certainly quite hefty and call context.forward() in >> many >>>> places. >>>>>>> >>>>>>> After reading the KIP and discussion a few times, I've convinced >>>> myself >>>>>>> that any initial concerns I had aren't really concerns at all >>> (state >>>> store >>>>>>> types, for one). One thing I will mention: changing >> *Transformer* >>>> to have >>>>>>> ProcessorContext<Void, Void> gave me pause, because I have code >>> that >>>> does >>>>>>> context.forward in transformers. Now that "flat transform" is a >>>> specific >>>>>>> part of the API it seems okay to steer folks in that direction >> (to >>>> never >>>>>>> use context.process in a transformer), but it should be called >> out >>>>>>> explicitly in javadocs. Currently Transformer (which is used for >>>> both >>>>>>> transform() and flatTransform() ) doesn't really call out the >>>> ambiguity: >>>>>>> >>>> >>> >> https://github.com/apache/kafka/blob/ca641b3e2e48c14ff308181c775775408f5f35f7/streams/src/main/java/org/apache/kafka/streams/kstream/Transformer.java#L75-L77 >>>> , >>>>>>> and for migrating users (from before flatTransform) it could be >>>> confusing. >>>>>>> >>>>>>> Side note, I'd like to plug KIP-401 (there is a discussion thread >>>> and a >>>>>>> voting thread) which also relates to using the PAPI. It seems >> like >>>> there >>>>>>> is some interest and it is in a votable state with the majority >> of >>>>>>> implementation complete. >>>>>>> >>>>>>> Paul >>>>>>> >>>>>>> On Fri, Jun 28, 2019 at 2:02 PM Bill Bejeck <bbej...@gmail.com> >>>> wrote: >>>>>>> >>>>>>>> Sorry for coming late to the party. >>>>>>>> >>>>>>>> As for the naming I'm in favor of RecordProcessor as well. 
>>>>>>>> >>>>>>>> I agree that we should not take on doing all of the package >>>> movements as >>>>>>>> part of this KIP, especially as John has pointed out, it will be >>> an >>>>>>>> opportunity to discuss some clean-up on individual classes >> which I >>>> envision >>>>>>>> becoming another somewhat involved process. >>>>>>>> >>>>>>>> For the end goal, if possible, here's what I propose. >>>>>>>> >>>>>>>> 1. We keep the scope of the KIP the same, *but we only >>>> implement* *it in >>>>>>>> phases* >>>>>>>> 2. Phase one could include what Guozhang had proposed earlier >>>> namely >>>>>>>> 1. > 1.a) modifying ProcessorContext only with the output >> types >>>> on >>>>>>>> forward. >>>>>>>> > 1.b) modifying Transformer signature to have generics of >>>>>>>> ProcessorContext, >>>>>>>> > and then lift the restricting of not using punctuate: if >>>> user did >>>>>>>> not >>>>>>>> > follow the enforced typing and just code without >> generics, >>>> they >>>>>>>> will get >>>>>>>> > warning at compile time and get run-time error if they >>>> forward >>>>>>>> wrong-typed >>>>>>>> > records, which I think would be acceptable. >>>>>>>> 3. Then we could tackle other pieces in an incremental manner >>> as >>>> we see >>>>>>>> what makes sense >>>>>>>> >>>>>>>> Just my 2cents >>>>>>>> >>>>>>>> -Bill >>>>>>>> >>>>>>>> On Mon, Jun 24, 2019 at 10:22 PM Guozhang Wang < >>> wangg...@gmail.com> >>>> wrote: >>>>>>>> >>>>>>>>> Hi John, >>>>>>>>> >>>>>>>>> Yeah I think we should not do all the repackaging as part of >> this >>>> KIP as >>>>>>>>> well (we can just do the movement of the Processor / >>>> ProcessorSupplier), >>>>>>>>> but I think we need to discuss the end goal here since >> otherwise >>>> we may >>>>>>>> do >>>>>>>>> the repackaging of Processor in this KIP, but only later on >>>> realizing >>>>>>>> that >>>>>>>>> other re-packagings are not our favorite solutions. 
>>>>>>>>> >>>>>>>>> >>>>>>>>> Guozhang >>>>>>>>> >>>>>>>>> On Mon, Jun 24, 2019 at 3:06 PM John Roesler < >> j...@confluent.io> >>>> wrote: >>>>>>>>> >>>>>>>>>> Hey Guozhang, >>>>>>>>>> >>>>>>>>>> Thanks for the idea! I'm wondering if we could take a middle >>>> ground >>>>>>>>>> and take your proposed layout as a "roadmap", while only >>> actually >>>>>>>>>> moving the classes that are already involved in this KIP. >>>>>>>>>> >>>>>>>>>> The reason I ask is not just to control the scope of this KIP, >>> but >>>>>>>>>> also, I think that if we move other classes to new packages, >> we >>>> might >>>>>>>>>> also want to take the opportunity to clean up other things >> about >>>> them. >>>>>>>>>> But each one of those would become a discussion point of its >>> own, >>>> so >>>>>>>>>> it seems the discussion would become intractable. FWIW, I do >>> like >>>> your >>>>>>>>>> idea for precisely this reason, it creates opportunities for >> us >>> to >>>>>>>>>> consider other changes that we are simply not able to make >>> without >>>>>>>>>> breaking source compatibility. >>>>>>>>>> >>>>>>>>>> If the others feel "kind of favorable" with this overall >> vision, >>>> maybe >>>>>>>>>> we can make one or more Jira tickets to capture it, and then >>> just >>>>>>>>>> alter _this_ proposal to `processor.api.Processor` (etc). >>>>>>>>>> >>>>>>>>>> WDYT? >>>>>>>>>> -John >>>>>>>>>> >>>>>>>>>> On Sun, Jun 23, 2019 at 7:17 PM Guozhang Wang < >>> wangg...@gmail.com >>>>> >>>>>>>>> wrote: >>>>>>>>>>> >>>>>>>>>>> Hello John, >>>>>>>>>>> >>>>>>>>>>> Thanks for your detailed explanation, I've done some quick >>>> checks on >>>>>>>>> some >>>>>>>>>>> existing examples that heavily used Processor and the results >>>> also >>>>>>>>> makes >>>>>>>>>> me >>>>>>>>>>> worried about my previous statements that "the breakage would >>>> not be >>>>>>>>>> big". >>>>>>>>>>> I agree we should maintain compatibility. 
>>>>>>>>>>> >>>>>>>>>>> About the naming itself, I'm actually a bit inclined into >>>>>>>> sub-packages >>>>>>>>>> than >>>>>>>>>>> renamed new classes, and my motivations are that our current >>>>>>>> packaging >>>>>>>>> is >>>>>>>>>>> already quite coarsen grained and sometimes ill-placed, and >>> hence >>>>>>>> maybe >>>>>>>>>> we >>>>>>>>>>> can take this change along with some clean up on packages >> (but >>>> again, >>>>>>>>> we >>>>>>>>>>> should follow the deprecate - removal path). What I'm >> thinking >>>> is: >>>>>>>>>>> >>>>>>>>>>> ------------------- >>>>>>>>>>> >>>>>>>>>>> processor/: >>>> StateRestoreCallback/AbstractNotifyingRestoreCallback, >>>>>>>>>> (deprecated >>>>>>>>>>> later, same meaning for other cross-throughs), >> ProcessContest, >>>>>>>>>>> RecordContext, Punctuator, PunctuationType, To, Cancellable >>> (are >>>> the >>>>>>>>> only >>>>>>>>>>> things left) >>>>>>>>>>> >>>>>>>>>>> (new) processor/api/: Processor, ProcessorSupplier (and of >>>> course, >>>>>>>>> these >>>>>>>>>>> two classes can be strong typed) >>>>>>>>>>> >>>>>>>>>>> state/: StateStore, BatchingStateRestoreCallback, >>>>>>>>>>> AbstractNotifyingBatchingRestoreCallback (moved from >>> processor/), >>>>>>>>>>> PartitionGrouper, WindowStoreIterator, StateSerdes (this one >>> can >>>> be >>>>>>>>> moved >>>>>>>>>>> into state/internals), TimestampedByteStore (we can move this >>> to >>>>>>>>>> internals >>>>>>>>>>> since store types would use vat by default, see below), >>>>>>>>> ValueAndTimestamp >>>>>>>>>>> >>>>>>>>>>> (new) state/factory/: Stores, StoreBuilder, StoreSupplier; >>> *BUT* >>>> the >>>>>>>>> new >>>>>>>>>>> Stores would not have timestampedXXBuilder APIs since the >>> default >>>>>>>>>>> StoreSupplier / StoreBuilder value types are >> ValueAndTimestamp >>>>>>>> already. 
>>>>>>>>>>> >>>>>>>>>>> (new) state/queryable/: QueryableStoreType, >>> QueryableStoreTypes, >>>>>>>>> HostInfo >>>>>>>>>>> >>>>>>>>>>> (new) state/keyValue/: KeyValueXXX classes, and also the same >>> for >>>>>>>>>>> state/sessionWindow and state/timeWindow; *BUT* here we use >>>>>>>>>>> ValueAndTimestamp as value types of those APIs directly, and >>> also >>>>>>>>>>> TimestampedKeyValue/WindowStore would be deprecated. >>>>>>>>>>> >>>>>>>>>>> (new) kstream/api/: KStream, KTable, GroupedKStream (renamed >>> from >>>>>>>>>>> KGroupedStream), GroupedKTable (renamed from KGroupedTable), >>>>>>>>>>> TimeWindowedKStream, SessionWindowedKStream, GlobalKTable >>>>>>>>>>> >>>>>>>>>>> (new) kstream/operator/: Aggregator, ForeachFunction, ... , >>>> Merger >>>>>>>> and >>>>>>>>>>> Grouped, Joined, Materialized, ... , Printed and Transformer, >>>>>>>>>>> TransformerSupplier. >>>>>>>>>>> >>>>>>>>>>> (new) kstream/window/: Window, Windows, Windowed, >> TimeWindows, >>>>>>>>>>> SessionWindows, UnlimitedWindows, JoinWindows, >> WindowedSerdes, >>>>>>>>>>> Time/SessionWindowedSerialized/Deserializer. >>>>>>>>>>> >>>>>>>>>>> (new) configure/: RocksDBConfigSetter, TopicNameExtractor, >>>>>>>>>>> TimestampExtractor, UsePreviousTimeOnInvalidTimestamp, >>>>>>>>>>> WallclockTimestampExtractor, ExtractRecordMetadataTimestamp, >>>>>>>>>>> FailOnInvalidTimestamp, LogAndSkipOnInvalidTimestamp, >>>>>>>>>> StateRestoreListener, >>>>>>>>>>> >>>>>>>>>>> (new) metadata/: StreamsMetadata, ThreadMetadata, >> TaskMetadata, >>>>>>>> TaskId >>>>>>>>>>> >>>>>>>>>>> Still, any xxx/internals packages are declared as inner >>> classes, >>>> but >>>>>>>>>> other >>>>>>>>>>> xxx/yyy packages are declared as public APIs. 
>>>>>>>>>>> >>>>>>>>>>> ------------------- >>>>>>>>>>> >>>>>>>>>>> This is a very wild thought and I can totally understand if >>>> people >>>>>>>> feel >>>>>>>>>>> this is too much since it definitely enlarges the scope of >> this >>>> KIP a >>>>>>>>> lot >>>>>>>>>>> :) just trying to play a devil's advocate here to do major >>>>>>>> refactoring >>>>>>>>>> and >>>>>>>>>>> avoid renaming Processor classes. >>>>>>>>>>> >>>>>>>>>>> >>>>>>>>>>> Guozhang >>>>>>>>>>> >>>>>>>>>>> >>>>>>>>>>> On Fri, Jun 21, 2019 at 9:51 PM Matthias J. Sax < >>>>>>>> matth...@confluent.io >>>>>>>>>> >>>>>>>>>>> wrote: >>>>>>>>>>>> >>>>>>>>>>>> I think `RecordProcessor` is a good name. >>>>>>>>>>>> >>>>>>>>>>>> >>>>>>>>>>>> -Matthias >>>>>>>>>>>> >>>>>>>>>>>> On 6/21/19 5:09 PM, John Roesler wrote: >>>>>>>>>>>>> After kicking the naming around a bit more, it seems like >> any >>>>>>>>> package >>>>>>>>>>>>> name change is a bit "weird" because it fragments the >> package >>>> and >>>>>>>>>>>>> directory structure. If we can come up with a reasonable >> name >>>> for >>>>>>>>> the >>>>>>>>>>>>> interface after all, it seems like the better choice. >>>>>>>>>>>>> >>>>>>>>>>>>> The real challenge is that the existing name "Processor" >>> seems >>>>>>>> just >>>>>>>>>>>>> about perfect. In picking a new name, we need to consider >> the >>>>>>>>>> ultimate >>>>>>>>>>>>> state, after the deprecation period, when we entirely >> remove >>>>>>>>>>>>> Processor. In this context, TypedProcessor seems a little >> odd >>>> to >>>>>>>>> me, >>>>>>>>>>>>> because it seems to imply that there should also be an >>> "untyped >>>>>>>>>>>>> processor". >>>>>>>>>>>>> >>>>>>>>>>>>> After kicking around a few other ideas, what does everyone >>>> think >>>>>>>>>> about >>>>>>>>>>>>> "RecordProcessor"? I _think_ maybe it stands on its own >> just >>>>>>>> fine, >>>>>>>>>>>>> because it's a thing that processes... records? 
>>>>>>>>>>>>> >>>>>>>>>>>>> If others agree with this, I can change the proposal to >>>>>>>>>> RecordProcessor. >>>>>>>>>>>>> >>>>>>>>>>>>> Thanks, >>>>>>>>>>>>> -John >>>>>>>>>>>>> >>>>>>>>>>>>> On Fri, Jun 21, 2019 at 6:42 PM John Roesler < >>>> j...@confluent.io> >>>>>>>>>> wrote: >>>>>>>>>>>>>> >>>>>>>>>>>>>> Hi all, >>>>>>>>>>>>>> >>>>>>>>>>>>>> I've updated the KIP with the feedback so far. >>>>>>>>>>>>>> >>>>>>>>>>>>>> The naming question is still the biggest (only?) >> outstanding >>>>>>>>> issue. >>>>>>>>>> It >>>>>>>>>>>>>> would be good to hear some more thoughts on it. >>>>>>>>>>>>>> >>>>>>>>>>>>>> As we stand now, there's one vote for changing the package >>>> name >>>>>>>> to >>>>>>>>>>>>>> something like 'typedprocessor', one for changing the >>>> interface >>>>>>>> to >>>>>>>>>>>>>> TypedProcessor (as in the PoC), and one for just changing >>> the >>>>>>>>>>>>>> Processor interface in-place, breaking source >> compatibility. >>>>>>>>>>>>>> >>>>>>>>>>>>>> How can we resolve this decision? >>>>>>>>>>>>>> >>>>>>>>>>>>>> Thanks, >>>>>>>>>>>>>> -John >>>>>>>>>>>>>> >>>>>>>>>>>>>> On Thu, Jun 20, 2019 at 5:44 PM John Roesler < >>>> j...@confluent.io >>>>>>>>> >>>>>>>>>> wrote: >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> Thanks for the feedback, Guozhang and Matthias, >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> Regarding motivation: I'll update the wiki. Briefly: >>>>>>>>>>>>>>> * Any processor can benefit. Imagine a pure user of the >>>>>>>>>> ProcessorAPI >>>>>>>>>>>>>>> who has very complex processing logic. I have seen >> several >>>>>>>>>> processor >>>>>>>>>>>>>>> implementation that are hundreds of lines long and call >>>>>>>>>>>>>>> `context.forward` in many different locations and >> branches. >>>> In >>>>>>>>>> such an >>>>>>>>>>>>>>> implementation, it would be very easy to have a bug in a >>>> rarely >>>>>>>>>> used >>>>>>>>>>>>>>> branch that forwards the wrong kind of value. This would >>>>>>>>>> structurally >>>>>>>>>>>>>>> prevent that from happening. 
>>>>>>>>>>>>>>> * Also, anyone who heavily uses the ProcessorAPI would >>> likely >>>>>>>>> have >>>>>>>>>>>>>>> developed helper methods to wire together processors, >> just >>> as >>>>>>>> we >>>>>>>>>> have >>>>>>>>>>>>>>> in the DSL implementation. This change would enable them >> to >>>>>>>>> ensure >>>>>>>>>> at >>>>>>>>>>>>>>> compile time that they are actually wiring together >>>> compatible >>>>>>>>>> types. >>>>>>>>>>>>>>> This was actually _my_ original motivation, since I found >>> it >>>>>>>> very >>>>>>>>>>>>>>> difficult and time consuming to follow the Streams DSL >>>> internal >>>>>>>>>>>>>>> builders. >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> Regarding breaking the source compatibility of >> Processor: I >>>>>>>> would >>>>>>>>>>>>>>> _love_ to side-step the naming problem, but I really >> don't >>>> know >>>>>>>>> if >>>>>>>>>>>>>>> it's excusable to break compatibility. I suspect that our >>>>>>>> oldest >>>>>>>>>> and >>>>>>>>>>>>>>> dearest friends are using the ProcessorAPI in some form >> or >>>>>>>>> another, >>>>>>>>>>>>>>> and all their source code would break. It sucks to have >> to >>>>>>>>> create a >>>>>>>>>>>>>>> whole new interface to get around this, but it feels like >>> the >>>>>>>>> right >>>>>>>>>>>>>>> thing to do. Would be nice to get even more feedback on >>> this >>>>>>>>> point, >>>>>>>>>>>>>>> though. >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> Regarding the types of stores, as I said in my response >> to >>>>>>>>> Sophie, >>>>>>>>>>>>>>> it's not an issue. >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> Regarding the change to StreamsBuilder, it doesn't pin >> the >>>>>>>> types >>>>>>>>> in >>>>>>>>>>>>>>> any way, since all the types are bounded by Object only, >>> and >>>>>>>>> there >>>>>>>>>> are >>>>>>>>>>>>>>> no extra constraints between arguments (each type is used >>>> only >>>>>>>>>> once in >>>>>>>>>>>>>>> one argument). But maybe I missed the point you were >> asking >>>>>>>>> about. 
>>>>>>>>>>>>>>> Since the type takes generic paramters, we should allow >>> users >>>>>>>> to >>>>>>>>>> pass >>>>>>>>>>>>>>> in parameterized arguments. Otherwise, they would _have >> to_ >>>>>>>> give >>>>>>>>>> us a >>>>>>>>>>>>>>> raw type, and they would be forced to get a "rawtyes" >>> warning >>>>>>>>> from >>>>>>>>>> the >>>>>>>>>>>>>>> compiler. So, it's our obligation in any API that >> accepts a >>>>>>>>>>>>>>> parameterized-type parameter to allow people to actually >>>> pass a >>>>>>>>>>>>>>> parameterized type, even if we don't actually use the >>>>>>>> parameters. >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> The naming question is a complex one, as I took pains to >>>> detail >>>>>>>>>>>>>>> previously. Please don't just pick out one minor point, >>> call >>>> it >>>>>>>>>> weak, >>>>>>>>>>>>>>> and then claim that it invalidates the whole decision. I >>>> don't >>>>>>>>>> think >>>>>>>>>>>>>>> there's a clear best choice, so I'm more than happy for >>>> someone >>>>>>>>> to >>>>>>>>>>>>>>> advocate for renaming the class instead of the package. >> Can >>>> you >>>>>>>>>>>>>>> provide some reasons why you think that would be better? >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> Regarding the deprecated methods, you're absolutely >> right. >>>> I'll >>>>>>>>>>> update the KIP. >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> Thanks again for all the feedback! >>>>>>>>>>>>>>> -John >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> On Thu, Jun 20, 2019 at 4:34 PM Matthias J. Sax < >>>>>>>>>> matth...@confluent.io> >>>>>>>>>>> wrote: >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> Just want to second what Sophie said about the stores. >> The >>>>>>>> type >>>>>>>>>> of a >>>>>>>>>>>>>>>> used stores is completely independent of input/output >>> types. >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> This related to change `addGlobalStore()` method. Why do >>> you >>>>>>>>> want >>>>>>>>>> to >>>>>>>>>>> pin >>>>>>>>>>>>>>>> the types? 
In fact, people request the ability to >> filter() >>>> and >>>>>>>>>> maybe >>>>>>>>>>>>>>>> even map() the data before they are put into the global >>>> store. >>>>>>>>>>> Limiting >>>>>>>>>>>>>>>> the types seems to be a step backward here? >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> Also, the pack name is questionable. >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> This wouldn't be the first project to do something like >>>>>>>> this... >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> Not a strong argument. I would actually propose to not >> a a >>>> new >>>>>>>>>>> package, >>>>>>>>>>>>>>>> but just a new class `TypedProcessor`. >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> For `ProcessorContext#forward` methods -- some of those >>>>>>>> methods >>>>>>>>>> are >>>>>>>>>>>>>>>> already deprecated. While the will still be affected, it >>>> would >>>>>>>>> be >>>>>>>>>>> worth >>>>>>>>>>>>>>>> to mark them as deprecated in the wiki page, too. >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> @Guozhang: I dont' think we should break source >>>> compatibility >>>>>>>>> in a >>>>>>>>>>> minor >>>>>>>>>>>>>>>> release. >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> -Matthias >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> On 6/20/19 1:43 PM, Guozhang Wang wrote: >>>>>>>>>>>>>>>>> Hi John, >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> Thanks for KIP! I've a few comments below: >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> 1. So far the "Motivation" section is very general, and >>> the >>>>>>>>> only >>>>>>>>>>> concrete >>>>>>>>>>>>>>>>> example that I have in mind is >>> `TransformValues#punctuate`. >>>>>>>> Do >>>>>>>>> we >>>>>>>>>>> have any >>>>>>>>>>>>>>>>> other concrete issues that drive this KIP? If not then >> I >>>> feel >>>>>>>>>>> better to >>>>>>>>>>>>>>>>> narrow the scope of this KIP to: >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> 1.a) modifying ProcessorContext only with the output >>> types >>>> on >>>>>>>>>>> forward. 
>>>>>>>>>>>>>>>>> 1.b) modifying Transformer signature to have generics >> of >>>>>>>>>>> ProcessorContext, >>>>>>>>>>>>>>>>> and then lift the restricting of not using punctuate: >> if >>>> user >>>>>>>>> did >>>>>>>>>>> not >>>>>>>>>>>>>>>>> follow the enforced typing and just code without >>> generics, >>>>>>>> they >>>>>>>>>>> will get >>>>>>>>>>>>>>>>> warning at compile time and get run-time error if they >>>>>>>> forward >>>>>>>>>>> wrong-typed >>>>>>>>>>>>>>>>> records, which I think would be acceptable. >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> I feel this would be a good solution for this specific >>>> issue; >>>>>>>>>>> again, feel >>>>>>>>>>>>>>>>> free to update the wiki page with other known issues >> that >>>>>>>>> cannot >>>>>>>>>> be >>>>>>>>>>>>>>>>> resolved. >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> 2. If, we want to go with the current scope then my >> next >>>>>>>>> question >>>>>>>>>>> would be, >>>>>>>>>>>>>>>>> how much breakage we would introducing if we just >> modify >>>> the >>>>>>>>>>> Processor >>>>>>>>>>>>>>>>> signature directly? My feeling is that DSL users would >> be >>>>>>>> most >>>>>>>>>>> likely not >>>>>>>>>>>>>>>>> affected and PAPI users only need to modify a few lines >>> on >>>>>>>>> class >>>>>>>>>>>>>>>>> declaration. I feel it worth doing some research on >> this >>>> part >>>>>>>>> and >>>>>>>>>>> then >>>>>>>>>>>>>>>>> decide if we really want to bite the bullet of >> duplicated >>>>>>>>>> Processor >>>>>>>>>>> / >>>>>>>>>>>>>>>>> ProcessorSupplier classes for maintaining >> compatibility. 
>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> Guozhang >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> On Wed, Jun 19, 2019 at 12:21 PM John Roesler < >>>>>>>>> j...@confluent.io >>>>>>>>>>> >>>>>>>>>>> wrote: >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> Hi all, >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> In response to the feedback so far, I changed the >>> package >>>>>>>> name >>>>>>>>>> from >>>>>>>>>>>>>>>>>> `processor2` to `processor.generic`. >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> Thanks, >>>>>>>>>>>>>>>>>> -John >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> On Mon, Jun 17, 2019 at 4:49 PM John Roesler < >>>>>>>>> j...@confluent.io >>>>>>>>>>> >>>>>>>>>>> wrote: >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> Thanks for the feedback, Sophie! >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> I actually felt a little uneasy when I wrote that >>> remark, >>>>>>>>>> because >>>>>>>>>>> it's >>>>>>>>>>>>>>>>>>> not restricted at all in the API, it's just available >>> to >>>>>>>> you >>>>>>>>> if >>>>>>>>>>> you >>>>>>>>>>>>>>>>>>> choose to give your stores and context the same >>>> parameters. >>>>>>>>>> So, I >>>>>>>>>>>>>>>>>>> think your use case is valid, and also perfectly >>>>>>>> permissable >>>>>>>>>>> under the >>>>>>>>>>>>>>>>>>> current KIP. Sorry for sowing confusion on my own >>>>>>>> discussion >>>>>>>>>>> thread! >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> I'm not crazy about the package name, either. I went >>> with >>>>>>>> it >>>>>>>>>> only >>>>>>>>>>>>>>>>>>> because there's seemingly nothing special about the >> new >>>>>>>>> package >>>>>>>>>>> except >>>>>>>>>>>>>>>>>>> that it can't have the same name as the old one. >>>> Otherwise, >>>>>>>>> the >>>>>>>>>>>>>>>>>>> existing "processor" and "Processor" names for the >>>> package >>>>>>>>> and >>>>>>>>>>> class >>>>>>>>>>>>>>>>>>> are perfectly satisfying. Rather than pile on >>> additional >>>>>>>>>>> semantics, it >>>>>>>>>>>>>>>>>>> seemed cleaner to just add a number to the package >>> name. 
>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> This wouldn't be the first project to do something >> like >>>>>>>>> this... >>>>>>>>>>> Apache >>>>>>>>>>>>>>>>>>> Commons, for example, has added a "2" to the end of >>> some >>>> of >>>>>>>>>> their >>>>>>>>>>>>>>>>>>> packages for exactly the same reason. >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> I'm open to any suggestions. For example, we could do >>>>>>>>> something >>>>>>>>>>> like >>>>>>>>>>>>>>>>>>> org.apache.kafka.streams.typedprocessor.Processor or >>>>>>>>>>>>>>>>>>> org.apache.kafka.streams.processor.typed.Processor , >>>> which >>>>>>>>>> would >>>>>>>>>>> have >>>>>>>>>>>>>>>>>>> just about the same effect. One microscopic thought >> is >>>>>>>> that, >>>>>>>>> if >>>>>>>>>>>>>>>>>>> there's another interface in the "processor" package >>> that >>>>>>>> we >>>>>>>>>> wish >>>>>>>>>>> to >>>>>>>>>>>>>>>>>>> do the same thing to, would _could_ pile it in to >>>>>>>>> "processor2", >>>>>>>>>>> but we >>>>>>>>>>>>>>>>>>> couldn't do the same if we use a package that has >>> "typed" >>>>>>>> in >>>>>>>>>> the >>>>>>>>>>> name, >>>>>>>>>>>>>>>>>>> unless that change is _also_ related to types in some >>>> way. >>>>>>>>> But >>>>>>>>>>> this >>>>>>>>>>>>>>>>>>> seems like a very minor concern. >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> What's your preference? >>>>>>>>>>>>>>>>>>> -John >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> On Mon, Jun 17, 2019 at 3:56 PM Sophie Blee-Goldman < >>>>>>>>>>> sop...@confluent.io> >>>>>>>>>>>>>>>>>> wrote: >>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>> Hey John, thanks for writing this up! I like the >>>> proposal >>>>>>>>> but >>>>>>>>>>> there's >>>>>>>>>>>>>>>>>> one >>>>>>>>>>>>>>>>>>>> point that I think may be too restrictive: >>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>> "A processor that happens to use a typed store is >>>> actually >>>>>>>>>>> emitting the >>>>>>>>>>>>>>>>>>>> same types that it is storing." 
>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>> I can imagine someone could want to leverage this >> new >>>> type >>>>>>>>>> safety >>>>>>>>>>>>>>>>>> without >>>>>>>>>>>>>>>>>>>> also limiting how they can interact with/use their >>>> store. >>>>>>>> As >>>>>>>>>> an >>>>>>>>>>>>>>>>>> (admittedly >>>>>>>>>>>>>>>>>>>> contrived) example, say you have an input stream of >>>>>>>>> purchases >>>>>>>>>> of >>>>>>>>>>> a >>>>>>>>>>>>>>>>>> certain >>>>>>>>>>>>>>>>>>>> type (entertainment, food, etc), and on seeing a new >>>>>>>> record >>>>>>>>>> you >>>>>>>>>>> want to >>>>>>>>>>>>>>>>>>>> output how many types of purchase a shopper has made >>>> more >>>>>>>>>> than 5 >>>>>>>>>>>>>>>>>> purchases >>>>>>>>>>>>>>>>>>>> of in the last month. Your state store will probably >>> be >>>>>>>>>> holding >>>>>>>>>>> some >>>>>>>>>>>>>>>>>> more >>>>>>>>>>>>>>>>>>>> complicated PurchaseHistory object (keyed by user), >>> but >>>>>>>> your >>>>>>>>>>> output is >>>>>>>>>>>>>>>>>> just >>>>>>>>>>>>>>>>>>>> a <User, Long> >>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>> I'm also not crazy about "processor2" as the package >>>> name >>>>>>>>> ... >>>>>>>>>>> not sure >>>>>>>>>>>>>>>>>> what >>>>>>>>>>>>>>>>>>>> a better one would be though (something with >> "typed"?) >>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>> On Mon, Jun 17, 2019 at 12:47 PM John Roesler < >>>>>>>>>> j...@confluent.io> >>>>>>>>>>>>>>>>>> wrote: >>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>> Hi all, >>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>> I'd like to propose KIP-478 ( >>>>>>>>>>>>>>>>>> https://cwiki.apache.org/confluence/x/2SkLBw >>>>>>>>>>>>>>>>>>>>> ). 
>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>> This proposal would add output type bounds to the >>>>>>>> Processor >>>>>>>>>>> interface >>>>>>>>>>>>>>>>>>>>> in Kafka Streams, which enables static checking of >> a >>>>>>>> number >>>>>>>>>> of >>>>>>>>>>> useful >>>>>>>>>>>>>>>>>>>>> properties: >>>>>>>>>>>>>>>>>>>>> * A processor B that consumes the output of >>> processor A >>>>>>>> is >>>>>>>>>>> actually >>>>>>>>>>>>>>>>>>>>> expecting the same types that processor A produces. >>>>>>>>>>>>>>>>>>>>> * A processor that happens to use a typed store is >>>>>>>> actually >>>>>>>>>>> emitting >>>>>>>>>>>>>>>>>>>>> the same types that it is storing. >>>>>>>>>>>>>>>>>>>>> * A processor is simply forwarding the expected >> types >>>> in >>>>>>>>> all >>>>>>>>>>> code >>>>>>>>>>>>>>>>>> paths. >>>>>>>>>>>>>>>>>>>>> * Processors added via the Streams DSL, which are >> not >>>>>>>>>> permitted >>>>>>>>>>> to >>>>>>>>>>>>>>>>>>>>> forward results at all are statically prevented >> from >>>>>>>> doing >>>>>>>>> so >>>>>>>>>>> by the >>>>>>>>>>>>>>>>>>>>> compiler >>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>> Internally, we can use the above properties to >>> achieve >>>> a >>>>>>>>> much >>>>>>>>>>> higher >>>>>>>>>>>>>>>>>>>>> level of confidence in the Streams DSL >>> implementation's >>>>>>>>>>> correctness. >>>>>>>>>>>>>>>>>>>>> Actually, while doing the POC, I found a few bugs >> and >>>>>>>>>> mistakes, >>>>>>>>>>> which >>>>>>>>>>>>>>>>>>>>> become structurally impossible with KIP-478. >>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>> Additionally, the stronger types dramatically >> improve >>>> the >>>>>>>>>>>>>>>>>>>>> self-documentation of our Streams internal >>>>>>>> implementations, >>>>>>>>>>> which >>>>>>>>>>>>>>>>>>>>> makes it much easier for new contributors to ramp >> up >>>> with >>>>>>>>>>> confidence. >>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>> Thanks so much for your consideration! 
>>>>>>>>>>>>>>>>>>>>> -John >>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> >>>>>>>>>>>> >>>>>>>>>>> >>>>>>>>>>> >>>>>>>>>>> -- >>>>>>>>>>> -- Guozhang >>>>>>>>>> >>>>>>>>> >>>>>>>>> >>>>>>>>> -- >>>>>>>>> -- Guozhang >>>>>>>>> >>>>>>>> >>>>>>> >>>>>> >>>> >>> >>> >>> -- >>> -- Guozhang >>> >> > >
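To make the `ProcessorContext` split suggested above more concrete, here is a minimal sketch. All interface names (`MetadataAccess`, `PunctuationScheduling`, `StoreLookup`, `Forwarding`, `StoreRegistration`, `ProcessingContext`, `ValueProcessingContext`) and the `InMemoryContext` helper are hypothetical and not part of Kafka Streams. The point is only that a regular processor would never see store registration, and a value-only transformation would have no `forward()` to misuse.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch, NOT the Kafka Streams API: every interface name here is invented
// to illustrate splitting ProcessorContext into role interfaces.
public class SplitContextSketch {

    /** Read-only metadata; serdes and stateDir could be derived from appConfigs(). */
    interface MetadataAccess {
        String applicationId();
        Map<String, Object> appConfigs();
    }

    /** Scheduling punctuations. */
    interface PunctuationScheduling {
        void schedule(long intervalMs, Runnable punctuator);
    }

    /** Looking up existing stores; registering stores is deliberately absent. */
    interface StoreLookup {
        <S> S getStateStore(String name, Class<S> type);
    }

    /** Forwarding typed output records. */
    interface Forwarding<KOut, VOut> {
        void forward(KOut key, VOut value);
    }

    /** Only custom store implementations would ever see this. */
    interface StoreRegistration {
        void register(String storeName, Object store);
    }

    /** Context handed to a regular Processor#init(). */
    interface ProcessingContext<KOut, VOut>
            extends MetadataAccess, PunctuationScheduling, StoreLookup, Forwarding<KOut, VOut> {}

    /** Context for value-only transformations: there is no forward(), so no RTE is needed. */
    interface ValueProcessingContext extends MetadataAccess, PunctuationScheduling, StoreLookup {}

    /** Minimal in-memory implementation, used only to exercise the interfaces. */
    static final class InMemoryContext<KOut, VOut>
            implements ProcessingContext<KOut, VOut>, ValueProcessingContext, StoreRegistration {
        final Map<String, Object> stores = new HashMap<>();
        final List<String> forwarded = new ArrayList<>();
        final List<Runnable> punctuators = new ArrayList<>();

        @Override public String applicationId() { return "sketch-app"; }
        @Override public Map<String, Object> appConfigs() { return Map.of("application.id", "sketch-app"); }
        @Override public void schedule(long intervalMs, Runnable punctuator) { punctuators.add(punctuator); }
        @Override public <S> S getStateStore(String name, Class<S> type) { return type.cast(stores.get(name)); }
        @Override public void forward(KOut key, VOut value) { forwarded.add(key + "=" + value); }
        @Override public void register(String storeName, Object store) { stores.put(storeName, store); }
    }

    /** A counting step that can only use what ProcessingContext offers. */
    static void countAndForward(String key, ProcessingContext<String, Long> ctx) {
        @SuppressWarnings("unchecked")
        Map<String, Long> counts = ctx.getStateStore("counts", Map.class);
        long updated = counts.merge(key, 1L, Long::sum);
        ctx.forward(key, updated);
        // ctx.register(...) would not compile: registration is not part of ProcessingContext.
    }

    public static void main(String[] args) {
        InMemoryContext<String, Long> ctx = new InMemoryContext<>();
        ctx.register("counts", new HashMap<String, Long>()); // done by store code, not the processor
        countAndForward("alice", ctx);
        countAndForward("alice", ctx);
        countAndForward("bob", ctx);
        System.out.println(ctx.forwarded); // [alice=1, alice=2, bob=1]
    }
}
```

With this shape, calling `register(...)` from a processor, or `forward(...)` through a `ValueProcessingContext`, fails at compile time instead of at runtime.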
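The type-safety gap noted above comes from wiring nodes by name: each processor's generics are checked on their own, but the link between `p1` and `p2` is just a `String`. The sketch below mimics that with a hypothetical `MiniTopology` (not the real `Topology` API) and contrasts it with a hypothetical typed chaining style (`Typed`), where the compiler does check each link.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical sketch, NOT the Kafka Streams Topology API: it mimics name-based wiring
// to show why typed processors alone do not make PAPI wiring type-safe.
public class NameWiringGapSketch {

    /** Hypothetical typed node: maps an input value to an output value. */
    interface Node<VIn, VOut> extends Function<VIn, VOut> {}

    static final class MiniTopology {
        private final Map<String, Node<?, ?>> nodes = new HashMap<>();
        private final Map<String, String> parentOf = new HashMap<>();

        <VIn, VOut> MiniTopology addProcessor(String name, Node<VIn, VOut> node, String parent) {
            nodes.put(name, node); // static types are lost once the node is stored by name
            parentOf.put(name, parent);
            return this;
        }

        /** Pushes one value from "source" through the chain ending at `sink`. */
        @SuppressWarnings({"unchecked", "rawtypes"})
        Object run(String sink, Object sourceValue) {
            String parent = parentOf.get(sink);
            Object input = parent.equals("source") ? sourceValue : run(parent, sourceValue);
            return ((Node) nodes.get(sink)).apply(input);
        }
    }

    /** Typed chaining: each link is checked by the compiler. */
    static final class Typed<V> {
        final V value;

        Typed(V value) {
            this.value = value;
        }

        <VOut> Typed<VOut> then(Node<? super V, ? extends VOut> node) {
            return new Typed<>(node.apply(value));
        }
    }

    public static void main(String[] args) {
        Node<String, Integer> length = String::length;
        Node<String, String> shout = s -> s.toUpperCase(); // expects a String, but p1 emits an Integer

        MiniTopology t = new MiniTopology()
            .addProcessor("p1", length, "source")
            .addProcessor("p2", shout, "p1"); // compiles: the link is only a name
        try {
            t.run("p2", "kafka");
        } catch (ClassCastException e) {
            System.out.println("mismatch found only at runtime: " + e.getClass().getSimpleName());
        }

        Typed<Integer> ok = new Typed<>("kafka").then(length);
        // new Typed<>("kafka").then(length).then(shout); // does not compile
        System.out.println(ok.value); // 5
    }
}
```

In the name-based version the mismatch only shows up as a `ClassCastException` once data flows; the typed chain rejects the same mistake at compile time.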