Hi John,

Yeah, I agree we should not do all of the repackaging as part of this KIP (we can just move Processor / ProcessorSupplier). But I think we still need to discuss the end goal here; otherwise we may repackage Processor in this KIP, only to realize later on that the other re-packagings are not our favorite solutions.
Guozhang

On Mon, Jun 24, 2019 at 3:06 PM John Roesler <j...@confluent.io> wrote:

> Hey Guozhang,
>
> Thanks for the idea! I'm wondering if we could take a middle ground
> and take your proposed layout as a "roadmap", while only actually
> moving the classes that are already involved in this KIP.
>
> The reason I ask is not just to control the scope of this KIP, but
> also, I think that if we move other classes to new packages, we might
> also want to take the opportunity to clean up other things about them.
> But each one of those would become a discussion point of its own, so
> it seems the discussion would become intractable. FWIW, I do like your
> idea for precisely this reason: it creates opportunities for us to
> consider other changes that we are simply not able to make without
> breaking source compatibility.
>
> If the others feel "kind of favorable" with this overall vision, maybe
> we can make one or more Jira tickets to capture it, and then just
> alter _this_ proposal to `processor.api.Processor` (etc).
>
> WDYT?
> -John
>
> On Sun, Jun 23, 2019 at 7:17 PM Guozhang Wang <wangg...@gmail.com> wrote:
> >
> > Hello John,
> >
> > Thanks for your detailed explanation. I've done some quick checks on some
> > existing examples that heavily use Processor, and the results also make me
> > worried about my previous statement that "the breakage would not be big".
> > I agree we should maintain compatibility.
> >
> > About the naming itself, I'm actually a bit more inclined toward
> > sub-packages than renamed new classes. My motivation is that our current
> > packaging is already quite coarse-grained and sometimes ill-placed, and
> > hence maybe we can take this change along with some cleanup of packages
> > (but again, we should follow the deprecate - removal path).
> > What I'm thinking is:
> >
> > -------------------
> >
> > processor/: StateRestoreCallback/AbstractNotifyingRestoreCallback
> > (deprecated later, same meaning for other cross-throughs), ProcessorContext,
> > RecordContext, Punctuator, PunctuationType, To, Cancellable (are the only
> > things left)
> >
> > (new) processor/api/: Processor, ProcessorSupplier (and of course, these
> > two classes can be strongly typed)
> >
> > state/: StateStore, BatchingStateRestoreCallback,
> > AbstractNotifyingBatchingRestoreCallback (moved from processor/),
> > PartitionGrouper, WindowStoreIterator, StateSerdes (this one can be moved
> > into state/internals), TimestampedByteStore (we can move this to internals
> > since store types would use vat by default, see below), ValueAndTimestamp
> >
> > (new) state/factory/: Stores, StoreBuilder, StoreSupplier; *BUT* the new
> > Stores would not have timestampedXXBuilder APIs since the default
> > StoreSupplier / StoreBuilder value types are ValueAndTimestamp already.
> >
> > (new) state/queryable/: QueryableStoreType, QueryableStoreTypes, HostInfo
> >
> > (new) state/keyValue/: KeyValueXXX classes, and also the same for
> > state/sessionWindow and state/timeWindow; *BUT* here we use
> > ValueAndTimestamp as value types of those APIs directly, and also
> > TimestampedKeyValue/WindowStore would be deprecated.
> >
> > (new) kstream/api/: KStream, KTable, GroupedKStream (renamed from
> > KGroupedStream), GroupedKTable (renamed from KGroupedTable),
> > TimeWindowedKStream, SessionWindowedKStream, GlobalKTable
> >
> > (new) kstream/operator/: Aggregator, ForeachFunction, ... , Merger and
> > Grouped, Joined, Materialized, ... , Printed and Transformer,
> > TransformerSupplier.
> >
> > (new) kstream/window/: Window, Windows, Windowed, TimeWindows,
> > SessionWindows, UnlimitedWindows, JoinWindows, WindowedSerdes,
> > Time/SessionWindowedSerializer/Deserializer.
> >
> > (new) configure/: RocksDBConfigSetter, TopicNameExtractor,
> > TimestampExtractor, UsePreviousTimeOnInvalidTimestamp,
> > WallclockTimestampExtractor, ExtractRecordMetadataTimestamp,
> > FailOnInvalidTimestamp, LogAndSkipOnInvalidTimestamp, StateRestoreListener
> >
> > (new) metadata/: StreamsMetadata, ThreadMetadata, TaskMetadata, TaskId
> >
> > Still, any xxx/internals packages are declared as internal classes, but
> > other xxx/yyy packages are declared as public APIs.
> >
> > -------------------
> >
> > This is a very wild thought and I can totally understand if people feel
> > this is too much since it definitely enlarges the scope of this KIP a lot
> > :) just trying to play devil's advocate here to do a major refactoring and
> > avoid renaming Processor classes.
> >
> >
> > Guozhang
> >
> >
> > On Fri, Jun 21, 2019 at 9:51 PM Matthias J. Sax <matth...@confluent.io>
> > wrote:
> > >
> > > I think `RecordProcessor` is a good name.
> > >
> > >
> > > -Matthias
> > >
> > > On 6/21/19 5:09 PM, John Roesler wrote:
> > > > After kicking the naming around a bit more, it seems like any package
> > > > name change is a bit "weird" because it fragments the package and
> > > > directory structure. If we can come up with a reasonable name for the
> > > > interface after all, it seems like the better choice.
> > > >
> > > > The real challenge is that the existing name "Processor" seems just
> > > > about perfect. In picking a new name, we need to consider the ultimate
> > > > state, after the deprecation period, when we entirely remove
> > > > Processor. In this context, TypedProcessor seems a little odd to me,
> > > > because it seems to imply that there should also be an "untyped
> > > > processor".
> > > >
> > > > After kicking around a few other ideas, what does everyone think about
> > > > "RecordProcessor"? I _think_ maybe it stands on its own just fine,
> > > > because it's a thing that processes... records?
> > > >
> > > > If others agree with this, I can change the proposal to RecordProcessor.
> > > >
> > > > Thanks,
> > > > -John
> > > >
> > > > On Fri, Jun 21, 2019 at 6:42 PM John Roesler <j...@confluent.io> wrote:
> > > >>
> > > >> Hi all,
> > > >>
> > > >> I've updated the KIP with the feedback so far.
> > > >>
> > > >> The naming question is still the biggest (only?) outstanding issue. It
> > > >> would be good to hear some more thoughts on it.
> > > >>
> > > >> As we stand now, there's one vote for changing the package name to
> > > >> something like 'typedprocessor', one for changing the interface to
> > > >> TypedProcessor (as in the PoC), and one for just changing the
> > > >> Processor interface in-place, breaking source compatibility.
> > > >>
> > > >> How can we resolve this decision?
> > > >>
> > > >> Thanks,
> > > >> -John
> > > >>
> > > >> On Thu, Jun 20, 2019 at 5:44 PM John Roesler <j...@confluent.io> wrote:
> > > >>>
> > > >>> Thanks for the feedback, Guozhang and Matthias,
> > > >>>
> > > >>> Regarding motivation: I'll update the wiki. Briefly:
> > > >>> * Any processor can benefit. Imagine a pure user of the ProcessorAPI
> > > >>> who has very complex processing logic. I have seen several processor
> > > >>> implementations that are hundreds of lines long and call
> > > >>> `context.forward` in many different locations and branches. In such an
> > > >>> implementation, it would be very easy to have a bug in a rarely used
> > > >>> branch that forwards the wrong kind of value. This change would
> > > >>> structurally prevent that from happening.
> > > >>> * Also, anyone who heavily uses the ProcessorAPI would likely have
> > > >>> developed helper methods to wire together processors, just as we have
> > > >>> in the DSL implementation. This change would enable them to ensure at
> > > >>> compile time that they are actually wiring together compatible types.
> > > >>> This was actually _my_ original motivation, since I found it very
> > > >>> difficult and time consuming to follow the Streams DSL internal
> > > >>> builders.
> > > >>>
> > > >>> Regarding breaking the source compatibility of Processor: I would
> > > >>> _love_ to side-step the naming problem, but I really don't know if
> > > >>> it's excusable to break compatibility. I suspect that our oldest and
> > > >>> dearest friends are using the ProcessorAPI in some form or another,
> > > >>> and all their source code would break. It sucks to have to create a
> > > >>> whole new interface to get around this, but it feels like the right
> > > >>> thing to do. Would be nice to get even more feedback on this point,
> > > >>> though.
> > > >>>
> > > >>> Regarding the types of stores, as I said in my response to Sophie,
> > > >>> it's not an issue.
> > > >>>
> > > >>> Regarding the change to StreamsBuilder, it doesn't pin the types in
> > > >>> any way, since all the types are bounded by Object only, and there are
> > > >>> no extra constraints between arguments (each type is used only once in
> > > >>> one argument). But maybe I missed the point you were asking about.
> > > >>> Since the type takes generic parameters, we should allow users to pass
> > > >>> in parameterized arguments. Otherwise, they would _have to_ give us a
> > > >>> raw type, and they would be forced to get a "rawtypes" warning from the
> > > >>> compiler. So, it's our obligation in any API that accepts a
> > > >>> parameterized-type parameter to allow people to actually pass a
> > > >>> parameterized type, even if we don't actually use the parameters.
> > > >>>
> > > >>> The naming question is a complex one, as I took pains to detail
> > > >>> previously. Please don't just pick out one minor point, call it weak,
> > > >>> and then claim that it invalidates the whole decision.
> > > >>> I don't think there's a clear best choice, so I'm more than happy for
> > > >>> someone to advocate for renaming the class instead of the package. Can
> > > >>> you provide some reasons why you think that would be better?
> > > >>>
> > > >>> Regarding the deprecated methods, you're absolutely right. I'll
> > > >>> update the KIP.
> > > >>>
> > > >>> Thanks again for all the feedback!
> > > >>> -John
> > > >>>
> > > >>> On Thu, Jun 20, 2019 at 4:34 PM Matthias J. Sax <matth...@confluent.io>
> > > >>> wrote:
> > > >>>>
> > > >>>> Just want to second what Sophie said about the stores. The type of a
> > > >>>> used store is completely independent of input/output types.
> > > >>>>
> > > >>>> This relates to the change to the `addGlobalStore()` method. Why do you
> > > >>>> want to pin the types? In fact, people request the ability to filter()
> > > >>>> and maybe even map() the data before they are put into the global
> > > >>>> store. Limiting the types seems to be a step backward here?
> > > >>>>
> > > >>>> Also, the package name is questionable.
> > > >>>>
> > > >>>>> This wouldn't be the first project to do something like this...
> > > >>>>
> > > >>>> Not a strong argument. I would actually propose to not add a new
> > > >>>> package, but just a new class `TypedProcessor`.
> > > >>>>
> > > >>>> For `ProcessorContext#forward` methods -- some of those methods are
> > > >>>> already deprecated. While they will still be affected, it would be
> > > >>>> worth marking them as deprecated in the wiki page, too.
> > > >>>>
> > > >>>> @Guozhang: I don't think we should break source compatibility in a
> > > >>>> minor release.
> > > >>>>
> > > >>>> -Matthias
> > > >>>>
> > > >>>> On 6/20/19 1:43 PM, Guozhang Wang wrote:
> > > >>>>> Hi John,
> > > >>>>>
> > > >>>>> Thanks for the KIP! I have a few comments below:
> > > >>>>>
> > > >>>>> 1.
> > > >>>>> So far the "Motivation" section is very general, and the only concrete
> > > >>>>> example that I have in mind is `TransformValues#punctuate`. Do we have
> > > >>>>> any other concrete issues that drive this KIP? If not, then I feel it
> > > >>>>> would be better to narrow the scope of this KIP to:
> > > >>>>>
> > > >>>>> 1.a) modifying ProcessorContext only with the output types on forward.
> > > >>>>> 1.b) modifying the Transformer signature to have generics of
> > > >>>>> ProcessorContext, and then lifting the restriction of not using
> > > >>>>> punctuate: if users do not follow the enforced typing and just code
> > > >>>>> without generics, they will get a warning at compile time and a
> > > >>>>> run-time error if they forward wrong-typed records, which I think
> > > >>>>> would be acceptable.
> > > >>>>>
> > > >>>>> I feel this would be a good solution for this specific issue; again,
> > > >>>>> feel free to update the wiki page with other known issues that cannot
> > > >>>>> be resolved.
> > > >>>>>
> > > >>>>> 2. If we want to go with the current scope, then my next question would
> > > >>>>> be: how much breakage would we be introducing if we just modify the
> > > >>>>> Processor signature directly? My feeling is that DSL users would most
> > > >>>>> likely not be affected and PAPI users would only need to modify a few
> > > >>>>> lines on class declarations. I feel it is worth doing some research on
> > > >>>>> this part and then deciding if we really want to bite the bullet of
> > > >>>>> duplicated Processor / ProcessorSupplier classes for maintaining
> > > >>>>> compatibility.
> > > >>>>>
> > > >>>>>
> > > >>>>> Guozhang
> > > >>>>>
> > > >>>>> On Wed, Jun 19, 2019 at 12:21 PM John Roesler <j...@confluent.io>
> > > >>>>> wrote:
> > > >>>>>
> > > >>>>>> Hi all,
> > > >>>>>>
> > > >>>>>> In response to the feedback so far, I changed the package name from
> > > >>>>>> `processor2` to `processor.generic`.
> > > >>>>>>
> > > >>>>>> Thanks,
> > > >>>>>> -John
> > > >>>>>>
> > > >>>>>> On Mon, Jun 17, 2019 at 4:49 PM John Roesler <j...@confluent.io>
> > > >>>>>> wrote:
> > > >>>>>>>
> > > >>>>>>> Thanks for the feedback, Sophie!
> > > >>>>>>>
> > > >>>>>>> I actually felt a little uneasy when I wrote that remark, because it's
> > > >>>>>>> not restricted at all in the API, it's just available to you if you
> > > >>>>>>> choose to give your stores and context the same parameters. So, I
> > > >>>>>>> think your use case is valid, and also perfectly permissible under the
> > > >>>>>>> current KIP. Sorry for sowing confusion on my own discussion thread!
> > > >>>>>>>
> > > >>>>>>> I'm not crazy about the package name, either. I went with it only
> > > >>>>>>> because there's seemingly nothing special about the new package except
> > > >>>>>>> that it can't have the same name as the old one. Otherwise, the
> > > >>>>>>> existing "processor" and "Processor" names for the package and class
> > > >>>>>>> are perfectly satisfying. Rather than pile on additional semantics, it
> > > >>>>>>> seemed cleaner to just add a number to the package name.
> > > >>>>>>>
> > > >>>>>>> This wouldn't be the first project to do something like this... Apache
> > > >>>>>>> Commons, for example, has added a "2" to the end of some of their
> > > >>>>>>> packages for exactly the same reason.
> > > >>>>>>>
> > > >>>>>>> I'm open to any suggestions. For example, we could do something like
> > > >>>>>>> org.apache.kafka.streams.typedprocessor.Processor or
> > > >>>>>>> org.apache.kafka.streams.processor.typed.Processor, which would have
> > > >>>>>>> just about the same effect.
> > > >>>>>>> One microscopic thought is that, if
> > > >>>>>>> there's another interface in the "processor" package that we wish to
> > > >>>>>>> do the same thing to, we _could_ pile it into "processor2", but we
> > > >>>>>>> couldn't do the same if we use a package that has "typed" in the name,
> > > >>>>>>> unless that change is _also_ related to types in some way. But this
> > > >>>>>>> seems like a very minor concern.
> > > >>>>>>>
> > > >>>>>>> What's your preference?
> > > >>>>>>> -John
> > > >>>>>>>
> > > >>>>>>> On Mon, Jun 17, 2019 at 3:56 PM Sophie Blee-Goldman
> > > >>>>>>> <sop...@confluent.io> wrote:
> > > >>>>>>>>
> > > >>>>>>>> Hey John, thanks for writing this up! I like the proposal but there's
> > > >>>>>>>> one point that I think may be too restrictive:
> > > >>>>>>>>
> > > >>>>>>>> "A processor that happens to use a typed store is actually emitting
> > > >>>>>>>> the same types that it is storing."
> > > >>>>>>>>
> > > >>>>>>>> I can imagine someone could want to leverage this new type safety
> > > >>>>>>>> without also limiting how they can interact with/use their store. As
> > > >>>>>>>> an (admittedly contrived) example, say you have an input stream of
> > > >>>>>>>> purchases of a certain type (entertainment, food, etc), and on seeing
> > > >>>>>>>> a new record you want to output how many types of purchase a shopper
> > > >>>>>>>> has made more than 5 purchases of in the last month. Your state store
> > > >>>>>>>> will probably be holding some more complicated PurchaseHistory object
> > > >>>>>>>> (keyed by user), but your output is just a <User, Long>
> > > >>>>>>>>
> > > >>>>>>>> I'm also not crazy about "processor2" as the package name ... not sure
> > > >>>>>>>> what a better one would be though (something with "typed"?)
> > > >>>>>>>>
> > > >>>>>>>> On Mon, Jun 17, 2019 at 12:47 PM John Roesler <j...@confluent.io>
> > > >>>>>>>> wrote:
> > > >>>>>>>>
> > > >>>>>>>>> Hi all,
> > > >>>>>>>>>
> > > >>>>>>>>> I'd like to propose KIP-478
> > > >>>>>>>>> (https://cwiki.apache.org/confluence/x/2SkLBw).
> > > >>>>>>>>>
> > > >>>>>>>>> This proposal would add output type bounds to the Processor interface
> > > >>>>>>>>> in Kafka Streams, which enables static checking of a number of useful
> > > >>>>>>>>> properties:
> > > >>>>>>>>> * A processor B that consumes the output of processor A is actually
> > > >>>>>>>>> expecting the same types that processor A produces.
> > > >>>>>>>>> * A processor that happens to use a typed store is actually emitting
> > > >>>>>>>>> the same types that it is storing.
> > > >>>>>>>>> * A processor is simply forwarding the expected types in all code
> > > >>>>>>>>> paths.
> > > >>>>>>>>> * Processors added via the Streams DSL, which are not permitted to
> > > >>>>>>>>> forward results at all, are statically prevented from doing so by the
> > > >>>>>>>>> compiler.
> > > >>>>>>>>>
> > > >>>>>>>>> Internally, we can use the above properties to achieve a much higher
> > > >>>>>>>>> level of confidence in the Streams DSL implementation's correctness.
> > > >>>>>>>>> Actually, while doing the POC, I found a few bugs and mistakes, which
> > > >>>>>>>>> become structurally impossible with KIP-478.
> > > >>>>>>>>>
> > > >>>>>>>>> Additionally, the stronger types dramatically improve the
> > > >>>>>>>>> self-documentation of our Streams internal implementations, which
> > > >>>>>>>>> makes it much easier for new contributors to ramp up with confidence.
> > > >>>>>>>>>
> > > >>>>>>>>> Thanks so much for your consideration!
> > > >>>>>>>>> -John
> > > >>>>>>>>>
> >
> > --
> > -- Guozhang
>

--
-- Guozhang
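
For readers following the thread, the core idea behind the proposal (output type bounds that let the compiler check every `forward` call) can be sketched roughly as below. This is only an illustration under stated assumptions: `TypedContext`, `TypedProcessor`, and `WordLength` are hypothetical names invented for this sketch, not the interfaces proposed in the KIP and not part of the Kafka Streams API.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical names throughout; this is NOT the Kafka Streams API.
final class TypedForwardSketch {

    /** A context whose forward() only accepts the declared output types. */
    interface TypedContext<KOut, VOut> {
        void forward(KOut key, VOut value);
    }

    /** A processor that declares both its input and its output types. */
    interface TypedProcessor<KIn, VIn, KOut, VOut> {
        void init(TypedContext<KOut, VOut> context);
        void process(KIn key, VIn value);
    }

    /** Consumes <String, String> and emits <String, Integer>: a word and its length. */
    static final class WordLength implements TypedProcessor<String, String, String, Integer> {
        private TypedContext<String, Integer> context;

        @Override
        public void init(TypedContext<String, Integer> context) {
            this.context = context;
        }

        @Override
        public void process(String key, String value) {
            context.forward(value, value.length());
            // context.forward(value, "oops"); // would not compile: String is not an Integer
        }
    }

    /** Runs the processor on one word and returns everything it forwarded. */
    static List<String> run(String word) {
        List<String> forwarded = new ArrayList<>();
        WordLength processor = new WordLength();
        // The lambda stands in for a downstream node that records what it receives.
        processor.init((k, v) -> forwarded.add(k + "=" + v));
        processor.process("ignored-key", word);
        return forwarded;
    }

    public static void main(String[] args) {
        System.out.println(run("kafka")); // prints [kafka=5]
    }
}
```

In this sketch, a forward of the wrong value type in any branch, however rarely executed, is rejected at compile time rather than surfacing as a run-time error, which is the property discussed above.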