I have concerns about the latest proposal from Guozhang. However, as
John said, it's beyond the scope of this KIP, and thus I won't go into
details. I agree, though, that the current "transformer APIs" are not
ideal and could be improved.


An orthogonal thought is that we should split the current
`ProcessorContext` into multiple interfaces. Atm, the context can be used to:

- access metadata
- schedule punctuation
- get state stores
- register state stores
- forward output data

(1) registering state stores is only required if one implements a custom
store, but not for a regular `Processor` implementation -- hence, it's a
leaky abstraction

(2) for `ValueTransformer` and `flatValueTransformer` we don't want to
allow forwarding key-value pairs, and hence need to throw a runtime
exception for this case atm

(3) Why do we expose `keySerde()`, `valueSerde()`, and `stateDir()`
explicitly? We already have `appConfigs()` to allow users to access the
configuration.

Overall, it seems that `ProcessorContext` is rather convoluted. Because
we add a new `Processor` abstraction, it seems like a good opportunity
to improve the interface and to pass not `ProcessorContext` but an
improved interface into the new `Processor#init()` method.
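
To make the idea of splitting the context more concrete, here is a rough,
self-contained sketch. All interface and class names below are hypothetical
and made up for illustration; none of them exist in Kafka Streams, and the
KIP does not propose them. It only shows how narrower views could turn
points (1) and (2) into compile-time restrictions.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch only: these types are NOT part of Kafka Streams.
class SplitContextSketch {

    // One small interface per responsibility of today's context.
    interface RecordMetadata { String topic(); long offset(); }
    interface PunctuationScheduler { void schedule(long intervalMs, Runnable callback); }
    interface StateStoreAccess { Object getStateStore(String name); }
    interface StateStoreRegistry { void register(String name, Object store); }
    interface Forwarder<K, V> { void forward(K key, V value); }

    // View for a regular processor: no register(), see point (1).
    interface ProcessorInitContext<K, V>
            extends RecordMetadata, PunctuationScheduler, StateStoreAccess, Forwarder<K, V> {}

    // View for a value-only transformer: no forward(), see point (2).
    interface ValueOnlyInitContext
            extends RecordMetadata, PunctuationScheduler, StateStoreAccess {}

    // Toy runtime object that backs every view.
    static final class ToyContext
            implements ProcessorInitContext<String, Long>, ValueOnlyInitContext, StateStoreRegistry {
        final List<String> forwarded = new ArrayList<>();
        final Map<String, Object> stores = new HashMap<>();

        public String topic() { return "input-topic"; }
        public long offset() { return 42L; }
        // Toy behavior: run the callback once, immediately.
        public void schedule(long intervalMs, Runnable callback) { callback.run(); }
        public Object getStateStore(String name) { return stores.get(name); }
        public void register(String name, Object store) { stores.put(name, store); }
        public void forward(String key, Long value) { forwarded.add(key + "=" + value); }
    }

    static void initProcessor(ProcessorInitContext<String, Long> ctx) {
        ctx.forward(ctx.topic(), ctx.offset());
        // ctx.register("s", new Object());  // would not compile: not in this view
    }

    static String initValueTransformer(ValueOnlyInitContext ctx) {
        // ctx.forward("k", 1L);  // would not compile: no runtime exception needed
        return ctx.topic() + "@" + ctx.offset();
    }

    static List<String> demo() {
        ToyContext ctx = new ToyContext();
        initProcessor(ctx);
        List<String> out = new ArrayList<>(ctx.forwarded);
        out.add(initValueTransformer(ctx));
        return out;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

The point of the sketch is only that each caller receives the narrowest view
it needs; how the real interfaces would be cut is an open design question.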

Thoughts?



One more nit about the KIP:

I think we should clearly state that this change does not provide type
safety for PAPI users. The following example would compile without any
errors or warnings, even if the types don't match:

> Topology t = new Topology();
> t.addSource("s", ...);
> t.addProcessor("p1", new ProcessorSupplier<KIn, VIn, FooKey, BarValue>()..., "s");
> t.addProcessor("p2", new ProcessorSupplier<NotFooKey, NotBarValue, KOut, VOut>()..., "p1");

Just want to make sure users understand the impact/scope of the change,
especially what is _not_ achieved.
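
A toy model of why name-based wiring cannot be checked by the compiler is
sketched below. `ToyTopology` and `Proc` are hypothetical stand-ins written
for this mail, not the real `Topology` or `ProcessorSupplier`; the sketch
assumes only that parents are referenced by their string name, as in the
snippet above.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical sketch only: a toy stand-in, not the Kafka Streams Topology.
class NameBasedWiringSketch {

    // Toy typed processor: consumes In, produces Out.
    interface Proc<In, Out> extends Function<In, Out> {}

    static final class ToyTopology {
        private final Map<String, Proc<?, ?>> nodes = new LinkedHashMap<>();
        private final Map<String, String> parents = new LinkedHashMap<>();

        // The parent is given by name only, so the compiler has nothing
        // that relates the parent's Out type to this node's In type.
        <In, Out> ToyTopology addProcessor(String name, Proc<In, Out> proc, String parent) {
            nodes.put(name, proc);
            parents.put(name, parent);
            return this;
        }

        // The unchecked cast lives inside the toy runtime, not in user code.
        @SuppressWarnings("unchecked")
        Object run(String leaf, Object input) {
            String parent = parents.get(leaf);
            Object in = (parent == null) ? input : run(parent, input);
            return ((Proc<Object, Object>) nodes.get(leaf)).apply(in);
        }
    }

    static String demo() {
        Proc<String, Integer> length = s -> s.length();      // String -> Integer
        Proc<String, String> upper = s -> s.toUpperCase();   // expects a String

        ToyTopology t = new ToyTopology();
        t.addProcessor("p1", length, null);
        t.addProcessor("p2", upper, "p1");  // mismatch, yet this compiles cleanly

        try {
            t.run("p2", "hello");
            return "no error";
        } catch (ClassCastException e) {
            return "ClassCastException at runtime";
        }
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

In this toy version the mismatch between "p1" and "p2" only shows up when a
record flows through, which is the gap the KIP should call out.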


About `addGlobalStore()` -- should the return types be `Void` similar to
`KStream#process()`?
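
To illustrate what `Void` output types would buy us, here is a small
sketch. `Proc` and `Context` are hypothetical stand-ins that only borrow the
`<KIn, VIn, KOut, VOut>` parameter order from the snippet above; they are
not the interfaces from the KIP.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch only: toy interfaces, not the ones proposed in the KIP.
class VoidOutputSketch {

    interface Context<KOut, VOut> { void forward(KOut key, VOut value); }

    interface Proc<KIn, VIn, KOut, VOut> {
        void init(Context<KOut, VOut> context);
        void process(KIn key, VIn value);
    }

    // A store-updating processor that declares it produces no output.
    static final class StoreUpdater implements Proc<String, Long, Void, Void> {
        final Map<String, Long> store = new HashMap<>();
        private Context<Void, Void> context;

        public void init(Context<Void, Void> context) { this.context = context; }

        public void process(String key, Long value) {
            store.put(key, value);
            // context.forward(key, value);  // would not compile: String is not Void
        }
    }

    static long demo() {
        StoreUpdater updater = new StoreUpdater();
        updater.init((k, v) -> { });  // toy context that ignores anything forwarded
        updater.process("a", 1L);
        return updater.store.get("a");
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

With `Void` for both output types, the only call that still type-checks is
`forward(null, null)`, so accidental forwarding from a global store
processor would be caught by the compiler.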



-Matthias


On 7/24/19 9:11 AM, Guozhang Wang wrote:
> Sounds good to me, thanks John!
> 
> 
> Guozhang
> 
> On Wed, Jul 24, 2019 at 7:40 AM John Roesler <j...@confluent.io> wrote:
> 
>> Hey Guozhang,
>>
>> Thanks for the thought! It sounds related to what I was thinking in
>> https://issues.apache.org/jira/browse/KAFKA-8396 , but a little "extra"...
>>
>> I proposed to eliminate ValueTransformer, but I believe you're right; we
>> could eliminate Transformer also and just use Processor in the transform()
>> methods.
>>
>> To your first bullet, regarding transform/flatTransform... I'd argue that
>> the difference isn't material and if we switch to just using
>> context.forward instead of returns, then we just need one and people can
>> call forward as much as they want. It certainly warrants further
>> discussion, though...
>>
>> To the second point, yes, I'm thinking that we can eschew the
>> ValueTransformer and instead do something like ignore the forwarded key or
>> check the key for serial identity, etc.
>>
>> The ultimate advantage of these ideas is that we reduce the number of
>> interface variants and we also give people just one way to pass values
>> forward instead of two.
>>
>> Of course, it's beyond the scope of this KIP, but this KIP is a
>> precondition for these further improvements.
>>
>> I'm copying your comment onto the ticket for posterity.
>>
>> Thanks!
>> -John
>>
>> On Tue, Jul 23, 2019 at 5:38 PM Guozhang Wang <wangg...@gmail.com> wrote:
>>
>>> Hi John,
>>>
>>> Just a wild thought about Transformer: now with the new Processor<KIn,
>>> KOut, VIn, VOut>#init(ProcessorContext<KOut, VOut>), do we still need a
>>> Transformer (and even ValueTransformer / ValueTransformerWithKey)?
>>>
>>> What if:
>>>
>>> * We just make KStream#transform to get a ProcessorSupplier as well, and
>>> inside `process()` we check that at most one `context.forward()` is
>> called,
>>> and then take it as the return value.
>>> * We would still use ValueTransformer for KStream#transformValue, or we
>> can
>>> also use a `ProcessorSupplier where we allow at most one
>>> `context.forward()` AND we ignore whatever passed in as key but just use
>>> the original key.
>>>
>>>
>>> Guozhang
>>>
>>>
>>> On Tue, Jul 16, 2019 at 9:03 AM John Roesler <j...@confluent.io> wrote:
>>>
>>>> Hi again, all,
>>>>
>>>> I have started the voting thread. Please cast your votes (or voice
>>>> your objections)! The vote will remain open at least 72 hours. Once it
>>>> closes, I can send the PR pretty quickly.
>>>>
>>>> Thanks for all you help ironing out the details on this feature.
>>>> -John
>>>>
>>>> On Mon, Jul 15, 2019 at 5:09 PM John Roesler <j...@confluent.io>
>> wrote:
>>>>>
>>>>> Hey all,
>>>>>
>>>>> It sounds like there's general agreement now on this KIP, so I
>> updated
>>>>> the KIP to fit in with Guozhang's overall proposed package structure.
>>>>> Specifically, the proposed name for the new Processor interface is
>>>>> "org.apache.kafka.streams.processor.api.Processor".
>>>>>
>>>>> If there are no objections, then I plan to start the vote tomorrow!
>>>>>
>>>>> Thanks, all, for your contributions.
>>>>> -John
>>>>>
>>>>> On Thu, Jul 11, 2019 at 1:50 PM Matthias J. Sax <
>> matth...@confluent.io
>>>>
>>>> wrote:
>>>>>>
>>>>>> Side remark:
>>>>>>
>>>>>>> Now that "flat transform" is a specific
>>>>>>>> part of the API it seems okay to steer folks in that direction
>> (to
>>>> never
>>>>>>>> use context.process in a transformer), but it should be called
>> out
>>>>>>>> explicitly in javadocs.  Currently Transformer (which is used
>> for
>>>> both
>>>>>>>> transform() and flatTransform() ) doesn't really call out the
>>>> ambiguity:
>>>>>>
>>>>>> Would you want to do a PR for address this? We are always eager to
>>>>>> improve the JavaDocs!
>>>>>>
>>>>>>
>>>>>> -Matthias
>>>>>>
>>>>>> On 7/7/19 11:26 AM, Paul Whalen wrote:
>>>>>>> First of all, +1 on the whole idea, my team has run into
>>> (admittedly
>>>> minor,
>>>>>>> but definitely annoying) issues because of the weaker typing.
>>> We're
>>>> heavy
>>>>>>> users of the PAPI and have Processors that, while not hundreds of
>>>> lines
>>>>>>> long, are certainly quite hefty and call context.forward() in
>> many
>>>> places.
>>>>>>>
>>>>>>> After reading the KIP and discussion a few times, I've convinced
>>>> myself
>>>>>>> that any initial concerns I had aren't really concerns at all
>>> (state
>>>> store
>>>>>>> types, for one).  One thing I will mention:  changing
>> *Transformer*
>>>> to have
>>>>>>> ProcessorContext<Void, Void> gave me pause, because I have code
>>> that
>>>> does
>>>>>>> context.forward in transformers.  Now that "flat transform" is a
>>>> specific
>>>>>>> part of the API it seems okay to steer folks in that direction
>> (to
>>>> never
>>>>>>> use context.process in a transformer), but it should be called
>> out
>>>>>>> explicitly in javadocs.  Currently Transformer (which is used for
>>>> both
>>>>>>> transform() and flatTransform() ) doesn't really call out the
>>>> ambiguity:
>>>>>>>
>>>>
>>>
>> https://github.com/apache/kafka/blob/ca641b3e2e48c14ff308181c775775408f5f35f7/streams/src/main/java/org/apache/kafka/streams/kstream/Transformer.java#L75-L77
>>>> ,
>>>>>>> and for migrating users (from before flatTransform) it could be
>>>> confusing.
>>>>>>>
>>>>>>> Side note, I'd like to plug KIP-401 (there is a discussion thread
>>>> and a
>>>>>>> voting thread) which also relates to using the PAPI.  It seems
>> like
>>>> there
>>>>>>> is some interest and it is in a votable state with the majority
>> of
>>>>>>> implementation complete.
>>>>>>>
>>>>>>> Paul
>>>>>>>
>>>>>>> On Fri, Jun 28, 2019 at 2:02 PM Bill Bejeck <bbej...@gmail.com>
>>>> wrote:
>>>>>>>
>>>>>>>> Sorry for coming late to the party.
>>>>>>>>
>>>>>>>> As for the naming I'm in favor of RecordProcessor as well.
>>>>>>>>
>>>>>>>> I agree that we should not take on doing all of the package
>>>> movements as
>>>>>>>> part of this KIP, especially as John has pointed out, it will be
>>> an
>>>>>>>> opportunity to discuss some clean-up on individual classes
>> which I
>>>> envision
>>>>>>>> becoming another somewhat involved process.
>>>>>>>>
>>>>>>>> For the end goal, if possible, here's what I propose.
>>>>>>>>
>>>>>>>>    1. We keep the scope of the KIP the same, *but we only
>>>> implement* *it in
>>>>>>>>    phases*
>>>>>>>>    2. Phase one could include what Guozhang had proposed earlier
>>>> namely
>>>>>>>>    1. > 1.a) modifying ProcessorContext only with the output
>> types
>>>> on
>>>>>>>>       forward.
>>>>>>>>       > 1.b) modifying Transformer signature to have generics of
>>>>>>>>       ProcessorContext,
>>>>>>>>       > and then lift the restricting of not using punctuate: if
>>>> user did
>>>>>>>>       not
>>>>>>>>       > follow the enforced typing and just code without
>> generics,
>>>> they
>>>>>>>>       will get
>>>>>>>>       > warning at compile time and get run-time error if they
>>>> forward
>>>>>>>>       wrong-typed
>>>>>>>>       > records, which I think would be acceptable.
>>>>>>>>    3. Then we could tackle other pieces in an incremental manner
>>> as
>>>> we see
>>>>>>>>    what makes sense
>>>>>>>>
>>>>>>>> Just my 2cents
>>>>>>>>
>>>>>>>> -Bill
>>>>>>>>
>>>>>>>> On Mon, Jun 24, 2019 at 10:22 PM Guozhang Wang <
>>> wangg...@gmail.com>
>>>> wrote:
>>>>>>>>
>>>>>>>>> Hi John,
>>>>>>>>>
>>>>>>>>> Yeah I think we should not do all the repackaging as part of
>> this
>>>> KIP as
>>>>>>>>> well (we can just do the movement of the Processor /
>>>> ProcessorSupplier),
>>>>>>>>> but I think we need to discuss the end goal here since
>> otherwise
>>>> we may
>>>>>>>> do
>>>>>>>>> the repackaging of Processor in this KIP, but only later on
>>>> realizing
>>>>>>>> that
>>>>>>>>> other re-packagings are not our favorite solutions.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> Guozhang
>>>>>>>>>
>>>>>>>>> On Mon, Jun 24, 2019 at 3:06 PM John Roesler <
>> j...@confluent.io>
>>>> wrote:
>>>>>>>>>
>>>>>>>>>> Hey Guozhang,
>>>>>>>>>>
>>>>>>>>>> Thanks for the idea! I'm wondering if we could take a middle
>>>> ground
>>>>>>>>>> and take your proposed layout as a "roadmap", while only
>>> actually
>>>>>>>>>> moving the classes that are already involved in this KIP.
>>>>>>>>>>
>>>>>>>>>> The reason I ask is not just to control the scope of this KIP,
>>> but
>>>>>>>>>> also, I think that if we move other classes to new packages,
>> we
>>>> might
>>>>>>>>>> also want to take the opportunity to clean up other things
>> about
>>>> them.
>>>>>>>>>> But each one of those would become a discussion point of its
>>> own,
>>>> so
>>>>>>>>>> it seems the discussion would become intractable. FWIW, I do
>>> like
>>>> your
>>>>>>>>>> idea for precisely this reason, it creates opportunities for
>> us
>>> to
>>>>>>>>>> consider other changes that we are simply not able to make
>>> without
>>>>>>>>>> breaking source compatibility.
>>>>>>>>>>
>>>>>>>>>> If the others feel "kind of favorable" with this overall
>> vision,
>>>> maybe
>>>>>>>>>> we can make one or more Jira tickets to capture it, and then
>>> just
>>>>>>>>>> alter _this_ proposal to `processor.api.Processor` (etc).
>>>>>>>>>>
>>>>>>>>>> WDYT?
>>>>>>>>>> -John
>>>>>>>>>>
>>>>>>>>>> On Sun, Jun 23, 2019 at 7:17 PM Guozhang Wang <
>>> wangg...@gmail.com
>>>>>
>>>>>>>>> wrote:
>>>>>>>>>>>
>>>>>>>>>>> Hello John,
>>>>>>>>>>>
>>>>>>>>>>> Thanks for your detailed explanation, I've done some quick
>>>> checks on
>>>>>>>>> some
>>>>>>>>>>> existing examples that heavily used Processor and the results
>>>> also
>>>>>>>>> makes
>>>>>>>>>> me
>>>>>>>>>>> worried about my previous statements that "the breakage would
>>>> not be
>>>>>>>>>> big".
>>>>>>>>>>> I agree we should maintain compatibility.
>>>>>>>>>>>
>>>>>>>>>>> About the naming itself, I'm actually a bit inclined into
>>>>>>>> sub-packages
>>>>>>>>>> than
>>>>>>>>>>> renamed new classes, and my motivations are that our current
>>>>>>>> packaging
>>>>>>>>> is
>>>>>>>>>>> already quite coarsen grained and sometimes ill-placed, and
>>> hence
>>>>>>>> maybe
>>>>>>>>>> we
>>>>>>>>>>> can take this change along with some clean up on packages
>> (but
>>>> again,
>>>>>>>>> we
>>>>>>>>>>> should follow the deprecate - removal path). What I'm
>> thinking
>>>> is:
>>>>>>>>>>>
>>>>>>>>>>> -------------------
>>>>>>>>>>>
>>>>>>>>>>> processor/:
>>>> StateRestoreCallback/AbstractNotifyingRestoreCallback,
>>>>>>>>>> (deprecated
>>>>>>>>>>> later, same meaning for other cross-throughs),
>> ProcessContest,
>>>>>>>>>>> RecordContext, Punctuator, PunctuationType, To, Cancellable
>>> (are
>>>> the
>>>>>>>>> only
>>>>>>>>>>> things left)
>>>>>>>>>>>
>>>>>>>>>>> (new) processor/api/: Processor, ProcessorSupplier (and of
>>>> course,
>>>>>>>>> these
>>>>>>>>>>> two classes can be strong typed)
>>>>>>>>>>>
>>>>>>>>>>> state/: StateStore, BatchingStateRestoreCallback,
>>>>>>>>>>> AbstractNotifyingBatchingRestoreCallback (moved from
>>> processor/),
>>>>>>>>>>> PartitionGrouper, WindowStoreIterator, StateSerdes (this one
>>> can
>>>> be
>>>>>>>>> moved
>>>>>>>>>>> into state/internals), TimestampedByteStore (we can move this
>>> to
>>>>>>>>>> internals
>>>>>>>>>>> since store types would use vat by default, see below),
>>>>>>>>> ValueAndTimestamp
>>>>>>>>>>>
>>>>>>>>>>> (new) state/factory/: Stores, StoreBuilder, StoreSupplier;
>>> *BUT*
>>>> the
>>>>>>>>> new
>>>>>>>>>>> Stores would not have timestampedXXBuilder APIs since the
>>> default
>>>>>>>>>>> StoreSupplier / StoreBuilder value types are
>> ValueAndTimestamp
>>>>>>>> already.
>>>>>>>>>>>
>>>>>>>>>>> (new) state/queryable/: QueryableStoreType,
>>> QueryableStoreTypes,
>>>>>>>>> HostInfo
>>>>>>>>>>>
>>>>>>>>>>> (new) state/keyValue/: KeyValueXXX classes, and also the same
>>> for
>>>>>>>>>>> state/sessionWindow and state/timeWindow; *BUT* here we use
>>>>>>>>>>> ValueAndTimestamp as value types of those APIs directly, and
>>> also
>>>>>>>>>>> TimestampedKeyValue/WindowStore would be deprecated.
>>>>>>>>>>>
>>>>>>>>>>> (new) kstream/api/: KStream, KTable, GroupedKStream (renamed
>>> from
>>>>>>>>>>> KGroupedStream), GroupedKTable (renamed from KGroupedTable),
>>>>>>>>>>> TimeWindowedKStream, SessionWindowedKStream, GlobalKTable
>>>>>>>>>>>
>>>>>>>>>>> (new) kstream/operator/: Aggregator, ForeachFunction,  ... ,
>>>> Merger
>>>>>>>> and
>>>>>>>>>>> Grouped, Joined, Materialized, ... , Printed and Transformer,
>>>>>>>>>>> TransformerSupplier.
>>>>>>>>>>>
>>>>>>>>>>> (new) kstream/window/: Window, Windows, Windowed,
>> TimeWindows,
>>>>>>>>>>> SessionWindows, UnlimitedWindows, JoinWindows,
>> WindowedSerdes,
>>>>>>>>>>> Time/SessionWindowedSerialized/Deserializer.
>>>>>>>>>>>
>>>>>>>>>>> (new) configure/: RocksDBConfigSetter, TopicNameExtractor,
>>>>>>>>>>> TimestampExtractor, UsePreviousTimeOnInvalidTimestamp,
>>>>>>>>>>> WallclockTimestampExtractor, ExtractRecordMetadataTimestamp,
>>>>>>>>>>> FailOnInvalidTimestamp, LogAndSkipOnInvalidTimestamp,
>>>>>>>>>> StateRestoreListener,
>>>>>>>>>>>
>>>>>>>>>>> (new) metadata/: StreamsMetadata, ThreadMetadata,
>> TaskMetadata,
>>>>>>>> TaskId
>>>>>>>>>>>
>>>>>>>>>>> Still, any xxx/internals packages are declared as inner
>>> classes,
>>>> but
>>>>>>>>>> other
>>>>>>>>>>> xxx/yyy packages are declared as public APIs.
>>>>>>>>>>>
>>>>>>>>>>> -------------------
>>>>>>>>>>>
>>>>>>>>>>> This is a very wild thought and I can totally understand if
>>>> people
>>>>>>>> feel
>>>>>>>>>>> this is too much since it definitely enlarges the scope of
>> this
>>>> KIP a
>>>>>>>>> lot
>>>>>>>>>>> :) just trying to play a devil's advocate here to do major
>>>>>>>> refactoring
>>>>>>>>>> and
>>>>>>>>>>> avoid renaming Processor classes.
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> Guozhang
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> On Fri, Jun 21, 2019 at 9:51 PM Matthias J. Sax <
>>>>>>>> matth...@confluent.io
>>>>>>>>>>
>>>>>>>>>>> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>> I think `RecordProcessor` is a good name.
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> -Matthias
>>>>>>>>>>>>
>>>>>>>>>>>> On 6/21/19 5:09 PM, John Roesler wrote:
>>>>>>>>>>>>> After kicking the naming around a bit more, it seems like
>> any
>>>>>>>>> package
>>>>>>>>>>>>> name change is a bit "weird" because it fragments the
>> package
>>>> and
>>>>>>>>>>>>> directory structure. If we can come up with a reasonable
>> name
>>>> for
>>>>>>>>> the
>>>>>>>>>>>>> interface after all, it seems like the better choice.
>>>>>>>>>>>>>
>>>>>>>>>>>>> The real challenge is that the existing name "Processor"
>>> seems
>>>>>>>> just
>>>>>>>>>>>>> about perfect. In picking a new name, we need to consider
>> the
>>>>>>>>>> ultimate
>>>>>>>>>>>>> state, after the deprecation period, when we entirely
>> remove
>>>>>>>>>>>>> Processor. In this context, TypedProcessor seems a little
>> odd
>>>> to
>>>>>>>>> me,
>>>>>>>>>>>>> because it seems to imply that there should also be an
>>> "untyped
>>>>>>>>>>>>> processor".
>>>>>>>>>>>>>
>>>>>>>>>>>>> After kicking around a few other ideas, what does everyone
>>>> think
>>>>>>>>>> about
>>>>>>>>>>>>> "RecordProcessor"? I _think_ maybe it stands on its own
>> just
>>>>>>>> fine,
>>>>>>>>>>>>> because it's a thing that processes... records?
>>>>>>>>>>>>>
>>>>>>>>>>>>> If others agree with this, I can change the proposal to
>>>>>>>>>> RecordProcessor.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>> -John
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Fri, Jun 21, 2019 at 6:42 PM John Roesler <
>>>> j...@confluent.io>
>>>>>>>>>> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Hi all,
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> I've updated the KIP with the feedback so far.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> The naming question is still the biggest (only?)
>> outstanding
>>>>>>>>> issue.
>>>>>>>>>> It
>>>>>>>>>>>>>> would be good to hear some more thoughts on it.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> As we stand now, there's one vote for changing the package
>>>> name
>>>>>>>> to
>>>>>>>>>>>>>> something like 'typedprocessor', one for changing the
>>>> interface
>>>>>>>> to
>>>>>>>>>>>>>> TypedProcessor (as in the PoC), and one for just changing
>>> the
>>>>>>>>>>>>>> Processor interface in-place, breaking source
>> compatibility.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> How can we resolve this decision?
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>> -John
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Thu, Jun 20, 2019 at 5:44 PM John Roesler <
>>>> j...@confluent.io
>>>>>>>>>
>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Thanks for the feedback, Guozhang and Matthias,
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Regarding motivation: I'll update the wiki. Briefly:
>>>>>>>>>>>>>>> * Any processor can benefit. Imagine a pure user of the
>>>>>>>>>> ProcessorAPI
>>>>>>>>>>>>>>> who has very complex processing logic. I have seen
>> several
>>>>>>>>>> processor
>>>>>>>>>>>>>>> implementation that are hundreds of lines long and call
>>>>>>>>>>>>>>> `context.forward` in many different locations and
>> branches.
>>>> In
>>>>>>>>>> such an
>>>>>>>>>>>>>>> implementation, it would be very easy to have a bug in a
>>>> rarely
>>>>>>>>>> used
>>>>>>>>>>>>>>> branch that forwards the wrong kind of value. This would
>>>>>>>>>> structurally
>>>>>>>>>>>>>>> prevent that from happening.
>>>>>>>>>>>>>>> * Also, anyone who heavily uses the ProcessorAPI would
>>> likely
>>>>>>>>> have
>>>>>>>>>>>>>>> developed helper methods to wire together processors,
>> just
>>> as
>>>>>>>> we
>>>>>>>>>> have
>>>>>>>>>>>>>>> in the DSL implementation. This change would enable them
>> to
>>>>>>>>> ensure
>>>>>>>>>> at
>>>>>>>>>>>>>>> compile time that they are actually wiring together
>>>> compatible
>>>>>>>>>> types.
>>>>>>>>>>>>>>> This was actually _my_ original motivation, since I found
>>> it
>>>>>>>> very
>>>>>>>>>>>>>>> difficult and time consuming to follow the Streams DSL
>>>> internal
>>>>>>>>>>>>>>> builders.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Regarding breaking the source compatibility of
>> Processor: I
>>>>>>>> would
>>>>>>>>>>>>>>> _love_ to side-step the naming problem, but I really
>> don't
>>>> know
>>>>>>>>> if
>>>>>>>>>>>>>>> it's excusable to break compatibility. I suspect that our
>>>>>>>> oldest
>>>>>>>>>> and
>>>>>>>>>>>>>>> dearest friends are using the ProcessorAPI in some form
>> or
>>>>>>>>> another,
>>>>>>>>>>>>>>> and all their source code would break. It sucks to have
>> to
>>>>>>>>> create a
>>>>>>>>>>>>>>> whole new interface to get around this, but it feels like
>>> the
>>>>>>>>> right
>>>>>>>>>>>>>>> thing to do. Would be nice to get even more feedback on
>>> this
>>>>>>>>> point,
>>>>>>>>>>>>>>> though.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Regarding the types of stores, as I said in my response
>> to
>>>>>>>>> Sophie,
>>>>>>>>>>>>>>> it's not an issue.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Regarding the change to StreamsBuilder, it doesn't pin
>> the
>>>>>>>> types
>>>>>>>>> in
>>>>>>>>>>>>>>> any way, since all the types are bounded by Object only,
>>> and
>>>>>>>>> there
>>>>>>>>>> are
>>>>>>>>>>>>>>> no extra constraints between arguments (each type is used
>>>> only
>>>>>>>>>> once in
>>>>>>>>>>>>>>> one argument). But maybe I missed the point you were
>> asking
>>>>>>>>> about.
>>>>>>>>>>>>>>> Since the type takes generic paramters, we should allow
>>> users
>>>>>>>> to
>>>>>>>>>> pass
>>>>>>>>>>>>>>> in parameterized arguments. Otherwise, they would _have
>> to_
>>>>>>>> give
>>>>>>>>>> us a
>>>>>>>>>>>>>>> raw type, and they would be forced to get a "rawtyes"
>>> warning
>>>>>>>>> from
>>>>>>>>>> the
>>>>>>>>>>>>>>> compiler. So, it's our obligation in any API that
>> accepts a
>>>>>>>>>>>>>>> parameterized-type parameter to allow people to actually
>>>> pass a
>>>>>>>>>>>>>>> parameterized type, even if we don't actually use the
>>>>>>>> parameters.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> The naming question is a complex one, as I took pains to
>>>> detail
>>>>>>>>>>>>>>> previously. Please don't just pick out one minor point,
>>> call
>>>> it
>>>>>>>>>> weak,
>>>>>>>>>>>>>>> and then claim that it invalidates the whole decision. I
>>>> don't
>>>>>>>>>> think
>>>>>>>>>>>>>>> there's a clear best choice, so I'm more than happy for
>>>> someone
>>>>>>>>> to
>>>>>>>>>>>>>>> advocate for renaming the class instead of the package.
>> Can
>>>> you
>>>>>>>>>>>>>>> provide some reasons why you think that would be better?
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Regarding the deprecated methods, you're absolutely
>> right.
>>>> I'll
>>>>>>>>>>> update the KIP.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Thanks again for all the feedback!
>>>>>>>>>>>>>>> -John
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On Thu, Jun 20, 2019 at 4:34 PM Matthias J. Sax <
>>>>>>>>>> matth...@confluent.io>
>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Just want to second what Sophie said about the stores.
>> The
>>>>>>>> type
>>>>>>>>>> of a
>>>>>>>>>>>>>>>> used stores is completely independent of input/output
>>> types.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> This related to change `addGlobalStore()` method. Why do
>>> you
>>>>>>>>> want
>>>>>>>>>> to
>>>>>>>>>>> pin
>>>>>>>>>>>>>>>> the types? In fact, people request the ability to
>> filter()
>>>> and
>>>>>>>>>> maybe
>>>>>>>>>>>>>>>> even map() the data before they are put into the global
>>>> store.
>>>>>>>>>>> Limiting
>>>>>>>>>>>>>>>> the types seems to be a step backward here?
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Also, the pack name is questionable.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> This wouldn't be the first project to do something like
>>>>>>>> this...
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Not a strong argument. I would actually propose to not
>> a a
>>>> new
>>>>>>>>>>> package,
>>>>>>>>>>>>>>>> but just a new class `TypedProcessor`.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> For `ProcessorContext#forward` methods -- some of those
>>>>>>>> methods
>>>>>>>>>> are
>>>>>>>>>>>>>>>> already deprecated. While the will still be affected, it
>>>> would
>>>>>>>>> be
>>>>>>>>>>> worth
>>>>>>>>>>>>>>>> to mark them as deprecated in the wiki page, too.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> @Guozhang: I dont' think we should break source
>>>> compatibility
>>>>>>>>> in a
>>>>>>>>>>> minor
>>>>>>>>>>>>>>>> release.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> -Matthias
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> On 6/20/19 1:43 PM, Guozhang Wang wrote:
>>>>>>>>>>>>>>>>> Hi John,
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Thanks for KIP! I've a few comments below:
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> 1. So far the "Motivation" section is very general, and
>>> the
>>>>>>>>> only
>>>>>>>>>>> concrete
>>>>>>>>>>>>>>>>> example that I have in mind is
>>> `TransformValues#punctuate`.
>>>>>>>> Do
>>>>>>>>> we
>>>>>>>>>>> have any
>>>>>>>>>>>>>>>>> other concrete issues that drive this KIP? If not then
>> I
>>>> feel
>>>>>>>>>>> better to
>>>>>>>>>>>>>>>>> narrow the scope of this KIP to:
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> 1.a) modifying ProcessorContext only with the output
>>> types
>>>> on
>>>>>>>>>>> forward.
>>>>>>>>>>>>>>>>> 1.b) modifying Transformer signature to have generics
>> of
>>>>>>>>>>> ProcessorContext,
>>>>>>>>>>>>>>>>> and then lift the restricting of not using punctuate:
>> if
>>>> user
>>>>>>>>> did
>>>>>>>>>>> not
>>>>>>>>>>>>>>>>> follow the enforced typing and just code without
>>> generics,
>>>>>>>> they
>>>>>>>>>>> will get
>>>>>>>>>>>>>>>>> warning at compile time and get run-time error if they
>>>>>>>> forward
>>>>>>>>>>> wrong-typed
>>>>>>>>>>>>>>>>> records, which I think would be acceptable.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> I feel this would be a good solution for this specific
>>>> issue;
>>>>>>>>>>> again, feel
>>>>>>>>>>>>>>>>> free to update the wiki page with other known issues
>> that
>>>>>>>>> cannot
>>>>>>>>>> be
>>>>>>>>>>>>>>>>> resolved.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> 2. If, we want to go with the current scope then my
>> next
>>>>>>>>> question
>>>>>>>>>>> would be,
>>>>>>>>>>>>>>>>> how much breakage we would introducing if we just
>> modify
>>>> the
>>>>>>>>>>> Processor
>>>>>>>>>>>>>>>>> signature directly? My feeling is that DSL users would
>> be
>>>>>>>> most
>>>>>>>>>>> likely not
>>>>>>>>>>>>>>>>> affected and PAPI users only need to modify a few lines
>>> on
>>>>>>>>> class
>>>>>>>>>>>>>>>>> declaration. I feel it worth doing some research on
>> this
>>>> part
>>>>>>>>> and
>>>>>>>>>>> then
>>>>>>>>>>>>>>>>> decide if we really want to bite the bullet of
>> duplicated
>>>>>>>>>> Processor
>>>>>>>>>>> /
>>>>>>>>>>>>>>>>> ProcessorSupplier classes for maintaining
>> compatibility.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Guozhang
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> On Wed, Jun 19, 2019 at 12:21 PM John Roesler <
>>>>>>>>> j...@confluent.io
>>>>>>>>>>>
>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Hi all,
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> In response to the feedback so far, I changed the
>>> package
>>>>>>>> name
>>>>>>>>>> from
>>>>>>>>>>>>>>>>>> `processor2` to `processor.generic`.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>>>>>> -John
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> On Mon, Jun 17, 2019 at 4:49 PM John Roesler <
>>>>>>>>> j...@confluent.io
>>>>>>>>>>>
>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Thanks for the feedback, Sophie!
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> I actually felt a little uneasy when I wrote that
>>> remark,
>>>>>>>>>> because
>>>>>>>>>>> it's
>>>>>>>>>>>>>>>>>>> not restricted at all in the API, it's just available
>>> to
>>>>>>>> you
>>>>>>>>> if
>>>>>>>>>>> you
>>>>>>>>>>>>>>>>>>> choose to give your stores and context the same
>>>> parameters.
>>>>>>>>>> So, I
>>>>>>>>>>>>>>>>>>> think your use case is valid, and also perfectly
>>>>>>>> permissable
>>>>>>>>>>> under the
>>>>>>>>>>>>>>>>>>> current KIP. Sorry for sowing confusion on my own
>>>>>>>> discussion
>>>>>>>>>>> thread!
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> I'm not crazy about the package name, either. I went
>>> with
>>>>>>>> it
>>>>>>>>>> only
>>>>>>>>>>>>>>>>>>> because there's seemingly nothing special about the
>> new
>>>>>>>>> package
>>>>>>>>>>> except
>>>>>>>>>>>>>>>>>>> that it can't have the same name as the old one.
>>>> Otherwise,
>>>>>>>>> the
>>>>>>>>>>>>>>>>>>> existing "processor" and "Processor" names for the
>>>> package
>>>>>>>>> and
>>>>>>>>>>> class
>>>>>>>>>>>>>>>>>>> are perfectly satisfying. Rather than pile on
>>> additional
>>>>>>>>>>> semantics, it
>>>>>>>>>>>>>>>>>>> seemed cleaner to just add a number to the package
>>> name.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> This wouldn't be the first project to do something
>> like
>>>>>>>>> this...
>>>>>>>>>>> Apache
>>>>>>>>>>>>>>>>>>> Commons, for example, has added a "2" to the end of
>>> some
>>>> of
>>>>>>>>>> their
>>>>>>>>>>>>>>>>>>> packages for exactly the same reason.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> I'm open to any suggestions. For example, we could do
>>>>>>>>> something
>>>>>>>>>>> like
>>>>>>>>>>>>>>>>>>> org.apache.kafka.streams.typedprocessor.Processor or
>>>>>>>>>>>>>>>>>>> org.apache.kafka.streams.processor.typed.Processor ,
>>>> which
>>>>>>>>>> would
>>>>>>>>>>> have
>>>>>>>>>>>>>>>>>>> just about the same effect. One microscopic thought
>> is
>>>>>>>> that,
>>>>>>>>> if
>>>>>>>>>>>>>>>>>>> there's another interface in the "processor" package
>>> that
>>>>>>>> we
>>>>>>>>>> wish
>>>>>>>>>>> to
>>>>>>>>>>>>>>>>>>> do the same thing to, would _could_ pile it in to
>>>>>>>>> "processor2",
>>>>>>>>>>> but we
>>>>>>>>>>>>>>>>>>> couldn't do the same if we use a package that has
>>> "typed"
>>>>>>>> in
>>>>>>>>>> the
>>>>>>>>>>> name,
>>>>>>>>>>>>>>>>>>> unless that change is _also_ related to types in some
>>>> way.
>>>>>>>>> But
>>>>>>>>>>> this
>>>>>>>>>>>>>>>>>>> seems like a very minor concern.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> What's your preference?
>>>>>>>>>>>>>>>>>>> -John
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> On Mon, Jun 17, 2019 at 3:56 PM Sophie Blee-Goldman <
>>>>>>>>>>> sop...@confluent.io>
>>>>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> Hey John, thanks for writing this up! I like the
>>>> proposal
>>>>>>>>> but
>>>>>>>>>>> there's
>>>>>>>>>>>>>>>>>> one
>>>>>>>>>>>>>>>>>>>> point that I think may be too restrictive:
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> "A processor that happens to use a typed store is
>>>> actually
>>>>>>>>>>> emitting the
>>>>>>>>>>>>>>>>>>>> same types that it is storing."
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> I can imagine someone could want to leverage this
>> new
>>>> type
>>>>>>>>>> safety
>>>>>>>>>>>>>>>>>> without
>>>>>>>>>>>>>>>>>>>> also limiting how they can interact with/use their
>>>> store.
>>>>>>>> As
>>>>>>>>>> an
>>>>>>>>>>>>>>>>>> (admittedly
>>>>>>>>>>>>>>>>>>>> contrived) example, say you have an input stream of
>>>>>>>>> purchases
>>>>>>>>>> of
>>>>>>>>>>> a
>>>>>>>>>>>>>>>>>> certain
>>>>>>>>>>>>>>>>>>>> type (entertainment, food, etc), and on seeing a new
>>>>>>>> record
>>>>>>>>>> you
>>>>>>>>>>> want to
>>>>>>>>>>>>>>>>>>>> output how many types of purchase a shopper has made
>>>> more
>>>>>>>>>> than 5
>>>>>>>>>>>>>>>>>> purchases
>>>>>>>>>>>>>>>>>>>> of in the last month. Your state store will probably
>>> be
>>>>>>>>>> holding
>>>>>>>>>>> some
>>>>>>>>>>>>>>>>>> more
>>>>>>>>>>>>>>>>>>>> complicated PurchaseHistory object (keyed by user),
>>> but
>>>>>>>> your
>>>>>>>>>>> output is
>>>>>>>>>>>>>>>>>> just
>>>>>>>>>>>>>>>>>>>> a <User, Long>
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> I'm also not crazy about "processor2" as the package
>>>> name
>>>>>>>>> ...
>>>>>>>>>>> not sure
>>>>>>>>>>>>>>>>>> what
>>>>>>>>>>>>>>>>>>>> a better one would be though (something with
>> "typed"?)
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> On Mon, Jun 17, 2019 at 12:47 PM John Roesler <
>>>>>>>>>> j...@confluent.io>
>>>>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> Hi all,
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> I'd like to propose KIP-478 (
>>>>>>>>>>>>>>>>>> https://cwiki.apache.org/confluence/x/2SkLBw
>>>>>>>>>>>>>>>>>>>>> ).
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> This proposal would add output type bounds to the
>>>>>>>> Processor
>>>>>>>>>>> interface
>>>>>>>>>>>>>>>>>>>>> in Kafka Streams, which enables static checking of
>> a
>>>>>>>> number
>>>>>>>>>> of
>>>>>>>>>>> useful
>>>>>>>>>>>>>>>>>>>>> properties:
>>>>>>>>>>>>>>>>>>>>> * A processor B that consumes the output of
>>> processor A
>>>>>>>> is
>>>>>>>>>>> actually
>>>>>>>>>>>>>>>>>>>>> expecting the same types that processor A produces.
>>>>>>>>>>>>>>>>>>>>> * A processor that happens to use a typed store is
>>>>>>>> actually
>>>>>>>>>>> emitting
>>>>>>>>>>>>>>>>>>>>> the same types that it is storing.
>>>>>>>>>>>>>>>>>>>>> * A processor is simply forwarding the expected
>> types
>>>> in
>>>>>>>>> all
>>>>>>>>>>> code
>>>>>>>>>>>>>>>>>> paths.
>>>>>>>>>>>>>>>>>>>>> * Processors added via the Streams DSL, which are
>> not
>>>>>>>>>> permitted
>>>>>>>>>>> to
>>>>>>>>>>>>>>>>>>>>> forward results at all are statically prevented
>> from
>>>>>>>> doing
>>>>>>>>> so
>>>>>>>>>>> by the
>>>>>>>>>>>>>>>>>>>>> compiler
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> Internally, we can use the above properties to
>>> achieve
>>>> a
>>>>>>>>> much
>>>>>>>>>>> higher
>>>>>>>>>>>>>>>>>>>>> level of confidence in the Streams DSL
>>> implementation's
>>>>>>>>>>> correctness.
>>>>>>>>>>>>>>>>>>>>> Actually, while doing the POC, I found a few bugs
>> and
>>>>>>>>>> mistakes,
>>>>>>>>>>> which
>>>>>>>>>>>>>>>>>>>>> become structurally impossible with KIP-478.
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> Additionally, the stronger types dramatically
>> improve
>>>> the
>>>>>>>>>>>>>>>>>>>>> self-documentation of our Streams internal
>>>>>>>> implementations,
>>>>>>>>>>> which
>>>>>>>>>>>>>>>>>>>>> makes it much easier for new contributors to ramp
>> up
>>>> with
>>>>>>>>>>> confidence.
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> Thanks so much for your consideration!
>>>>>>>>>>>>>>>>>>>>> -John
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> --
>>>>>>>>>>> -- Guozhang
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> --
>>>>>>>>> -- Guozhang
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>
>>>
>>>
>>> --
>>> -- Guozhang
>>>
>>
> 
> 
