Thanks John.

SGTM.

On 10/1/20 2:50 PM, John Roesler wrote:
> Hello again, all,
> 
> I'm sorry to make another tweak to this KIP, but during the
> implementation of the design we've just agreed on, I
> realized that Processors would almost never need to
> reference the RecordMetadata. Therefore, I'm proposing to
> streamline the API by moving the Optional<RecordMetadata> to
> the new ProcessorContext as a method, rather than making it
> a method argument to Processor#process.
> 
> The change is visible here:
> https://cwiki.apache.org/confluence/pages/diffpagesbyversion.action?pageId=118172121&selectedPageVersions=16&selectedPageVersions=15
> 
> All of the same semantics and considerations we discussed
> still apply, it's just that Processor implementers won't
> have to think about it unless they actually _need_ the
> topic/partition/offset information from the RecordMetadata.
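For anyone skimming the thread, here is a minimal sketch of what "metadata as a method on the context" could look like from an implementer's point of view. The types below are simplified stand-ins written for this illustration (they are not the real Kafka Streams classes), and the helper names `upperCase` and `tagWithSource` are hypothetical.

```java
import java.util.Optional;

public class MetadataOnContextSketch {
    // Stand-in for the metadata discussed above: topic/partition/offset only.
    static final class RecordMetadata {
        final String topic;
        final int partition;
        final long offset;

        RecordMetadata(String topic, int partition, long offset) {
            this.topic = topic;
            this.partition = partition;
            this.offset = offset;
        }
    }

    // Stand-in context: the metadata is a method on the context,
    // not an argument to process().
    interface ProcessorContext {
        Optional<RecordMetadata> recordMetadata();
    }

    // Processing logic that does not care about metadata never sees the Optional.
    static String upperCase(String value) {
        return value.toUpperCase();
    }

    // Processing logic that needs the source position asks the context and
    // handles the empty case (e.g. the record was forwarded from a punctuator).
    static String tagWithSource(ProcessorContext context, String value) {
        return context.recordMetadata()
            .map(m -> value + " from " + m.topic + "-" + m.partition + "@" + m.offset)
            .orElse(value + " from (no source record)");
    }

    public static void main(String[] args) {
        ProcessorContext duringProcess =
            () -> Optional.of(new RecordMetadata("input", 0, 42L));
        ProcessorContext duringPunctuate = () -> Optional.empty();

        System.out.println(upperCase("a"));
        System.out.println(tagWithSource(duringProcess, "a"));
        System.out.println(tagWithSource(duringPunctuate, "a"));
    }
}
```

The point of the sketch is only that the Optional shows up at the call site that wants topic/partition/offset, and nowhere else.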
> 
> Also, the PR for this part of the KIP is now available here:
> https://github.com/apache/kafka/pull/9361
> 
> I know it's a bit on the heavy side; I've annotated the PR
> to try and ease the reviewer's job. I'd greatly appreciate
> it if anyone can take the time to review.
> 
> Thanks,
> -John
> 
> On Wed, 2020-09-30 at 10:16 -0500, John Roesler wrote:
>> Thanks, Matthias!
>>
>> I can certainly document it. I didn't bother because the old
>> Processor, Supplier, and Context will themselves be
>> deprecated, so any method that handles them won't be able to
>> avoid the deprecation warning. Nevertheless, it doesn't hurt
>> just to explicitly deprecate those methods.
>>
>> Thanks,
>> -John
>>
>> On Wed, 2020-09-30 at 08:10 -0700, Matthias J. Sax wrote:
>>> Thanks John. I like the proposal.
>>>
>>> Btw: I was just going over the KIP and realized that we add new methods
>>> to `StreamsBuilder`, `Topology`, and `KStream` that take the new
>>> `ProcessorSupplier` class -- should we also deprecate the corresponding
>>> existing ones that take the old `ProcessorSupplier`?
>>>
>>>
>>> -Matthias
>>>
>>>
>>> On 9/30/20 7:46 AM, John Roesler wrote:
>>>> Thanks Paul and Sophie,
>>>>
>>>> Your feedback certainly underscores the need to be explicit
>>>> in the javadoc about why that parameter is Optional. Getting
>>>> this kind of feedback before the release is exactly the kind
>>>> of outcome we hope to get from the KIP process!
>>>>
>>>> Thanks,
>>>> -John
>>>>
>>>> On Tue, 2020-09-29 at 22:32 -0500, Paul Whalen wrote:
>>>>> John, I totally agree that adding a method to Processor is cumbersome and
>>>>> not a good path.  I was imagining maybe a separate interface that could be
>>>>> used in the appropriate context, but I don't think that makes too much
>>>>> sense - it's just too far away from what Kafka Streams is.  I was
>>>>> originally more interested in the "why" Optional than the "how" (I think my
>>>>> original reply overplayed the "optional as an argument" concern).  But
>>>>> you've convinced me that there is a perfectly legitimate "why".  We should
>>>>> make sure that it's clear why it's Optional, but I suppose that goes
>>>>> without saying.  It's a nice opportunity to make the API reflect more what
>>>>> is actually going on under the hood.
>>>>>
>>>>> Thanks!
>>>>> Paul
>>>>>
>>>>> On Tue, Sep 29, 2020 at 10:05 PM Sophie Blee-Goldman <sop...@confluent.io>
>>>>> wrote:
>>>>>
>>>>>> FWIW, while I'm really not a fan of Optional in general, I agree that its
>>>>>> usage here seems appropriate. Even for those rare software developers who
>>>>>> carefully read all the docs several times over, I think it wouldn't be too
>>>>>> hard to miss a note about the RecordMetadata possibly being null.
>>>>>>
>>>>>> Especially because it's not that obvious why at first glance, and takes a
>>>>>> bit of thinking to realize that records originating from a Punctuator
>>>>>> wouldn't have a "current record". This is something that has definitely
>>>>>> confused users today.
>>>>>>
>>>>>> It's on us to improve the education here -- and an Optional<RecordMetadata>
>>>>>> would naturally raise awareness of this subtlety.
>>>>>>
>>>>>> On Tue, Sep 29, 2020 at 7:40 PM Sophie Blee-Goldman <sop...@confluent.io>
>>>>>> wrote:
>>>>>>
>>>>>>> Does my reply address your concerns?
>>>>>>>
>>>>>>>
>>>>>>> Yes; also, I definitely misread part of the proposal earlier and thought
>>>>>>> you had put the timestamp field in RecordMetadata. Sorry for not giving
>>>>>>> things a closer look before responding! I'm not sure my original message
>>>>>>> made much sense given the misunderstanding, but thanks for responding
>>>>>>> anyway :P
>>>>>>>
>>>>>>> Having given the proposal a second pass, I agree, it's very elegant. +1
>>>>>>>
>>>>>>> On Tue, Sep 29, 2020 at 6:50 PM John Roesler <vvcep...@apache.org>
>>>>>>> wrote:
>>>>>>>> Thanks for the reply, Sophie,
>>>>>>>>
>>>>>>>> I think I may have summarized too much in my prior reply.
>>>>>>>>
>>>>>>>> In the currently proposed KIP, any caller of forward() must
>>>>>>>> supply a Record, which consists of:
>>>>>>>> * key
>>>>>>>> * value
>>>>>>>> * timestamp
>>>>>>>> * headers (with a convenience constructor that sets empty
>>>>>>>> headers)
>>>>>>>>
>>>>>>>> These aren't what I was referring to as potentially being
>>>>>>>> undefined downstream, since thanks to the introduction of
>>>>>>>> Record, they are, as you're advocating, required to be
>>>>>>>> defined everywhere, even when forwarding from a punctuator.
>>>>>>>>
>>>>>>>> So to be clear, the intent of this change is actually to
>>>>>>>> _enforce_ that timestamp would never be undefined (which it
>>>>>>>> currently can be). Also, since punctuators _are_ going to
>>>>>>>> have to "make up" a timestamp going forward, we should note
>>>>>>>> that the "punctuate" method currently passes in a good
>>>>>>>> timestamp that they can use: for system-time punctuations,
>>>>>>>> they receive the current system time, and for stream-time
>>>>>>>> punctuations, they get the current stream time.
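A rough sketch of the point about punctuators and timestamps: the callback is handed a usable timestamp, so a forwarded record never needs an undefined one. The `Record`, `Forwarder`, and `Punctuator` types here are simplified stand-ins for illustration, and `countEmitter` is a hypothetical helper, not part of the KIP.

```java
import java.util.ArrayList;
import java.util.List;

public class PunctuatorTimestampSketch {
    // Stand-in for the proposed Record: key, value and timestamp are always set.
    static final class Record<K, V> {
        final K key;
        final V value;
        final long timestamp;

        Record(K key, V value, long timestamp) {
            this.key = key;
            this.value = value;
            this.timestamp = timestamp;
        }
    }

    // Stand-in for the forwarding half of a processor context.
    interface Forwarder<K, V> {
        void forward(Record<K, V> record);
    }

    // Stand-in punctuator callback. The argument is the current system time or
    // the current stream time, depending on the punctuation type.
    interface Punctuator {
        void punctuate(long timestamp);
    }

    // Builds a punctuator that emits a count downstream, stamping the record
    // with the timestamp it was handed instead of leaving it undefined.
    static Punctuator countEmitter(String key, long count, Forwarder<String, Long> downstream) {
        return timestamp -> downstream.forward(new Record<>(key, count, timestamp));
    }

    public static void main(String[] args) {
        List<Record<String, Long>> received = new ArrayList<>();
        Punctuator punctuator = countEmitter("clicks", 7L, received::add);

        punctuator.punctuate(1_000L);

        Record<String, Long> r = received.get(0);
        System.out.println(r.key + "=" + r.value + " @ " + r.timestamp);
    }
}
```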
>>>>>>>>
>>>>>>>> The potentially undefined RecordMetadata only contains these
>>>>>>>> fields:
>>>>>>>> * topic
>>>>>>>> * partition
>>>>>>>> * offset
>>>>>>>>
>>>>>>>> These fields aren't required (or even used) in a Sink, and
>>>>>>>> it doesn't seem like they would be important to many
>>>>>>>> applications. Furthermore, it doesn't _seem_ like you'd even
>>>>>>>> want to set these fields. They seem purely informational and
>>>>>>>> only useful in the context when you are actually processing
>>>>>>>> a real input record. It doesn't sound like you were asking
>>>>>>>> for it, but just to put it on the record, I think if we were
>>>>>>>> to require values for the metadata from punctuators, people
>>>>>>>> would mostly just make up their own dummy values, to no
>>>>>>>> one's benefit.
>>>>>>>>
>>>>>>>> I should also note that with the current
>>>>>>>> Record/RecordMetadata split, we will have the freedom to
>>>>>>>> move fields into the Record class (or even add new fields)
>>>>>>>> if we want them to become "data" as opposed to "metadata" in
>>>>>>>> the future.
>>>>>>>>
>>>>>>>> Thanks for your reply; I was similarly floored when I
>>>>>>>> realized the true nature of the current situation. Does my
>>>>>>>> reply address your concerns?
>>>>>>>>
>>>>>>>> Thanks,
>>>>>>>> -John
>>>>>>>>
>>>>>>>> On Tue, 2020-09-29 at 18:34 -0700, Sophie Blee-Goldman
>>>>>>>> wrote:
>>>>>>>>>> However, the record metadata is only defined when the parent forwards
>>>>>>>>>> while processing a real record, not when it calls forward from the
>>>>>>>>>> punctuator
>>>>>>>>>
>>>>>>>>> Can we take a step back for a second...why wouldn't you be required to
>>>>>>>>> set the RecordContext yourself when calling forward from a Punctuator? I
>>>>>>>>> think I agree with Paul here, it seems kind of absurd not to enforce that
>>>>>>>>> the RecordContext be present inside the process() method.
>>>>>>>>>
>>>>>>>>> The original problem with Punctuators, as I understood it, was that all
>>>>>>>>> of the RecordContext fields were exposed automatically to both the
>>>>>>>>> Processor and any Punctuator, due to being direct methods on the
>>>>>>>>> ProcessorContext. We can't control which ProcessorContext methods someone
>>>>>>>>> will call from within a Punctuator vs from a Processor. The best we could
>>>>>>>>> do was set these "nonsense" fields to null when inside a Punctuator, or
>>>>>>>>> set them to some dummy values as you pointed out.
>>>>>>>>>
>>>>>>>>> But then you proposed the solution of a separate RecordContext which is
>>>>>>>>> not attached to the ProcessorContext at all. This seemed to solve the
>>>>>>>>> above problem very neatly: we only pass in the RecordContext to the
>>>>>>>>> process() method, so we don't have to worry about people trying to access
>>>>>>>>> these fields from within a Punctuator. The fields aren't accessible
>>>>>>>>> unless they're defined.
>>>>>>>>>
>>>>>>>>> So what happens when someone wants to forward something from within a
>>>>>>>>> Punctuator? I don't think it's reasonable to let the timestamp field be
>>>>>>>>> undefined, ever. What if the Punctuator forwards directly to a sink, or
>>>>>>>>> directly to some windowing logic? Are we supposed to add handling for the
>>>>>>>>> RecordContext == null case to every processor? Or are we just going to
>>>>>>>>> assume the implicit restriction that users will only forward records from
>>>>>>>>> a Punctuator to downstream processors that know how to handle and/or set
>>>>>>>>> the RecordContext if it's undefined? That seems to throw away a lot of
>>>>>>>>> the awesome safety added in this KIP.
>>>>>>>>>
>>>>>>>>> Apologies for the rant. But I feel pretty strongly that allowing users to
>>>>>>>>> forward records from a Punctuator without a defined RecordContext would
>>>>>>>>> be asking for trouble. Imo, if you want to forward from a Punctuator, you
>>>>>>>>> need to store the info you need in order to set the timestamp, or make
>>>>>>>>> one up yourself
>>>>>>>>>
>>>>>>>>> (the one alternative I can think of here is that maybe we could pass in
>>>>>>>>> the current partition time, so users can at least put in a reasonable
>>>>>>>>> estimate for the timestamp that won't cause it to get dropped and won't
>>>>>>>>> potentially lurch the streamtime far into the future. This would be
>>>>>>>>> similar to what we do in the TimestampExtractor)
>>>>>>>>>
>>>>>>>>> On Tue, Sep 29, 2020 at 6:06 PM John Roesler <vvcep...@apache.org>
>>>>>>>>> wrote:
>>>>>>>>>> Oh, I guess one other thing I should have mentioned is that I’ve recently
>>>>>>>>>> discovered that in cases where the context is undefined, we currently
>>>>>>>>>> just fill in dummy values for the context. So there’s a good chance that
>>>>>>>>>> real applications in use are depending on undefined context without even
>>>>>>>>>> realizing it. What I’m hoping to do is just make the situation explicit
>>>>>>>>>> and get rid of the dummy values.
>>>>>>>>>>
>>>>>>>>>> Thanks,
>>>>>>>>>> John
>>>>>>>>>>
>>>>>>>>>> On Tue, Sep 29, 2020, at 20:01, John Roesler wrote:
>>>>>>>>>>> Thanks for the review, Paul!
>>>>>>>>>>>
>>>>>>>>>>> I had read some of that debate before. There seems to be some subtext
>>>>>>>>>>> there, because they advise against using Optional in cases like this,
>>>>>>>>>>> but there doesn’t seem to be a specific reason why it’s inappropriate.
>>>>>>>>>>> I got the impression they were just afraid that people would go
>>>>>>>>>>> overboard and make everything Optional.
>>>>>>>>>>>
>>>>>>>>>>> I could also make two methods, but it seemed like it might be an
>>>>>>>>>>> unfortunate way to handle the issue, since Processor is just about a
>>>>>>>>>>> Function as-is, but the two-method approach would require people to
>>>>>>>>>>> implement both methods.
>>>>>>>>>>>
>>>>>>>>>>> To your question, this is something that’s only recently become clear
>>>>>>>>>>> to me. Imagine you have a parent processor that calls forward both from
>>>>>>>>>>> process and a punctuator. The child will have process() invoked in both
>>>>>>>>>>> cases, and won’t be able to distinguish them. However, the record
>>>>>>>>>>> metadata is only defined when the parent forwards while processing a
>>>>>>>>>>> real record, not when it calls forward from the punctuator.
>>>>>>>>>>>
>>>>>>>>>>> This is why I wanted to make the metadata Optional, to advertise that
>>>>>>>>>>> the metadata might be undefined if any ancestor processor ever calls
>>>>>>>>>>> forward from a punctuator. We could remove the Optional and instead
>>>>>>>>>>> just document that the argument might be null.
>>>>>>>>>>>
>>>>>>>>>>> With that context in place, what’s your take?
>>>>>>>>>>>
>>>>>>>>>>> Thanks,
>>>>>>>>>>> John
>>>>>>>>>>>
>>>>>>>>>>> On Tue, Sep 29, 2020, at 19:09, Paul Whalen wrote:
>>>>>>>>>>>> Looks pretty good to me, though the Processor#process(Record,
>>>>>>>>>>>> Optional<RecordMetadata>) signature caught my eye.  There's some debate
>>>>>>>>>>>> (https://stackoverflow.com/questions/31922866/why-should-java-8s-optional-not-be-used-in-arguments)
>>>>>>>>>>>> about whether to use Optionals in arguments, and while that's a bit of a
>>>>>>>>>>>> religious debate in the abstract, it did make me wonder whether it makes
>>>>>>>>>>>> sense in this specific case.  When is it actually not present?  I was
>>>>>>>>>>>> under the impression that we should always have access to it in
>>>>>>>>>>>> process(), and that the concern about metadata being undefined was about
>>>>>>>>>>>> having access to record metadata in the ProcessorContext held for use
>>>>>>>>>>>> inside a Punctuator.
>>>>>>>>>>>>
>>>>>>>>>>>> If that's not the case and it is truly optional in process(), is there an
>>>>>>>>>>>> opportunity for an alternate interface for the cases when we don't get it,
>>>>>>>>>>>> rather than force the branching on implementers of the interface?
>>>>>>>>>>>>
>>>>>>>>>>>> Apologies if I've missed something, I took a look at the PR and I didn't
>>>>>>>>>>>> see any spots where I thought it would be empty.  Perhaps an example of a
>>>>>>>>>>>> Punctuator using (and not using) the new API would clear things up.
>>>>>>>>>>>> Best,
>>>>>>>>>>>> Paul
>>>>>>>>>>>>
>>>>>>>>>>>> On Tue, Sep 29, 2020 at 4:10 PM John Roesler <vvcep...@apache.org>
>>>>>>>>>>>> wrote:
>>>>>>>>>>>>> Hello again, all,
>>>>>>>>>>>>>
>>>>>>>>>>>>> Thanks for the latest round of discussion. I've taken the
>>>>>>>>>>>>> recent feedback and come up with an updated KIP that seems
>>>>>>>>>>>>> actually quite a bit nicer than the prior proposal.
>>>>>>>>>>>>>
>>>>>>>>>>>>> The specific diff on the KIP is here:
>>>>>>>>>>>>> https://cwiki.apache.org/confluence/pages/diffpagesbyversion.action?pageId=118172121&selectedPageVersions=15&selectedPageVersions=14
>>>>>>>>>>>>>
>>>>>>>>>>>>> These changes are implemented in this POC PR:
>>>>>>>>>>>>> https://github.com/apache/kafka/pull/9346
>>>>>>>>>>>>>
>>>>>>>>>>>>> The basic idea is that, building on the recent conversation,
>>>>>>>>>>>>> we would transition away from the current API where we get
>>>>>>>>>>>>> only key/value in the process() method and other "data"
>>>>>>>>>>>>> comes in the ProcessorContext along with the "metadata".
>>>>>>>>>>>>>
>>>>>>>>>>>>> Instead, we formalize what is "data" and what is "metadata",
>>>>>>>>>>>>> and pass it all in to the process method:
>>>>>>>>>>>>> Processor#process(Record, Optional<RecordMetadata>)
>>>>>>>>>>>>>
>>>>>>>>>>>>> Also, you forward the whole data class instead of mutating
>>>>>>>>>>>>> the ProcessorContext fields and also calling forward:
>>>>>>>>>>>>> ProcessorContext#forward(Record)
>>>>>>>>>>>>>
>>>>>>>>>>>>> The Record class itself ships with methods like
>>>>>>>>>>>>> record#withValue(NewV newValue)
>>>>>>>>>>>>> that make a shallow copy of the input Record, enabling
>>>>>>>>>>>>> Processors to safely handle the record without polluting the
>>>>>>>>>>>>> context of their parents and siblings.
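To make the "shallow copy" wording concrete, here is a small sketch. The `Record` class below is a simplified stand-in written for this illustration (headers are a plain `Map` for brevity); it is an assumption about the shape of the idea, not the class from the KIP or the PR.

```java
import java.util.HashMap;
import java.util.Map;

public class ShallowCopySketch {
    // Stand-in for the proposed Record; all fields are final.
    static final class Record<K, V> {
        final K key;
        final V value;
        final long timestamp;
        final Map<String, String> headers;

        Record(K key, V value, long timestamp, Map<String, String> headers) {
            this.key = key;
            this.value = value;
            this.timestamp = timestamp;
            this.headers = headers;
        }

        // Returns a new wrapper with a different value. The key and headers
        // are the same objects as in the original, so this is a shallow copy.
        <NewV> Record<K, NewV> withValue(NewV newValue) {
            return new Record<>(key, newValue, timestamp, headers);
        }
    }

    public static void main(String[] args) {
        Map<String, String> headers = new HashMap<>();
        headers.put("trace", "abc");

        Record<String, Integer> input = new Record<>("k", 1, 10L, headers);
        Record<String, String> output = input.withValue("one");

        System.out.println(input.value);                      // 1 (input is untouched)
        System.out.println(output.value);                     // one
        System.out.println(input.headers == output.headers);  // true (shared by reference)
    }
}
```

The input wrapper is never modified, which is what keeps a parent's or sibling's view of the record stable; anything reachable through a shared reference (the headers map here) is still shared.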
>>>>>>>>>>>>>
>>>>>>>>>>>>> This proposal has a number of key benefits:
>>>>>>>>>>>>> * As we've discovered in KAFKA-9584, it's unsafe to mutate
>>>>>>>>>>>>> the Headers via the ProcessorContext. This proposal offers a
>>>>>>>>>>>>> way to safely forward changes only to downstream processors.
>>>>>>>>>>>>> * The new API has symmetry (each processor's input is the
>>>>>>>>>>>>> output of its parent processor)
>>>>>>>>>>>>> * The API makes clear that the record metadata isn't always
>>>>>>>>>>>>> defined (for example, in a punctuation, there is no current
>>>>>>>>>>>>> topic/partition/offset)
>>>>>>>>>>>>> * The API enables punctuators to forward well defined
>>>>>>>>>>>>> headers downstream, which is currently not possible.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Unless there are objections, I'll go ahead and re-finalize
>>>>>>>>>>>>> this KIP and update that PR to a mergeable state.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Thanks, all,
>>>>>>>>>>>>> -John
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Thu, 2020-09-24 at 09:41 -0700, Matthias J. Sax wrote:
>>>>>>>>>>>>>> Interesting proposal. However, I am not totally convinced, because I see
>>>>>>>>>>>>>> a fundamental difference between "data" and "metadata".
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Topic/partition/offset are "metadata" in the strong sense and they are
>>>>>>>>>>>>>> immutable.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On the other hand there is "primary" data like key and value, as well as
>>>>>>>>>>>>>> "secondary" data like timestamp and headers. The issue seems to be that
>>>>>>>>>>>>>> we treat "secondary data" more like metadata atm?
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Thus, promoting timestamp and headers into a first-class citizen role
>>>>>>>>>>>>>> makes sense to me (my original proposal about `RecordContext` would still
>>>>>>>>>>>>>> fall short in this regard). However, putting both (data and metadata)
>>>>>>>>>>>>>> into a `Record` abstraction might go too far?
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> I am also a little bit concerned about `Record.copy()` because it might
>>>>>>>>>>>>>> be a trap: Users might assume it does a full deep copy of the record,
>>>>>>>>>>>>>> however, it would not. It would only create a new `Record` object as a
>>>>>>>>>>>>>> wrapper that points to the same key/value/header objects as the input
>>>>>>>>>>>>>> record.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> With the current `context.forward(key, value)` we don't have this "deep
>>>>>>>>>>>>>> copy" issue -- it's pretty clear what is happening.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Instead of `To.all().withTimestamp()` we could also add
>>>>>>>>>>>>>> `context.forward(key, value, timestamp)` etc (just wondering about the
>>>>>>>>>>>>>> explosion in overloads)?
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Also, `Record.withValue` etc sounds odd? Should a record not be
>>>>>>>>>>>>>> immutable? So, we could have something like
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> `RecordFactory.withKeyValue(...).withTimestamp(...).withHeaders(...).build()`.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> But it looks rather verbose?
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> The other question is of course, to what extent do we want to keep the
>>>>>>>>>>>>>> distinction between "primary" and "secondary" data? To me, it's a
>>>>>>>>>>>>>> question of ease of use?
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Just putting all this out to move the discussion forward. Don't have a
>>>>>>>>>>>>>> concrete proposal atm.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> -Matthias
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On 9/14/20 9:24 AM, John Roesler wrote:
>>>>>>>>>>>>>>> Thanks for this thought, Matthias!
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> To be honest, it's bugged me quite a bit that _all_ the
>>>>>>>>>>>>>>> record information hasn't been an argument to `process`. I
>>>>>>>>>>>>>>> suppose I was trying to be conservative in this proposal,
>>>>>>>>>>>>>>> but then again, if we're adding new Processor and
>>>>>>>>>>>>>>> ProcessorContext interfaces, then this is the time to make
>>>>>>>>>>>>>>> such a change.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> To be unambiguous, I think this is what we're talking about:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> ProcessorContext:
>>>>>>>>>>>>>>> * applicationId
>>>>>>>>>>>>>>> * taskId
>>>>>>>>>>>>>>> * appConfigs
>>>>>>>>>>>>>>> * appConfigsWithPrefix
>>>>>>>>>>>>>>> * keySerde
>>>>>>>>>>>>>>> * valueSerde
>>>>>>>>>>>>>>> * stateDir
>>>>>>>>>>>>>>> * metrics
>>>>>>>>>>>>>>> * schedule
>>>>>>>>>>>>>>> * commit
>>>>>>>>>>>>>>> * forward
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> StateStoreContext:
>>>>>>>>>>>>>>> * applicationId
>>>>>>>>>>>>>>> * taskId
>>>>>>>>>>>>>>> * appConfigs
>>>>>>>>>>>>>>> * appConfigsWithPrefix
>>>>>>>>>>>>>>> * keySerde
>>>>>>>>>>>>>>> * valueSerde
>>>>>>>>>>>>>>> * stateDir
>>>>>>>>>>>>>>> * metrics
>>>>>>>>>>>>>>> * register
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> RecordContext
>>>>>>>>>>>>>>> * topic
>>>>>>>>>>>>>>> * partition
>>>>>>>>>>>>>>> * offset
>>>>>>>>>>>>>>> * timestamp
>>>>>>>>>>>>>>> * headers
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Your proposal sounds good to me as-is. Just to cover the
>>>>>>>>>>>>>>> bases, though, I'm wondering if we should push the idea just
>>>>>>>>>>>>>>> a little farther. Instead of decomposing key,value,context,
>>>>>>>>>>>>>>> we could just keep them all in one object, like this:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Record:
>>>>>>>>>>>>>>> * key
>>>>>>>>>>>>>>> * value
>>>>>>>>>>>>>>> * topic
>>>>>>>>>>>>>>> * partition
>>>>>>>>>>>>>>> * offset
>>>>>>>>>>>>>>> * timestamp
>>>>>>>>>>>>>>> * headers
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Then, we could have:
>>>>>>>>>>>>>>> Processor#process(Record)
>>>>>>>>>>>>>>> ProcessorContext#forward(Record, To)
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Viewed from this perspective, a record has three properties
>>>>>>>>>>>>>>> that people may specify in their processors: key, value, and
>>>>>>>>>>>>>>> timestamp.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> We could deprecate `To#withTimestamp` and enable people to
>>>>>>>>>>>>>>> specify the timestamp along with the key and value when they
>>>>>>>>>>>>>>> forward a record.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> E.g.,
>>>>>>>>>>>>>>> RecordBuilder toForward = RecordBuilder.copy(record)
>>>>>>>>>>>>>>> toForward.withKey(newKey)
>>>>>>>>>>>>>>> toForward.withValue(newValue)
>>>>>>>>>>>>>>> toForward.withTimestamp(newTimestamp)
>>>>>>>>>>>>>>> Record newRecord = toForward.build()
>>>>>>>>>>>>>>> context.forward(newRecord, To.child("child1"))
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Or, the more compact common case:
>>>>>>>>>>>>>>> current:
>>>>>>>>>>>>>>>  context.forward(key, "newValue")
>>>>>>>>>>>>>>> proposed:
>>>>>>>>>>>>>>>  context.forward(copy(record).withValue("newValue").build())
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> It's slightly more verbose, but also more extensible. This
>>>>>>>>>>>>>>> would give us a clean path to add header support in PAPI as
>>>>>>>>>>>>>>> well, simply by adding `withHeaders` in RecordBuilder.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> It's also more symmetrical, since the recipient of `forward`
>>>>>>>>>>>>>>> would just get the sent `Record`. Whereas today, the sender
>>>>>>>>>>>>>>> puts the timestamp in `To`, but the recipient gets it in its
>>>>>>>>>>>>>>> own `ProcessorContext`.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> WDYT?
>>>>>>>>>>>>>>> -John
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On Fri, 2020-09-11 at 12:30 -0700, Matthias J. Sax wrote:
>>>>>>>>>>>>>>>> I think separating the different contexts makes sense.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> In fact, we could even go one step further and remove the record context
>>>>>>>>>>>>>>>> from the processor context completely and add a third parameter to
>>>>>>>>>>>>>>>> `process(key, value, recordContext)`. This would make it clear that the
>>>>>>>>>>>>>>>> context is for the input record only and it's not possible to pass it to
>>>>>>>>>>>>>>>> a `punctuate` callback.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> For the stores and changelogging: I think there are two cases. (1) You
>>>>>>>>>>>>>>>> use a plain key-value store. For this case, it seems you do not care
>>>>>>>>>>>>>>>> about the timestamp and thus do not care what timestamp is set in the
>>>>>>>>>>>>>>>> changelog records. (We can set anything we want, as it's not relevant at
>>>>>>>>>>>>>>>> all -- the timestamp is ignored on read anyway.) (2) The other case is
>>>>>>>>>>>>>>>> that one does care about timestamps, and for this case should use
>>>>>>>>>>>>>>>> TimestampedKeyValueStore. The passed timestamp will be set on the
>>>>>>>>>>>>>>>> changelog records for this case.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Thus, for both cases, accessing the record context does not seem to be
>>>>>>>>>>>>>>>> a requirement. And providing access to the processor context to, e.g.,
>>>>>>>>>>>>>>>> `forward()` or similar seems safe.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> -Matthias
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> On 9/10/20 7:25 PM, John Roesler wrote:
>>>>>>>>>>>>>>>>> Thanks for the reply, Paul!
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> I certainly intend to make sure that the changelogging layer
>>>>>>>>>>>>>>>>> continues to work the way it does now, by hook or by crook.
>>>>>>>>>>>>>>>>> I think the easiest path for me is to just "cheat" and get
>>>>>>>>>>>>>>>>> the real ProcessorContext into the ChangeLoggingStore
>>>>>>>>>>>>>>>>> implementation somehow. I'll tag you on the PR when I create
>>>>>>>>>>>>>>>>> it, so you have an opportunity to express a preference about
>>>>>>>>>>>>>>>>> the implementation choice, and maybe even compile/test
>>>>>>>>>>>>>>>>> against it to make sure your stuff still works.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Regarding this:
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> we have an interest in making a state store with a richer
>>>>>>>>>>>>>>>>>> way of querying its data (like perhaps getting all values
>>>>>>>>>>>>>>>>>> associated with a secondary key), while still ultimately
>>>>>>>>>>>>>>>>>> writing to the changelog topic for later restoration.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> This is very intriguing to me. On the side, I've been
>>>>>>>>>>>>>>>>> preparing a couple of ideas related to this topic. I don't
>>>>>>>>>>>>>>>>> think I have a coherent enough thought to even express it in
>>>>>>>>>>>>>>>>> a Jira right now, but when I do, I'll tag you on it also to
>>>>>>>>>>>>>>>>> see what you think.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Whenever you're ready to share the usability improvement
>>>>>>>>>>>>>>>>> ideas, I'm very interested to see what you've come up with.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>>>>> -John
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> On Thu, 2020-09-10 at 21:02 -0500, Paul Whalen wrote:
>>>>>>>>>>>>>>>>>>> when you use a HashMap or RocksDB or other "state
>>>>>>>>>> stores", you
>>>>>>>>>>>>> don't
>>>>>>>>>>>>>>>>>>> expect them to automatically know extra stuff
>>>>>> about
>>>>>>>> the
>>>>>>>>>> record
>>>>>>>>>>>>> you're
>>>>>>>>>>>>>>>>>>> storing.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> So, I don't think there is any reason we *can't*
>>>>>>>> retain the
>>>>>>>>>>>>> record context
>>>>>>>>>>>>>>>>>>> in the StateStoreContext, and if any users came
>>>>>>>> along
>>>>>>>>>> with a
>>>>>>>>>>>>> clear use case
>>>>>>>>>>>>>>>>>>> I'd find that convincing.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> I agree with the principle of being conservative
>>>>>> with
>>>>>>>> the
>>>>>>>>>>>>> StateStoreContext
>>>>>>>>>>>>>>>>>> API.  Regarding user expectations or a clear use
>>>>>>>> case, the
>>>>>>>>>> only
>>>>>>>>>>>>>>>>>> counterpoint I would offer is that we sort of have
>>>>>>>> that
>>>>>>>>>> use case
>>>>>>>>>>>>> already,
>>>>>>>>>>>>>>>>>> which is the example I gave of the change logging
>>>>>>>> store
>>>>>>>>>> using the
>>>>>>>>>>>>>>>>>> timestamp.  I am curious if this functionality will
>>>>>> be
>>>>>>>>>> retained
>>>>>>>>>>>>> when using
>>>>>>>>>>>>>>>>>> built in state stores, or will a low-level processor
>>>>>>>> get a
>>>>>>>>>>>>> KeyValueStore
>>>>>>>>>>>>>>>>>> that no longer writes to the changelog topic with
>>>>>> the
>>>>>>>>>> record's
>>>>>>>>>>>>> timestamp.
>>>>>>>>>>>>>>>>>> While I personally don't care much about that
>>>>>>>> functionality
>>>>>>>>>>>>> specifically, I
>>>>>>>>>>>>>>>>>> have a general desire for custom state stores to
>>>>>>>> easily do
>>>>>>>>>> the
>>>>>>>>>>>>> things that
>>>>>>>>>>>>>>>>>> built in state stores do.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> It genuinely did not occur to me that users might be
>>>>>>>>>> looking up
>>>>>>>>>>>>> and/or
>>>>>>>>>>>>>>>>>>> updating records of other keys from within a
>>>>>>>> Processor.
>>>>>>>>>>>>>>>>>> I'm glad you said this Sophie, because it gives me
>>>>>> an
>>>>>>>>>>>>> opportunity to say
>>>>>>>>>>>>>>>>>> that this is actually a *huge* use case for my team.
>>>>>>>> The
>>>>>>>>>> state
>>>>>>>>>>>>> store
>>>>>>>>>>>>>>>>>> usability improvements I was referring to in my
>>>>>>>> previous
>>>>>>>>>> message
>>>>>>>>>>>>> were about
>>>>>>>>>>>>>>>>>> enabling the user to write custom stores while still
>>>>>>>> easily
>>>>>>>>>>>>> hooking into
>>>>>>>>>>>>>>>>>> the ability to write to a changelog topic.  I think
>>>>>>>> that is
>>>>>>>>>>>>> technically
>>>>>>>>>>>>>>>>>> possible now, but I don't think it's trivial.
>>>>>>>>>> Specifically, we
>>>>>>>>>>>>> have an
>>>>>>>>>>>>>>>>>> interest in making a state store with a richer way
>>>>>> of
>>>>>>>>>> querying
>>>>>>>>>>>>> its data
>>>>>>>>>>>>>>>>>> (like perhaps getting all values associated with a
>>>>>>>>>> secondary
>>>>>>>>>>>>> key), while
>>>>>>>>>>>>>>>>>> still ultimately writing to the changelog topic for
>>>>>>>> later
>>>>>>>>>>>>> restoration.
>>>>>>>>>>>>>>>>>> We recognize that this use case throws away some of
>>>>>>>> what
>>>>>>>>>> kafka
>>>>>>>>>>>>> streams
>>>>>>>>>>>>>>>>>> (especially the DSL) is good at - easy
>>>>>>>> parallelizability by
>>>>>>>>>>>>> partitioning
>>>>>>>>>>>>>>>>>> all processing by key - and that our business logic
>>>>>>>> would
>>>>>>>>>>>>> completely fall
>>>>>>>>>>>>>>>>>> apart if we were consuming from multi-partition
>>>>>>>> topics with
>>>>>>>>>>>>> multiple
>>>>>>>>>>>>>>>>>> consumers.  But we have found that using the low
>>>>>> level
>>>>>>>>>> processor
>>>>>>>>>>>>> API is
>>>>>>>>>>>>>>>>>> good for the very simple stream processing
>>>>>> primitives
>>>>>>>> it
>>>>>>>>>>>>> provides: handling
>>>>>>>>>>>>>>>>>> the plumbing of consuming from multiple kafka topics
>>>>>>>> and
>>>>>>>>>>>>> potentially
>>>>>>>>>>>>>>>>>> updating persistent local state in a reliable way.
>>>>>>>> That in
>>>>>>>>>>>>> itself has
>>>>>>>>>>>>>>>>>> proven to be a worthwhile programming model.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Since I got off track a bit, let me summarize: I don't particularly
>>>>>>>>>>>>>>>>>> care about the record context being available to state store
>>>>>>>>>>>>>>>>>> implementations, and I think this KIP is headed in the right direction
>>>>>>>>>>>>>>>>>> in that regard.  But more generally, I wanted to express the
>>>>>>>>>>>>>>>>>> importance of maintaining a powerful and flexible StateStore
>>>>>>>>>>>>>>>>>> interface.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Thanks!
>>>>>>>>>>>>>>>>>> Paul
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> On Thu, Sep 10, 2020 at 6:11 PM Sophie Blee-Goldman <sop...@confluent.io> wrote:
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Aha, I did misinterpret the example in your previous response
>>>>>>>>>>>>>>>>>>> regarding the range query after all. I thought you just meant a
>>>>>>>>>>>>>>>>>>> time-range query inside a punctuator. It genuinely did not occur to
>>>>>>>>>>>>>>>>>>> me that users might be looking up and/or updating records of other
>>>>>>>>>>>>>>>>>>> keys from within a Processor. Sorry for being closed-minded.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> I won't drag out this discussion any further by asking whether that
>>>>>>>>>>>>>>>>>>> might be a valid use case or just a lurking bug in itself :)
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Thanks for humoring me. The current proposal for KIP-478 sounds good
>>>>>>>>>>>>>>>>>>> to me.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> On Thu, Sep 10, 2020 at 3:43 PM John Roesler <vvcep...@apache.org> wrote:
>>>>>>>>>>>>>>>>>>>> Ah, thanks Sophie,
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> I'm sorry for misinterpreting your response. Yes, we
>>>>>>>>>>>>>>>>>>>> absolutely can and should clear the context before
>>>>>>>>>>>>>>>>>>>> punctuating.
>>>>>>>>>>>>>>>>>>>>
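A minimal sketch of the "clear the context before punctuating" idea discussed above, written in plain Java rather than against the real Kafka Streams classes. Every name here (`RecordMeta`, `ReadOnlyContext`, `TaskContext`, `TaskLoopSketch`) is a hypothetical stand-in chosen for illustration. The sketch assumes the framework alone owns the record context: it sets the context before handing a record to user code and clears it before a punctuator runs, so user code only ever sees a read-only `Optional` view.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

// Hypothetical holder for topic/partition/offset; not the Kafka Streams class.
final class RecordMeta {
    final String topic;
    final int partition;
    final long offset;

    RecordMeta(String topic, int partition, long offset) {
        this.topic = topic;
        this.partition = partition;
        this.offset = offset;
    }
}

// Read-only view that user code (Processors, Punctuators) would see.
interface ReadOnlyContext {
    Optional<RecordMeta> recordMetadata();
}

// Framework-side context: only the task loop below sets or clears the metadata.
final class TaskContext implements ReadOnlyContext {
    private RecordMeta current; // null while no record is being processed

    void setCurrent(RecordMeta meta) { current = meta; }

    void clear() { current = null; }

    @Override
    public Optional<RecordMeta> recordMetadata() {
        return Optional.ofNullable(current);
    }
}

final class TaskLoopSketch {
    private final TaskContext context = new TaskContext();
    final List<String> seen = new ArrayList<>();

    // The framework sets the context, then user code reads it through the view.
    void processRecord(RecordMeta meta) {
        context.setCurrent(meta);
        seen.add("process: " + describe(context));
    }

    // The framework clears the context before any punctuator runs.
    void punctuate() {
        context.clear();
        seen.add("punctuate: " + describe(context));
    }

    private static String describe(ReadOnlyContext ctx) {
        return ctx.recordMetadata()
                .map(m -> m.topic + "-" + m.partition + "@" + m.offset)
                .orElse("no current record");
    }

    public static void main(String[] args) {
        TaskLoopSketch task = new TaskLoopSketch();
        task.processRecord(new RecordMeta("input", 0, 42L));
        task.punctuate();
        task.seen.forEach(System.out::println);
    }
}
```

Because user code never gets a setter, a punctuator cannot observe metadata left over from the last processed record; it sees an empty `Optional` instead.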
>>>>>>>>>>>>>>>>>>>> My secondary concern is maybe more far-fetched. I was
>>>>>>>>>>>>>>>>>>>> thinking that inside process(key,value), a Processor might
>>>>>>>>>>>>>>>>>>>> do a get/put of a _different_ key. Consider, for example,
>>>>>>>>>>>>>>>>>>>> the way that Suppress processors work. When they get a
>>>>>>>>>>>>>>>>>>>> record, they add it to the store and then do a range scan
>>>>>>>>>>>>>>>>>>>> and possibly forward a _different_ record. Of course, this
>>>>>>>>>>>>>>>>>>>> is an operation that is deeply coupled to the internals, and
>>>>>>>>>>>>>>>>>>>> the Suppress processor accordingly actually does get access
>>>>>>>>>>>>>>>>>>>> to the internal context so that it can set the context
>>>>>>>>>>>>>>>>>>>> before forwarding.
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> Still, it seems like I've had a handful of conversations
>>>>>>>>>>>>>>>>>>>> with people over the years in which they tell me they are
>>>>>>>>>>>>>>>>>>>> using state stores in a way that transcends the "get and put
>>>>>>>>>>>>>>>>>>>> the currently processing record" access pattern. I doubt
>>>>>>>>>>>>>>>>>>>> that those folks would even have considered the possibility
>>>>>>>>>>>>>>>>>>>> that the currently processing record's _context_ could
>>>>>>>>>>>>>>>>>>>> pollute their state store operations, as I myself never gave
>>>>>>>>>>>>>>>>>>>> it a second thought until the current conversation began. In
>>>>>>>>>>>>>>>>>>>> cases like that, we have actually set a trap for these
>>>>>>>>>>>>>>>>>>>> people, and it seems better to dismantle the trap.
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> As you noted, really the only people who would be negatively
>>>>>>>>>>>>>>>>>>>> impacted are people who implement their own state stores.
>>>>>>>>>>>>>>>>>>>> These folks will get the deprecation warning and try to
>>>>>>>>>>>>>>>>>>>> adapt their stores to the new interface. If they needed
>>>>>>>>>>>>>>>>>>>> access to the record context, they would find it's now
>>>>>>>>>>>>>>>>>>>> missing. They'd ask us about it, and we'd have the ability
>>>>>>>>>>>>>>>>>>>> to explain the lurking bug that they have had in their
>>>>>>>>>>>>>>>>>>>> stores all along, as well as the new recommended pattern
>>>>>>>>>>>>>>>>>>>> (just pass everything you need in the value). If that's
>>>>>>>>>>>>>>>>>>>> unsatisfying, _then_ we should consider amending the API.
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>>>>>>>> -John
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> On Thu, 2020-09-10 at 15:21 -0700, Sophie Blee-Goldman wrote:
>>>>>>>>>>>>>>>>>>>>>> Regarding your first sentence, "...the
>>>>>>>> processor
>>>>>>>>>> would
>>>>>>>>>>>>> null
>>>>>>>>>>>>>>>>>>>>>> out the record context...", this is not
>>>>>>>> possible,
>>>>>>>>>> since
>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>>>>>> processor doesn't have write access to the
>>>>>>>>>> context. We
>>>>>>>>>>>>> could
>>>>>>>>>>>>>>>>>>>>>> add it,
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> Sorry, this was poorly phrased, I definitely
>>>>>>>> did not
>>>>>>>>>> mean
>>>>>>>>>>>>> to imply that
>>>>>>>>>>>>>>>>>>>> we
>>>>>>>>>>>>>>>>>>>>> should make the context modifiable by the
>>>>>>>> Processors
>>>>>>>>>>>>> themselves. I
>>>>>>>>>>>>>>>>>>> meant
>>>>>>>>>>>>>>>>>>>>> this should be handled by the internal
>>>>>>>> processing
>>>>>>>>>>>>> framework that deals
>>>>>>>>>>>>>>>>>>>> with
>>>>>>>>>>>>>>>>>>>>> passing records from one Processor to the
>>>>>> next,
>>>>>>>>>> setting
>>>>>>>>>>>>> the record
>>>>>>>>>>>>>>>>>>>> context
>>>>>>>>>>>>>>>>>>>>> when a new record is picked up, invoking the
>>>>>>>>>> punctuators,
>>>>>>>>>>>>> etc. I
>>>>>>>>>>>>>>>>>>> believe
>>>>>>>>>>>>>>>>>>>>> this
>>>>>>>>>>>>>>>>>>>>> all currently happens in the StreamTask? It
>>>>>>>> already
>>>>>>>>>> can
>>>>>>>>>>>>> and does
>>>>>>>>>>>>>>>>>>>> overwrite
>>>>>>>>>>>>>>>>>>>>> the record context as new records are
>>>>>>>> processed, and
>>>>>>>>>> is
>>>>>>>>>>>>> also
>>>>>>>>>>>>>>>>>>> responsible
>>>>>>>>>>>>>>>>>>>>> for calling the punctuators, so it doesn't
>>>>>> seem
>>>>>>>> like
>>>>>>>>>> a
>>>>>>>>>>>>> huge leap to
>>>>>>>>>>>>>>>>>>> just
>>>>>>>>>>>>>>>>>>>> say
>>>>>>>>>>>>>>>>>>>>> "null out the current record before
>>>>>> punctuating"
>>>>>>>>>>>>>>>>>>>>> To clarify, I was never advocating or even
>>>>>>>>>> considering to
>>>>>>>>>>>>> give the
>>>>>>>>>>>>>>>>>>>>> Processors
>>>>>>>>>>>>>>>>>>>>> write access to the record context. Sorry if
>>>>>> my
>>>>>>>> last
>>>>>>>>>>>>> message (or all of
>>>>>>>>>>>>>>>>>>>>> them)
>>>>>>>>>>>>>>>>>>>>> was misleading. I just wanted to point out
>>>>>> that
>>>>>>>> the
>>>>>>>>>>>>> punctuator concern
>>>>>>>>>>>>>>>>>>> is
>>>>>>>>>>>>>>>>>>>>> orthogonal to the question of whether we
>>>>>> should
>>>>>>>>>> include
>>>>>>>>>>>>> the record
>>>>>>>>>>>>>>>>>>>> context
>>>>>>>>>>>>>>>>>>>>> in the StateStoreContext. It's definitely a
>>>>>> real
>>>>>>>>>> problem,
>>>>>>>>>>>>> but it's a
>>>>>>>>>>>>>>>>>>>>> problem
>>>>>>>>>>>>>>>>>>>>> that exists at the Processor level and not
>>>>>> just
>>>>>>>> the
>>>>>>>>>>>>> StateStore.
>>>>>>>>>>>>>>>>>>>>> So, I don't think there is any reason we
>>>>>> *can't*
>>>>>>>>>> retain
>>>>>>>>>>>>> the record
>>>>>>>>>>>>>>>>>>>> context
>>>>>>>>>>>>>>>>>>>>> in the
>>>>>>>>>>>>>>>>>>>>> StateStoreContext, and if any users came along
>>>>>>>> with a
>>>>>>>>>>>>> clear use case
>>>>>>>>>>>>>>>>>>> I'd
>>>>>>>>>>>>>>>>>>>>> find
>>>>>>>>>>>>>>>>>>>>> that convincing. In the absence of any
>>>>>>>> examples, the
>>>>>>>>>>>>> conservative
>>>>>>>>>>>>>>>>>>>> approach
>>>>>>>>>>>>>>>>>>>>> sounds good to me.
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> If it turns out that someone did need the
>>>>>> record
>>>>>>>>>> context
>>>>>>>>>>>>> in their
>>>>>>>>>>>>>>>>>>> custom
>>>>>>>>>>>>>>>>>>>>> state
>>>>>>>>>>>>>>>>>>>>> store, I'm sure they'll submit a politely
>>>>>>>> worded bug
>>>>>>>>>>>>> report alerting us
>>>>>>>>>>>>>>>>>>>>> that we
>>>>>>>>>>>>>>>>>>>>> broke their application.
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> On Thu, Sep 10, 2020 at 3:05 PM John Roesler <vvcep...@apache.org> wrote:
>>>>>>>>>>>>>>>>>>>>>> Thanks, Sophie,
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> Yes, now that you point it out, I can see
>>>>>>>> that the
>>>>>>>>>> record
>>>>>>>>>>>>>>>>>>>>>> context itself should be nulled out by
>>>>>> Streams
>>>>>>>>>> before
>>>>>>>>>>>>>>>>>>>>>> invoking punctuators. From that perspective,
>>>>>>>> we
>>>>>>>>>> don't
>>>>>>>>>>>>> need
>>>>>>>>>>>>>>>>>>>>>> to think about the second-order problem of
>>>>>>>> what's
>>>>>>>>>> in the
>>>>>>>>>>>>>>>>>>>>>> context for the state store when called
>>>>>> from a
>>>>>>>>>>>>> punctuator.
>>>>>>>>>>>>>>>>>>>>>> Regarding your first sentence, "...the
>>>>>>>> processor
>>>>>>>>>> would
>>>>>>>>>>>>> null
>>>>>>>>>>>>>>>>>>>>>> out the record context...", this is not
>>>>>>>> possible,
>>>>>>>>>> since
>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>>>>>> processor doesn't have write access to the
>>>>>>>>>> context. We
>>>>>>>>>>>>> could
>>>>>>>>>>>>>>>>>>>>>> add it, but then all kinds of strange
>>>>>> effects
>>>>>>>>>> would ensue
>>>>>>>>>>>>>>>>>>>>>> when downstream processors execute but the
>>>>>>>> context
>>>>>>>>>> is
>>>>>>>>>>>>> empty,
>>>>>>>>>>>>>>>>>>>>>> etc. Better to just let the framework manage
>>>>>>>> the
>>>>>>>>>> record
>>>>>>>>>>>>>>>>>>>>>> context and keep it read-only for
>>>>>> Processors.
>>>>>>>>>>>>>>>>>>>>>> Reading between the lines of your last
>>>>>> reply,
>>>>>>>> it
>>>>>>>>>> sounds
>>>>>>>>>>>>> that
>>>>>>>>>>>>>>>>>>>>>> the disconnect may just have been a mutual
>>>>>>>>>>>>> misunderstanding
>>>>>>>>>>>>>>>>>>>>>> about whether or not Processors currently
>>>>>> have
>>>>>>>>>> access to
>>>>>>>>>>>>> set
>>>>>>>>>>>>>>>>>>>>>> the record context. Since they do not, if we
>>>>>>>>>> wanted to
>>>>>>>>>>>>> add
>>>>>>>>>>>>>>>>>>>>>> the record context to StateStoreContext in a
>>>>>>>>>> well-defined
>>>>>>>>>>>>>>>>>>>>>> way, we'd also have to add the ability for
>>>>>>>>>> Processors to
>>>>>>>>>>>>>>>>>>>>>> manipulate it. But then, we're just
>>>>>> creating a
>>>>>>>>>>>>> side-channel
>>>>>>>>>>>>>>>>>>>>>> for Processors to pass some information in
>>>>>>>>>> arguments to
>>>>>>>>>>>>>>>>>>>>>> "put()" and other information implicitly
>>>>>>>> through
>>>>>>>>>> the
>>>>>>>>>>>>>>>>>>>>>> context. It seems better just to go for a
>>>>>>>> single
>>>>>>>>>> channel
>>>>>>>>>>>>> for
>>>>>>>>>>>>>>>>>>>>>> now.
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> It sounds like you're basically in favor of
>>>>>>>> the
>>>>>>>>>>>>> conservative
>>>>>>>>>>>>>>>>>>>>>> approach, and you just wanted to understand
>>>>>>>> the
>>>>>>>>>> blockers
>>>>>>>>>>>>>>>>>>>>>> that I implied. Does my clarification make
>>>>>>>> sense?
>>>>>>>>>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>>>>>>>>>> -John
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> On Thu, 2020-09-10 at 10:54 -0700, Sophie Blee-Goldman wrote:
>>>>>>>>>>>>>>>>>>>>>>> I was just thinking that the processor
>>>>>> would
>>>>>>>>>> null out
>>>>>>>>>>>>> the record
>>>>>>>>>>>>>>>>>>>> context
>>>>>>>>>>>>>>>>>>>>>>> after it
>>>>>>>>>>>>>>>>>>>>>>> finished processing the record, so I'm not
>>>>>>>> sure I
>>>>>>>>>>>>> follow why this
>>>>>>>>>>>>>>>>>>>> would
>>>>>>>>>>>>>>>>>>>>>> not
>>>>>>>>>>>>>>>>>>>>>>> be
>>>>>>>>>>>>>>>>>>>>>>> possible? AFAIK we never call a punctuator
>>>>>>>> in the
>>>>>>>>>>>>> middle of
>>>>>>>>>>>>>>>>>>>> processing a
>>>>>>>>>>>>>>>>>>>>>>> record through the topology, and even if
>>>>>> we
>>>>>>>> did,
>>>>>>>>>> we
>>>>>>>>>>>>> still know when
>>>>>>>>>>>>>>>>>>>> it is
>>>>>>>>>>>>>>>>>>>>>>> about
>>>>>>>>>>>>>>>>>>>>>>> to be called and could set it to null
>>>>>>>> beforehand.
>>>>>>>>>>>>>>>>>>>>>>> I'm not trying to advocate for it here,
>>>>>> I'm
>>>>>>>> in
>>>>>>>>>>>>> agreement that
>>>>>>>>>>>>>>>>>>>> anything
>>>>>>>>>>>>>>>>>>>>>> you
>>>>>>>>>>>>>>>>>>>>>>> want
>>>>>>>>>>>>>>>>>>>>>>> to access within the store can and should
>>>>>> be
>>>>>>>>>> accessed
>>>>>>>>>>>>> within the
>>>>>>>>>>>>>>>>>>>> calling
>>>>>>>>>>>>>>>>>>>>>>> Processor/Punctuator before reaching the
>>>>>>>> store.
>>>>>>>>>> The
>>>>>>>>>>>>> "we can always
>>>>>>>>>>>>>>>>>>>> add it
>>>>>>>>>>>>>>>>>>>>>>> later if necessary" argument is also
>>>>>> pretty
>>>>>>>>>>>>> convincing. Just trying
>>>>>>>>>>>>>>>>>>>> to
>>>>>>>>>>>>>>>>>>>>>>> understand
>>>>>>>>>>>>>>>>>>>>>>> why this wouldn't be possible.
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>> FWIW, the question of "what is the current
>>>>>>>>>> record in
>>>>>>>>>>>>> the context
>>>>>>>>>>>>>>>>>>> of a
>>>>>>>>>>>>>>>>>>>>>>> Punctuator"
>>>>>>>>>>>>>>>>>>>>>>> exists independently of whether we want to
>>>>>>>> add
>>>>>>>>>> this to
>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>>>>>> StateStoreContext
>>>>>>>>>>>>>>>>>>>>>>> or not. The full ProcessorContext,
>>>>>>>> including the
>>>>>>>>>>>>> current record
>>>>>>>>>>>>>>>>>>>> context,
>>>>>>>>>>>>>>>>>>>>>> is
>>>>>>>>>>>>>>>>>>>>>>> already available within a Punctuator, so
>>>>>>>>>> removing the
>>>>>>>>>>>>> current
>>>>>>>>>>>>>>>>>>> record
>>>>>>>>>>>>>>>>>>>>>>> context
>>>>>>>>>>>>>>>>>>>>>>> from the StateStoreContext does not solve the problem. Users can --
>>>>>>>>>>>>>>>>>>>>>>> and have (see KAFKA-9584
>>>>>>>>>>>>>>>>>>>>>>> <https://issues.apache.org/jira/browse/KAFKA-9584>) -- hit
>>>>>>>>>>>>>>>>>>>>>>> such subtle bugs without ever invoking a StateStore
>>>>>>>>>>>>>>>>>>>>>>> from their punctuator.
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>> Again, I think I do agree that we should
>>>>>>>> leave
>>>>>>>>>> the
>>>>>>>>>>>>> current record
>>>>>>>>>>>>>>>>>>>> context
>>>>>>>>>>>>>>>>>>>>>>> off of
>>>>>>>>>>>>>>>>>>>>>>> the StateStoreContext, but I don't think
>>>>>> the
>>>>>>>>>>>>> Punctuator argument
>>>>>>>>>>>>>>>>>>>> against
>>>>>>>>>>>>>>>>>>>>>> it
>>>>>>>>>>>>>>>>>>>>>>> is
>>>>>>>>>>>>>>>>>>>>>>> very convincing. It sounds to me like we
>>>>>>>> need to
>>>>>>>>>>>>> disallow access to
>>>>>>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>>>>>>> current
>>>>>>>>>>>>>>>>>>>>>>> record context from within the Punctuator,
>>>>>>>>>> independent
>>>>>>>>>>>>> of anything
>>>>>>>>>>>>>>>>>>>> to do
>>>>>>>>>>>>>>>>>>>>>>> with
>>>>>>>>>>>>>>>>>>>>>>> state stores
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>> On Thu, Sep 10, 2020 at 7:12 AM John Roesler <vvcep...@apache.org> wrote:
>>>>>>>>>>>>>>>>>>>>>>>> Thanks for the thoughts, Sophie.
>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>> I agree that the extra information could
>>>>>>>> be
>>>>>>>>>> useful.
>>>>>>>>>>>>> My only
>>>>>>>>>>>>>>>>>>>> concern is
>>>>>>>>>>>>>>>>>>>>>>>> that it doesn’t seem like we can
>>>>>> actually
>>>>>>>>>> supply
>>>>>>>>>>>>> that extra
>>>>>>>>>>>>>>>>>>>> information
>>>>>>>>>>>>>>>>>>>>>>>> correctly. So, then we have a situation
>>>>>>>> where
>>>>>>>>>> the
>>>>>>>>>>>>> system offers
>>>>>>>>>>>>>>>>>>>> useful
>>>>>>>>>>>>>>>>>>>>>> API
>>>>>>>>>>>>>>>>>>>>>>>> calls that are only correct in a narrow
>>>>>>>> range
>>>>>>>>>> of use
>>>>>>>>>>>>> cases.
>>>>>>>>>>>>>>>>>>>> Outside of
>>>>>>>>>>>>>>>>>>>>>>>> those use cases, you get incorrect
>>>>>>>> behavior.
>>>>>>>>>>>>>>>>>>>>>>>> If it were possible to null out the
>>>>>>>> context
>>>>>>>>>> before
>>>>>>>>>>>>> you put a
>>>>>>>>>>>>>>>>>>>> document
>>>>>>>>>>>>>>>>>>>>>> to
>>>>>>>>>>>>>>>>>>>>>>>> which the context doesn’t apply, then
>>>>>> the
>>>>>>>>>> concern
>>>>>>>>>>>>> would be
>>>>>>>>>>>>>>>>>>>> mitigated.
>>>>>>>>>>>>>>>>>>>>>> But
>>>>>>>>>>>>>>>>>>>>>>>> it would still be pretty weird from the
>>>>>>>>>> perspective
>>>>>>>>>>>>> of the store
>>>>>>>>>>>>>>>>>>>> that
>>>>>>>>>>>>>>>>>>>>>>>> sometimes the context is populated and
>>>>>>>> other
>>>>>>>>>> times,
>>>>>>>>>>>>> it’s null.
>>>>>>>>>>>>>>>>>>>>>>>> But that seems moot, since it doesn’t
>>>>>> seem
>>>>>>>>>> possible
>>>>>>>>>>>>> to null out
>>>>>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>>>>>>>> context. Only the Processor could know
>>>>>>>> whether
>>>>>>>>>> it’s
>>>>>>>>>>>>> about to put
>>>>>>>>>>>>>>>>>>> a
>>>>>>>>>>>>>>>>>>>>>> document
>>>>>>>>>>>>>>>>>>>>>>>> different from the context or not. And
>>>>>> it
>>>>>>>>>> would be
>>>>>>>>>>>>> inappropriate
>>>>>>>>>>>>>>>>>>> to
>>>>>>>>>>>>>>>>>>>>>> offer a
>>>>>>>>>>>>>>>>>>>>>>>> public ProcessorContext api to manage
>>>>>> the
>>>>>>>>>> record
>>>>>>>>>>>>> context.
>>>>>>>>>>>>>>>>>>>>>>>> Ultimately, it still seems like if you
>>>>>>>> want to
>>>>>>>>>> store
>>>>>>>>>>>>> headers, you
>>>>>>>>>>>>>>>>>>>> can
>>>>>>>>>>>>>>>>>>>>>>>> store them explicitly, right? That
>>>>>>>> doesn’t seem
>>>>>>>>>>>>> onerous to me,
>>>>>>>>>>>>>>>>>>> and
>>>>>>>>>>>>>>>>>>>> it
>>>>>>>>>>>>>>>>>>>>>> kind
>>>>>>>>>>>>>>>>>>>>>>>> of seems better than relying on
>>>>>> undefined
>>>>>>>> or
>>>>>>>>>>>>> asymmetrical
>>>>>>>>>>>>>>>>>>> behavior
>>>>>>>>>>>>>>>>>>>> in
>>>>>>>>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>>>>>>>> store itself.
>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>> Anyway, I’m not saying that we couldn’t solve these problems. Just
>>>>>>>>>>>>>>>>>>>>>>>> that it seems like we can be conservative and avoid them for now.
>>>>>>>>>>>>>>>>>>>>>>>> If it turns out we really need to solve them, we can always do it
>>>>>>>>>>>>>>>>>>>>>>>> later.
>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>>>>>>>>>>>> John
>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>> On Wed, Sep 9, 2020, at 22:46, Sophie Blee-Goldman wrote:
>>>>>>>>>>>>>>>>>>>>>>>>>> If you were to call "put" from a
>>>>>>>>>> punctuator, or
>>>>>>>>>>>>> do a
>>>>>>>>>>>>>>>>>>>>>>>>>> `range()` query and then update one
>>>>>> of
>>>>>>>>>> those
>>>>>>>>>>>>> records with
>>>>>>>>>>>>>>>>>>>>>>>>>> `put()`, you'd have a very subtle
>>>>>> bug
>>>>>>>> on
>>>>>>>>>> your
>>>>>>>>>>>>> hands.
>>>>>>>>>>>>>>>>>>>>>>>>> Can you elaborate on this a bit? I
>>>>>> agree
>>>>>>>>>> that the
>>>>>>>>>>>>> punctuator
>>>>>>>>>>>>>>>>>>>> case is
>>>>>>>>>>>>>>>>>>>>>> an
>>>>>>>>>>>>>>>>>>>>>>>>> obvious exemption to the assumption
>>>>>> that
>>>>>>>>>> store
>>>>>>>>>>>>> invocations
>>>>>>>>>>>>>>>>>>> always
>>>>>>>>>>>>>>>>>>>>>>>>> have a corresponding "current record",
>>>>>>>> but I
>>>>>>>>>> don't
>>>>>>>>>>>>> understand
>>>>>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>>>>>>>>> second example. Are you envisioning a
>>>>>>>>>> scenario
>>>>>>>>>>>>> where the
>>>>>>>>>>>>>>>>>>> #process
>>>>>>>>>>>>>>>>>>>>>>>>> method performs a range query and then
>>>>>>>>>> updates
>>>>>>>>>>>>> records? Or were
>>>>>>>>>>>>>>>>>>>>>>>>> you just giving another example of the
>>>>>>>>>> punctuator
>>>>>>>>>>>>> case?
>>>>>>>>>>>>>>>>>>>>>>>>> I only bring it up because I agree
>>>>>> that
>>>>>>>> the
>>>>>>>>>>>>> current record
>>>>>>>>>>>>>>>>>>>>>> information
>>>>>>>>>>>>>>>>>>>>>>>> could
>>>>>>>>>>>>>>>>>>>>>>>>> still be useful within the context of
>>>>>>>> the
>>>>>>>>>> store.
>>>>>>>>>>>>> As a non-user
>>>>>>>>>>>>>>>>>>> my
>>>>>>>>>>>>>>>>>>>>>> input
>>>>>>>>>>>>>>>>>>>>>>>> on
>>>>>>>>>>>>>>>>>>>>>>>>> this
>>>>>>>>>>>>>>>>>>>>>>>>> definitely has limited value, but it
>>>>>>>> just
>>>>>>>>>> isn't
>>>>>>>>>>>>> striking me as
>>>>>>>>>>>>>>>>>>>>>> obvious
>>>>>>>>>>>>>>>>>>>>>>>> that
>>>>>>>>>>>>>>>>>>>>>>>>> we
>>>>>>>>>>>>>>>>>>>>>>>>> should remove access to the current
>>>>>>>> record
>>>>>>>>>> context
>>>>>>>>>>>>> from the
>>>>>>>>>>>>>>>>>>> state
>>>>>>>>>>>>>>>>>>>>>> stores.
>>>>>>>>>>>>>>>>>>>>>>>>> If there is no current record, as in
>>>>>> the
>>>>>>>>>>>>> punctuator case, we
>>>>>>>>>>>>>>>>>>>> should
>>>>>>>>>>>>>>>>>>>>>> just
>>>>>>>>>>>>>>>>>>>>>>>>> set
>>>>>>>>>>>>>>>>>>>>>>>>> the record context to null (or
>>>>>>>>>> Optional.empty,
>>>>>>>>>>>>> etc).
>>>>>>>>>>>>>>>>>>>>>>>>> That said, the put() always has to
>>>>>> come
>>>>>>>> from
>>>>>>>>>>>>> somewhere, and
>>>>>>>>>>>>>>>>>>> that
>>>>>>>>>>>>>>>>>>>>>>>>> somewhere is always going to be
>>>>>> either a
>>>>>>>>>> Processor
>>>>>>>>>>>>> or a
>>>>>>>>>>>>>>>>>>>> Punctuator,
>>>>>>>>>>>>>>>>>>>>>> both
>>>>>>>>>>>>>>>>>>>>>>>>> of which will still have access to the
>>>>>>>> full
>>>>>>>>>>>>> context. So
>>>>>>>>>>>>>>>>>>>> additional
>>>>>>>>>>>>>>>>>>>>>> info
>>>>>>>>>>>>>>>>>>>>>>>>> such as
>>>>>>>>>>>>>>>>>>>>>>>>> the timestamp can and should probably
>>>>>> be
>>>>>>>>>> supplied
>>>>>>>>>>>>> to the store
>>>>>>>>>>>>>>>>>>>> before
>>>>>>>>>>>>>>>>>>>>>>>>> calling put(), rather than looked up
>>>>>> by
>>>>>>>> the
>>>>>>>>>> store.
>>>>>>>>>>>>> But I can
>>>>>>>>>>>>>>>>>>> see
>>>>>>>>>>>>>>>>>>>> some
>>>>>>>>>>>>>>>>>>>>>>>> other
>>>>>>>>>>>>>>>>>>>>>>>>> things being useful, for example the
>>>>>>>> current
>>>>>>>>>>>>> record's headers.
>>>>>>>>>>>>>>>>>>>> Maybe
>>>>>>>>>>>>>>>>>>>>>>>> if/when
>>>>>>>>>>>>>>>>>>>>>>>>> we add better (or any) support for
>>>>>>>> headers in
>>>>>>>>>>>>> state stores this
>>>>>>>>>>>>>>>>>>>> will
>>>>>>>>>>>>>>>>>>>>>> be
>>>>>>>>>>>>>>>>>>>>>>>>> less true.
>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>> Of course as John has made clear, it's
>>>>>>>>>> pretty hard
>>>>>>>>>>>>> to judge
>>>>>>>>>>>>>>>>>>>> without
>>>>>>>>>>>>>>>>>>>>>>>>> examples
>>>>>>>>>>>>>>>>>>>>>>>>> and more insight as to what actually
>>>>>>>> goes on
>>>>>>>>>>>>> within a custom
>>>>>>>>>>>>>>>>>>>> state
>>>>>>>>>>>>>>>>>>>>>> store
>>>>>>>>>>>>>>>>>>>>>>>>> On Wed, Sep 9, 2020 at 8:07 PM John Roesler <vvcep...@apache.org> wrote:
>>>>>>>>>>>>>>>>>>>>>>>>>> Hi Paul,
>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>> It's good to hear from you!
>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>> I'm glad you're in favor of the direction. Especially when
>>>>>>>>>>>>>>>>>>>>>>>>>> it comes to public API and usability concerns, I tend to
>>>>>>>>>>>>>>>>>>>>>>>>>> think that "the folks who matter" are actually the folks who
>>>>>>>>>>>>>>>>>>>>>>>>>> have to use the APIs to accomplish real tasks. It can be
>>>>>>>>>>>>>>>>>>>>>>>>>> hard for me to be sure I'm thinking clearly from that
>>>>>>>>>>>>>>>>>>>>>>>>>> perspective.
>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>> Funny story, I also started down
>>>>>> this
>>>>>>>> road
>>>>>>>>>> a
>>>>>>>>>>>>> couple of times
>>>>>>>>>>>>>>>>>>>>>>>>>> already and backed them out before
>>>>>>>> the KIP
>>>>>>>>>>>>> because I was
>>>>>>>>>>>>>>>>>>>>>>>>>> afraid of the scope of the proposal.
>>>>>>>>>>>>> Unfortunately, needing
>>>>>>>>>>>>>>>>>>>>>>>>>> to make a new ProcessorContext kind
>>>>>> of
>>>>>>>>>> forced my
>>>>>>>>>>>>> hand.
>>>>>>>>>>>>>>>>>>>>>>>>>> I see you've called me out about the
>>>>>>>>>>>>> ChangeLogging stores :)
>>>>>>>>>>>>>>>>>>>>>>>>>> In fact, I think these are the
>>>>>>>> main/only
>>>>>>>>>> reason
>>>>>>>>>>>>> that stores
>>>>>>>>>>>>>>>>>>>>>>>>>> might really need to invoke
>>>>>>>> "forward()". My
>>>>>>>>>>>>> secret plan was
>>>>>>>>>>>>>>>>>>>>>>>>>> to cheat and either accomplish
>>>>>>>>>> change-logging by
>>>>>>>>>>>>> a different
>>>>>>>>>>>>>>>>>>>>>>>>>> mechanism than implementing the
>>>>>> store
>>>>>>>>>> interface,
>>>>>>>>>>>>> or by just
>>>>>>>>>>>>>>>>>>>>>>>>>> breaking encapsulation to sneak the
>>>>>>>> "real"
>>>>>>>>>>>>> ProcessorContext
>>>>>>>>>>>>>>>>>>>>>>>>>> into the ChangeLogging stores. But
>>>>>>>> those
>>>>>>>>>> are all
>>>>>>>>>>>>>>>>>>>>>>>>>> implementation details. I think the
>>>>>>>> key
>>>>>>>>>> question
>>>>>>>>>>>>> is whether
>>>>>>>>>>>>>>>>>>>>>>>>>> anyone else has a store
>>>>>>>> implementation that
>>>>>>>>>>>>> needs to call
>>>>>>>>>>>>>>>>>>>>>>>>>> "forward()". It's not what you
>>>>>>>> mentioned,
>>>>>>>>>> but
>>>>>>>>>>>>> since you
>>>>>>>>>>>>>>>>>>>>>>>>>> spoke up, I'll just ask: if you have
>>>>>>>> a use
>>>>>>>>>> case
>>>>>>>>>>>>> for calling
>>>>>>>>>>>>>>>>>>>>>>>>>> "forward()" in a store, please share
>>>>>>>> it.
>>>>>>>>>>>>>>>>>>>>>>>>>> Regarding the other record-specific
>>>>>>>> context
>>>>>>>>>>>>> methods, I think
>>>>>>>>>>>>>>>>>>>>>>>>>> you have a good point, but I also
>>>>>>>> can't
>>>>>>>>>> quite
>>>>>>>>>>>>> wrap my head
>>>>>>>>>>>>>>>>>>>>>>>>>> around how we can actually guarantee
>>>>>>>> it to
>>>>>>>>>> work
>>>>>>>>>>>>> in general.
>>>>>>>>>>>>>>>>>>>>>>>>>> For example, the case you cited,
>>>>>>>> where the
>>>>>>>>>>>>> implementation of
>>>>>>>>>>>>>>>>>>>>>>>>>> `KeyValueStore#put(key, value)` uses
>>>>>>>> the
>>>>>>>>>> context
>>>>>>>>>>>>> to augment
>>>>>>>>>>>>>>>>>>>>>>>>>> the record with timestamp
>>>>>>>> information. This
>>>>>>>>>>>>> relies on the
>>>>>>>>>>>>>>>>>>>>>>>>>> assumption that you would only call
>>>>>>>>>> "put()" from
>>>>>>>>>>>>> inside a
>>>>>>>>>>>>>>>>>>>>>>>>>> `Processor#process(key, value)` call
>>>>>>>> in
>>>>>>>>>> which
>>>>>>>>>>>>> the record
>>>>>>>>>>>>>>>>>>>>>>>>>> being processed is the same record
>>>>>>>> that
>>>>>>>>>> you're
>>>>>>>>>>>>> trying to put
>>>>>>>>>>>>>>>>>>>>>>>>>> into the store.
>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>> If you were to call "put" from a
>>>>>>>>>> punctuator, or
>>>>>>>>>>>>> do a
>>>>>>>>>>>>>>>>>>>>>>>>>> `range()` query and then update one
>>>>>> of
>>>>>>>>>> those
>>>>>>>>>>>>> records with
>>>>>>>>>>>>>>>>>>>>>>>>>> `put()`, you'd have a very subtle
>>>>>> bug
>>>>>>>> on
>>>>>>>>>> your
>>>>>>>>>>>>> hands. Right
>>>>>>>>>>>>>>>>>>>>>>>>>> now, the Streams component that
>>>>>>>> actually
>>>>>>>>>> calls
>>>>>>>>>>>>> the Processor
>>>>>>>>>>>>>>>>>>>>>>>>>> takes care to set the right record
>>>>>>>> context
>>>>>>>>>>>>> before invoking
>>>>>>>>>>>>>>>>>>>>>>>>>> the method, and in the case of
>>>>>>>> caching,
>>>>>>>>>> etc., it
>>>>>>>>>>>>> also takes
>>>>>>>>>>>>>>>>>>>>>>>>>> care to swap out the old context and
>>>>>>>> keep
>>>>>>>>>> it
>>>>>>>>>>>>> somewhere safe.
>>>>>>>>>>>>>>>>>>>>>>>>>> But when it comes to public API
>>>>>>>> Processors
>>>>>>>>>>>>> calling methods
>>>>>>>>>>>>>>>>>>>>>>>>>> on StateStores, there's no
>>>>>>>> opportunity for
>>>>>>>>>> any
>>>>>>>>>>>>> component to
>>>>>>>>>>>>>>>>>>>>>>>>>> make sure the context is always
>>>>>>>> correct.
>>>>>>>>>>>>>>>>>>>>>>>>>> In the face of that situation, it
>>>>>>>> seemed
>>>>>>>>>> better
>>>>>>>>>>>>> to just move
>>>>>>>>>>>>>>>>>>>>>>>>>> in the direction of a "normal" data
>>>>>>>> store.
>>>>>>>>>> I.e.,
>>>>>>>>>>>>> when you
>>>>>>>>>>>>>>>>>>>>>>>>>> use a HashMap or RocksDB or other
>>>>>>>> "state
>>>>>>>>>>>>> stores", you don't
>>>>>>>>>>>>>>>>>>>>>>>>>> expect them to automatically know
>>>>>>>> extra
>>>>>>>>>> stuff
>>>>>>>>>>>>> about the
>>>>>>>>>>>>>>>>>>>>>>>>>> record you're storing. If you need
>>>>>>>> them to
>>>>>>>>>> know
>>>>>>>>>>>>> something,
>>>>>>>>>>>>>>>>>>>>>>>>>> you just put it in the value.
>>>>>>>>>>>>>>>>>>>>>>>>>>
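A minimal sketch of the "just put it in the value" pattern described above, using a plain `HashMap` in place of a real Kafka Streams store. The names `StampedValue`, `SimpleStore`, and `ValueCarriedContextSketch` are hypothetical and chosen only for illustration. The assumption is that the caller, whether it is processing a record or running on a timer, knows which timestamp applies and passes it explicitly, so the store never consults an ambient record context.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical value wrapper: the caller supplies the timestamp explicitly.
final class StampedValue {
    final String value;
    final long timestampMs;

    StampedValue(String value, long timestampMs) {
        this.value = value;
        this.timestampMs = timestampMs;
    }
}

// Hypothetical stand-in for a custom store; a plain map, not a StateStore.
final class SimpleStore {
    private final Map<String, StampedValue> data = new HashMap<>();

    void put(String key, StampedValue value) { data.put(key, value); }

    StampedValue get(String key) { return data.get(key); }
}

final class ValueCarriedContextSketch {
    // Processing path: the caller knows the current record's timestamp.
    static void process(SimpleStore store, String key, String value, long recordTimestampMs) {
        store.put(key, new StampedValue(value, recordTimestampMs));
    }

    // Timer path: there is no current record, so the caller picks the timestamp.
    static void punctuate(SimpleStore store, String key, long punctuationTimeMs) {
        StampedValue old = store.get(key);
        if (old != null) {
            store.put(key, new StampedValue(old.value + "-expired", punctuationTimeMs));
        }
    }

    public static void main(String[] args) {
        SimpleStore store = new SimpleStore();
        process(store, "a", "v1", 100L);
        punctuate(store, "a", 500L);
        StampedValue result = store.get("a");
        System.out.println(result.value + "@" + result.timestampMs);
    }
}
```

Both call sites go through the same `put(key, value)` channel, so an update made from the timer path cannot silently inherit the timestamp of whatever record happened to be processed last.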
>>>>>>>>>>>>>>>>>>>>>>>>>> All of that said, I'm just reasoning
>>>>>>>> from
>>>>>>>>>> first
>>>>>>>>>>>>> principles
>>>>>>>>>>>>>>>>>>>>>>>>>> here. To really know if this is a
>>>>>>>> mistake
>>>>>>>>>> or
>>>>>>>>>>>>> not, I need to
>>>>>>>>>>>>>>>>>>>>>>>>>> be in your place. So please push
>>>>>> back
>>>>>>>> if
>>>>>>>>>> you
>>>>>>>>>>>>> think what I
>>>>>>>>>>>>>>>>>>>>>>>>>> said is nonsense. My personal plan
>>>>>>>> was to
>>>>>>>>>> keep
>>>>>>>>>>>>> an eye out
>>>>>>>>>>>>>>>>>>>>>>>>>> during the period where the old API
>>>>>>>> was
>>>>>>>>>> still
>>>>>>>>>>>>> present, but
>>>>>>>>>>>>>>>>>>>>>>>>>> deprecated, to see if people were
>>>>>>>>>> struggling to
>>>>>>>>>>>>> use the new
>>>>>>>>>>>>>>>>>>>>>>>>>> API. If so, then we'd have a chance
>>>>>> to
>>>>>>>>>> address
>>>>>>>>>>>>> it before
>>>>>>>>>>>>>>>>>>>>>>>>>> dropping the old API. But it's even
>>>>>>>> better
>>>>>>>>>> if
>>>>>>>>>>>>> you can help
>>>>>>>>>>>>>>>>>>>>>>>>>> think it through now.
>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>> It did also cross my mind to _not_
>>>>>>>> add the
>>>>>>>>>>>>>>>>>>>>>>>>>> StateStoreContext, but just to
>>>>>>>> continue to
>>>>>>>>>> punt
>>>>>>>>>>>>> on the
>>>>>>>>>>>>>>>>>>>>>>>>>> question by just dropping in the new
>>>>>>>>>>>>> ProcessorContext to the
>>>>>>>>>>>>>>>>>>>>>>>>>> new init method. If
>>>>>> StateStoreContext
>>>>>>>>>> seems too
>>>>>>>>>>>>> bold, we can
>>>>>>>>>>>>>>>>>>>>>>>>>> go that direction. But if we
>>>>>> actually
>>>>>>>> add
>>>>>>>>>> some
>>>>>>>>>>>>> methods to
>>>>>>>>>>>>>>>>>>>>>>>>>> StateStoreContext, I'd like to be
>>>>>>>> able to
>>>>>>>>>> ensure
>>>>>>>>>>>>> they would
>>>>>>>>>>>>>>>>>>>>>>>>>> be well defined. I think the current
>>>>>>>>>> situation
>>>>>>>>>>>>> was more of
>>>>>>>>>>>>>>>>>>>>>>>>>> an oversight than a choice.
>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>> Thanks again for your reply,
>>>>>>>>>>>>>>>>>>>>>>>>>> -John
>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>> On Wed, 2020-09-09 at 21:23 -0500, Paul Whalen wrote:
>>>>>>>>>>>>>>>>>>>>>>>>>>> John,
>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>> It's exciting to see this KIP head
>>>>>>>> in
>>>>>>>>>> this
>>>>>>>>>>>>> direction!  In
>>>>>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>>>>>> last
>>>>>>>>>>>>>>>>>>>>>>>> year
>>>>>>>>>>>>>>>>>>>>>>>>>> or
>>>>>>>>>>>>>>>>>>>>>>>>>>> so I've tried to sketch out some
>>>>>>>>>> usability
>>>>>>>>>>>>> improvements for
>>>>>>>>>>>>>>>>>>>>>> custom
>>>>>>>>>>>>>>>>>>>>>>>> state
>>>>>>>>>>>>>>>>>>>>>>>>>>> stores, and I also ended up
>>>>>>>> splitting
>>>>>>>>>> out the
>>>>>>>>>>>>>>>>>>>> StateStoreContext
>>>>>>>>>>>>>>>>>>>>>> from
>>>>>>>>>>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>>>>>>>>>>>>>> ProcessorContext in an attempt to
>>>>>>>>>> facilitate
>>>>>>>>>>>>> what I was
>>>>>>>>>>>>>>>>>>>> doing.  I
>>>>>>>>>>>>>>>>>>>>>>>> sort of
>>>>>>>>>>>>>>>>>>>>>>>>>>> abandoned it when I realized how
>>>>>>>> large
>>>>>>>>>> the
>>>>>>>>>>>>> ideal change
>>>>>>>>>>>>>>>>>>> might
>>>>>>>>>>>>>>>>>>>>>> have
>>>>>>>>>>>>>>>>>>>>>>>> to be,
>>>>>>>>>>>>>>>>>>>>>>>>>>> but it's great to see that there
>>>>>> is
>>>>>>>> other
>>>>>>>>>>>>> interest in
>>>>>>>>>>>>>>>>>>> moving
>>>>>>>>>>>>>>>>>>>> in
>>>>>>>>>>>>>>>>>>>>>> this
>>>>>>>>>>>>>>>>>>>>>>>>>>> direction (from the folks that
>>>>>>>> matter :)
>>>>>>>>>> ).
>>>>>>>>>>>>>>>>>>>>>>>>>>> Having taken a stab at it myself,
>>>>>> I
>>>>>>>> have
>>>>>>>>>> a
>>>>>>>>>>>>> comment/question
>>>>>>>>>>>>>>>>>>>> on
>>>>>>>>>>>>>>>>>>>>>> this
>>>>>>>>>>>>>>>>>>>>>>>>>> bullet
>>>>>>>>>>>>>>>>>>>>>>>>>>> about StateStoreContext:
>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>> It does *not* include anything processor- or record-specific,
>>>>>>>>>>>>>>>>>>>>>>>>>>>> like `forward()` or any information about the "current" record,
>>>>>>>>>>>>>>>>>>>>>>>>>>>> which is only well-defined in the context of the Processor.
>>>>>>>>>>>>>>>>>>>>>>>>>>>> Processors process one record at a time, but state stores may be
>>>>>>>>>>>>>>>>>>>>>>>>>>>> used to store and fetch many records, so there is no "current
>>>>>>>>>>>>>>>>>>>>>>>>>>>> record".
>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>> I totally agree that record-specific or processor-specific context in a
>>>>>>>>>>>>>>>>>>>>>>>>>>> state store is often not well-defined and it would be good to separate
>>>>>>>>>>>>>>>>>>>>>>>>>>> that out, but sometimes it (at least record-specific context) is
>>>>>>>>>>>>>>>>>>>>>>>>>>> actually useful, for example, passing the record's timestamp through to
>>>>>>>>>>>>>>>>>>>>>>>>>>> the underlying storage (or changelog topic):
>>>>>>>>>>>>>>>>>>>>>>>>>>> https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStore.java#L121
>>>>>>>>>>>>>>>>>>>>>>>>>>> You could have the writer client of the state store pass this through,
>>>>>>>>>>>>>>>>>>>>>>>>>>> but it would be nice to be able to write state stores where the client
>>>>>>>>>>>>>>>>>>>>>>>>>>> did not have this responsibility.  I'm not sure if the solution is to
>>>>>>>>>>>>>>>>>>>>>>>>>>> add some things back to StateStoreContext, or make yet another context
>>>>>>>>>>>>>>>>>>>>>>>>>>> that represents record-specific context while inside a state store.
>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>> Best,
>>>>>>>>>>>>>>>>>>>>>>>>>>> Paul
>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>> On Wed, Sep 9, 2020 at 5:43 PM John Roesler <j...@vvcephei.org> wrote:
>>>>>>>>>>>>>>>>>>>>>>>>>>>> Hello all,
>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>> I've been slowly pushing KIP-478 forward over the last year,
>>>>>>>>>>>>>>>>>>>>>>>>>>>> and I'm happy to say that we're making good progress now.
>>>>>>>>>>>>>>>>>>>>>>>>>>>> However, several issues with the original design have come
>>>>>>>>>>>>>>>>>>>>>>>>>>>> to light.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>> The major changes:
>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>> We discovered that the original plan of just adding generic
>>>>>>>>>>>>>>>>>>>>>>>>>>>> parameters to ProcessorContext was too disruptive, so we are
>>>>>>>>>>>>>>>>>>>>>>>>>>>> now adding a new api.ProcessorContext.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>> That choice forces us to add a new StateStore.init method
>>>>>>>>>>>>>>>>>>>>>>>>>>>> for the new context, but ProcessorContext really isn't ideal
>>>>>>>>>>>>>>>>>>>>>>>>>>>> for state stores to begin with, so I'm proposing a new
>>>>>>>>>>>>>>>>>>>>>>>>>>>> StateStoreContext for this purpose. In a nutshell, there are
>>>>>>>>>>>>>>>>>>>>>>>>>>>> quite a few methods in ProcessorContext that actually should
>>>>>>>>>>>>>>>>>>>>>>>>>>>> never be called from inside a StateStore.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>> Also, since there is a new ProcessorContext interface, we
>>>>>>>>>>>>>>>>>>>>>>>>>>>> need a new MockProcessorContext implementation in the
>>>>>>>>>>>>>>>>>>>>>>>>>>>> test-utils module.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>> The changeset for the KIP document is here:
>>>>>>>>>>>>>>>>>>>>>>>>>>>> https://cwiki.apache.org/confluence/pages/diffpagesbyversion.action?pageId=118172121&selectedPageVersions=14&selectedPageVersions=10
>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>> And the KIP itself is here:
>>>>>>>>>>>>>>>>>>>>>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-478+-+Strongly+typed+Processor+API
>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>> If you have any concerns, please let me know!
>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>>>>>>>>>>>>>>>> -John
>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>
> 
