Thanks Paul and Sophie,

Your feedback certainly underscores the need to be explicit
in the javadoc about why that parameter is Optional. Getting
this kind of feedback before the release is exactly the kind
of outcome we hope to get from the KIP process!

Thanks,
-John

On Tue, 2020-09-29 at 22:32 -0500, Paul Whalen wrote:
> John, I totally agree that adding a method to Processor is cumbersome and
> not a good path.  I was imagining maybe a separate interface that could be
> used in the appropriate context, but I don't think that makes too much
> sense - it's just too far away from what Kafka Streams is.  I was
> originally more interested in the "why" Optional than the "how" (I think my
> original reply overplayed the "optional as an argument" concern).  But
> you've convinced me that there is a perfectly legitimate "why".  We should
> make sure that it's clear why it's Optional, but I suppose that goes
> without saying.  It's a nice opportunity to make the API reflect more what
> is actually going on under the hood.
> 
> Thanks!
> Paul
> 
> On Tue, Sep 29, 2020 at 10:05 PM Sophie Blee-Goldman <sop...@confluent.io>
> wrote:
> 
> > FWIW, while I'm really not a fan of Optional in general, I agree that its
> > usage
> > here seems appropriate. Even for those rare software developers who
> > carefully
> > read all the docs several times over, I think it wouldn't be too hard to
> > miss a
> > note about the RecordMetadata possibly being null.
> > 
> > Especially because it's not that obvious why at first glance, and takes a
> > bit of
> > thinking to realize that records originating from a Punctuator wouldn't
> > have a
> > "current record". This  is something that has definitely confused users
> > today.
> > 
> > It's on us to improve the education here -- and an Optional<RecordMetadata>
> > would naturally raise awareness of this subtlety
> > 
> > On Tue, Sep 29, 2020 at 7:40 PM Sophie Blee-Goldman <sop...@confluent.io>
> > wrote:
> > 
> > > Does my reply address your concerns?
> > > 
> > > 
> > > Yes; also, I definitely misread part of the proposal earlier and thought
> > > you had put
> > > the timestamp field in RecordMetadata. Sorry for not giving things a
> > > closer look
> > > before responding! I'm not sure my original message made much sense given
> > > the misunderstanding, but thanks for responding anyway :P
> > > 
> > > Having given the proposal a second pass, I agree, it's very elegant. +1
> > > 
> > > On Tue, Sep 29, 2020 at 6:50 PM John Roesler <vvcep...@apache.org>
> > wrote:
> > > > Thanks for the reply, Sophie,
> > > > 
> > > > I think I may have summarized too much in my prior reply.
> > > > 
> > > > In the currently proposed KIP, any caller of forward() must
> > > > supply a Record, which consists of:
> > > > * key
> > > > * value
> > > > * timestamp
> > > > * headers (with a convenience constructor that sets empty
> > > > headers)
> > > > 
> > > > These aren't what I was referring to as potentially being
> > > > undefined downstream, since thanks to the introduction of
> > > > Record, they are, as you're advocating, required to be
> > > > defined everywhere, even when forwarding from a punctuator.
> > > > 
> > > > So to be clear, the intent of this change is actually to
> > > > _enforce_ that timestamp would never be undefined (which it
> > > > currently can be). Also, since punctuators _are_ going to
> > > > have to "make up" a timestamp going forward, we should note
> > > > that the "punctuate" method currently passes in a good
> > > > timestamp that they can use: for system-time punctuations,
> > > > they receive the current system time, and for stream-time
> > > > punctuations, they get the current stream time.
> > > > 
> > > > The potentially undefined RecordMetadata only contains these
> > > > fields:
> > > > * topic
> > > > * partition
> > > > * offset
> > > > 
> > > > These fields aren't required (or even used) in a Sink, and
> > > > it doesn't seem like they would be important to many
> > > > applications. Furthermore, it doesn't _seem_ like you'd even
> > > > want to set these fields. They seem purely informational and
> > > > only useful in the context when you are actually processing
> > > > a real input record. It doesn't sound like you were asking
> > > > for it, but just to put it on the record, I think if we were
> > > > to require values for the metadata from punctuators, people
> > > > would mostly just make up their own dummy values, to no
> > > > one's benefit.
> > > > 
> > > > I should also note that with the current
> > > > Record/RecordMetadata split, we will have the freedom to
> > > > move fields into the Record class (or even add new fields)
> > > > if we want them to become "data" as opposed to "metadata" in
> > > > the future.
> > > > 
> > > > Thanks for your reply; I was similarly floored when I
> > > > realized the true nature of the current situation. Does my
> > > > reply address your concerns?
> > > > 
> > > > Thanks,
> > > > -John
> > > > 
> > > > On Tue, 2020-09-29 at 18:34 -0700, Sophie Blee-Goldman
> > > > wrote:
> > > > > > However, the record metadata is only defined when the parent
> > forwards
> > > > > > while processing a
> > > > > 
> > > > > real record, not when it calls forward from the punctuator
> > > > > 
> > > > > 
> > > > > Can we take a step back for a second...why wouldn't you be required to
> > > > set
> > > > > the RecordContext
> > > > > yourself when calling forward from a Punctuator? I think I agree with
> > > > Paul
> > > > > here, it seems kind of
> > > > > absurd not to enforce that the RecordContext be present inside the
> > > > > process() method.
> > > > > 
> > > > > The original problem with Punctuators, as I understood it, was that
> > all
> > > > of
> > > > > the RecordContext
> > > > > fields were exposed automatically to both the Processor and any
> > > > Punctuator,
> > > > > due to being
> > > > > direct methods on the ProcessorContext. We can't control which
> > > > > ProcessorContext methods
> > > > > someone will call from with a Punctuator vs from a Processor. The best
> > > > we
> > > > > could do was
> > > > > set these "nonsense" fields to null when inside a Punctuator, or set
> > > > them
> > > > > to some dummy
> > > > > values as you pointed out.
> > > > > 
> > > > > But then you proposed the solution of a separate RecordContext which
> > is
> > > > not
> > > > > attached to the
> > > > > ProcessorContext at all. This seemed to solve the above problem very
> > > > > neatly: we only pass
> > > > > in the RecordContext to the process() method, so we don't have to
> > worry
> > > > > about people trying
> > > > > to access these fields from within a Punctuator. The fields aren't
> > > > > accessible unless they're
> > > > > defined.
> > > > > 
> > > > > So what happens when someone wants to forward something from within a
> > > > > Punctuator? I
> > > > > don't think it's reasonable to let the timestamp field be undefined,
> > > > ever.
> > > > > What if the Punctuator
> > > > > forwards directly to a sink, or directly to some windowing logic. Are
> > we
> > > > > supposed to add
> > > > > handling for the RecordContext == null case to every processor? Or are
> > > > we
> > > > > just going to
> > > > > assume the implicit restriction that users will only forward records
> > > > from a
> > > > > Punctuator to
> > > > > downstream processors that know how to handle and/or set the
> > > > RecordContext
> > > > > if it's
> > > > > undefined. That seems to throw away a lot of the awesome safety added
> > in
> > > > > this KIP
> > > > > 
> > > > > Apologies for the rant. But I feel pretty strongly that allowing to
> > > > forward
> > > > > records from a
> > > > > Punctuator without a defined RecordContext would be asking for
> > trouble.
> > > > > Imo, if you
> > > > > want to forward from a Punctuator, you need to store the info you need
> > > > in
> > > > > order to
> > > > > set the timestamp, or make one up yourself
> > > > > 
> > > > > (the one alternative I can think of here is that maybe we could pass
> > in
> > > > the
> > > > > current
> > > > > partition time, so users can at least put in a reasonable estimate for
> > > > the
> > > > > timestamp
> > > > > that won't cause it to get dropped and won't potentially lurch the
> > > > > streamtime far into
> > > > > the future. This would be similar to what we do in the
> > > > TimestampExtractor)
> > > > > On Tue, Sep 29, 2020 at 6:06 PM John Roesler <vvcep...@apache.org>
> > > > wrote:
> > > > > > Oh, I guess one other thing I should have mentioned is that I’ve
> > > > recently
> > > > > > discovered that in cases where the context is undefined, we
> > currently
> > > > just
> > > > > > fill in dummy values for the context. So there’s a good chance that
> > > > real
> > > > > > applications in use are depending on undefined context without even
> > > > > > realizing it. What I’m hoping to do is just make the situation
> > > > explicit and
> > > > > > get rid of the dummy values.
> > > > > > 
> > > > > > Thanks,
> > > > > > John
> > > > > > 
> > > > > > On Tue, Sep 29, 2020, at 20:01, John Roesler wrote:
> > > > > > > Thanks for the review, Paul!
> > > > > > > 
> > > > > > > I had read some of that debate before. There seems to be some
> > > > subtext
> > > > > > > there, because they advise against using Optional in cases like
> > > > this,
> > > > > > > but there doesn’t seem to be a specific reason why it’s
> > > > inappropriate.
> > > > > > > I got the impression they were just afraid that people would go
> > > > > > > overboard and make everything Optional.
> > > > > > > 
> > > > > > > I could also make two methods, but it seemed like it might be an
> > > > > > > unfortunate way to handle the issue, since Processor is just
> > about a
> > > > > > > Function as-is, but the two-method approach would require people
> > to
> > > > > > > implement both methods.
> > > > > > > 
> > > > > > > To your question, this is something that’s only recently became
> > > > clear
> > > > > > > to me. Imagine you have a parent processor that calls forward both
> > > > from
> > > > > > > process and a punctuator. The child will have process() invoked in
> > > > both
> > > > > > > cases, and won’t be able to distinguish them. However, the record
> > > > > > > metadata is only defined when the parent forwards while
> > processing a
> > > > > > > real record, not when it calls forward from the punctuator.
> > > > > > > 
> > > > > > > This is why I wanted to make the metadata Optional, to advertise
> > > > that
> > > > > > > the metadata might be undefined if any ancestor processor ever
> > calls
> > > > > > > forward from a punctuator. We could remove the Optional and
> > instead
> > > > > > > just document that the argument might be null.
> > > > > > > 
> > > > > > > With that context in place, what’s your take?
> > > > > > > 
> > > > > > > Thanks,
> > > > > > > John
> > > > > > > 
> > > > > > > On Tue, Sep 29, 2020, at 19:09, Paul Whalen wrote:
> > > > > > > > Looks pretty good to me, though the Processor#process(Record,
> > > > > > > > Optional<RecordMetadata>) signature caught my eye.  There's some
> > > > > > debate
> > > > > > > > (
> > > > > > > > 
> > https://stackoverflow.com/questions/31922866/why-should-java-8s-optional-not-be-used-in-arguments
> > > > > > )
> > > > > > > > about whether to use Optionals in arguments, and while that's a
> > > > bit of
> > > > > > a
> > > > > > > > religious debate in the abstract, it did make me wonder whether
> > it
> > > > > > makes
> > > > > > > > sense in this specific case.  When is it actually not present?
> > I
> > > > was
> > > > > > > > under
> > > > > > > > the impression that we should always have access to it in
> > > > process(),
> > > > > > and
> > > > > > > > that the concern about metadata being undefined was about having
> > > > > > access
> > > > > > > > to
> > > > > > > > record metadata in the ProcessorContext held for use inside a
> > > > > > > > Punctuator.
> > > > > > > > 
> > > > > > > > If that's not the case and it is truly optional in process(), is
> > > > there
> > > > > > an
> > > > > > > > opportunity for an alternate interface for the cases when we
> > > > don't get
> > > > > > it,
> > > > > > > > rather than force the branching on implementers of the
> > interface?
> > > > > > > > Apologies if I've missed something, I took a look at the PR and
> > I
> > > > > > didn't
> > > > > > > > see any spots where I thought it would be empty.  Perhaps an
> > > > example
> > > > > > of a
> > > > > > > > Punctuator using (and not using) the new API would clear things
> > > > up.
> > > > > > > > Best,
> > > > > > > > Paul
> > > > > > > > 
> > > > > > > > On Tue, Sep 29, 2020 at 4:10 PM John Roesler <
> > vvcep...@apache.org
> > > > > > wrote:
> > > > > > > > > Hello again, all,
> > > > > > > > > 
> > > > > > > > > Thanks for the latest round of discussion. I've taken the
> > > > > > > > > recent feedback and come up with an updated KIP that seems
> > > > > > > > > actually quite a bit nicer than the prior proposal.
> > > > > > > > > 
> > > > > > > > > The specific diff on the KIP is here:
> > > > > > > > > 
> > > > > > > > > 
> > https://cwiki.apache.org/confluence/pages/diffpagesbyversion.action?pageId=118172121&selectedPageVersions=15&selectedPageVersions=14
> > > > > > > > > These changes are implemented in this POC PR:
> > > > > > > > > https://github.com/apache/kafka/pull/9346
> > > > > > > > > 
> > > > > > > > > The basic idea is that, building on the recent conversaion,
> > > > > > > > > we would transition away from the current API where we get
> > > > > > > > > only key/value in the process() method and other "data"
> > > > > > > > > comes in the ProcessorContext along with the "metadata".
> > > > > > > > > 
> > > > > > > > > Instead, we formalize what is "data" and what is "metadata",
> > > > > > > > > and pass it all in to the process method:
> > > > > > > > > Processor#process(Record, Optional<RecordMetadata>)
> > > > > > > > > 
> > > > > > > > > Also, you forward the whole data class instead of mutating
> > > > > > > > > the ProcessorContext fields and also calling forward:
> > > > > > > > > ProcessorContext#forward(Record)
> > > > > > > > > 
> > > > > > > > > The Record class itself ships with methods like
> > > > > > > > > record#withValue(NewV newValue)
> > > > > > > > > that make a shallow copy of the input Record, enabling
> > > > > > > > > Processors to safely handle the record without polluting the
> > > > > > > > > context of their parents and siblings.
> > > > > > > > > 
> > > > > > > > > This proposal has a number of key benefits:
> > > > > > > > > * As we've discovered in KAFKA-9584, it's unsafe to mutate
> > > > > > > > > the Headers via the ProcessorContext. This proposal offers a
> > > > > > > > > way to safely forward changes only to downstream processors.
> > > > > > > > > * The new API has symmetry (each processor's input is the
> > > > > > > > > output of its parent processor)
> > > > > > > > > * The API makes clear that the record metadata isn't always
> > > > > > > > > defined (for example, in a punctuation, there is no current
> > > > > > > > > topic/partition/offset)
> > > > > > > > > * The API enables punctuators to forward well defined
> > > > > > > > > headers downstream, which is currently not possible.
> > > > > > > > > 
> > > > > > > > > Unless their are objections, I'll go ahead and re-finalize
> > > > > > > > > this KIP and update that PR to a mergeable state.
> > > > > > > > > 
> > > > > > > > > Thanks, all,
> > > > > > > > > -John
> > > > > > > > > 
> > > > > > > > > 
> > > > > > > > > On Thu, 2020-09-24 at 09:41 -0700, Matthias J. Sax wrote:
> > > > > > > > > > Interesting proposal. However, I am not totally convinced,
> > > > because
> > > > > > I see
> > > > > > > > > > a fundamental difference between "data" and "metadata".
> > > > > > > > > > 
> > > > > > > > > > Topic/partition/offset are "metadata" in the strong sense
> > and
> > > > they
> > > > > > are
> > > > > > > > > > immutable.
> > > > > > > > > > 
> > > > > > > > > > On the other hand there is "primary" data like key and
> > value,
> > > > as
> > > > > > well as
> > > > > > > > > > "secondary" data like timestamp and headers. The issue seems
> > > > that
> > > > > > we
> > > > > > > > > > treat "secondary data" more like metadata atm?
> > > > > > > > > > 
> > > > > > > > > > Thus, promoting timestamp and headers into a first class
> > > > citizen
> > > > > > roll
> > > > > > > > > > make sense to me (my original proposal about `RecordContext`
> > > > would
> > > > > > still
> > > > > > > > > > fall short with this regard). However, putting both (data
> > and
> > > > > > metadata)
> > > > > > > > > > into a `Record` abstraction might go too far?
> > > > > > > > > > 
> > > > > > > > > > I am also a little bit concerned about `Record.copy()`
> > > > because it
> > > > > > might
> > > > > > > > > > be a trap: Users might assume it does a full deep copy of
> > the
> > > > > > record,
> > > > > > > > > > however, it would not. It would only create a new `Record`
> > > > object
> > > > > > as
> > > > > > > > > > wrapper that points to the same key/value/header objects as
> > > > the
> > > > > > input
> > > > > > > > > > record.
> > > > > > > > > > 
> > > > > > > > > > With the current `context.forward(key, value)` we don't have
> > > > this
> > > > > > "deep
> > > > > > > > > > copy" issue -- it's pretty clear what is happening.
> > > > > > > > > > 
> > > > > > > > > > Instead of `To.all().withTimestamp()` we could also add
> > > > > > > > > > `context.forward(key, value, timestamp)` etc (just wondering
> > > > about
> > > > > > the
> > > > > > > > > > exposition in overload)?
> > > > > > > > > > 
> > > > > > > > > > Also, `Record.withValue` etc sounds odd? Should a record not
> > > > be
> > > > > > > > > > immutable? So, we could have something like
> > > > > > > > > > 
> > > > > > > > > > 
> > `RecordFactory.withKeyValue(...).withTimestamp(...).withHeaders(...).build()`.
> > > > > > > > > > But it looks rather verbose?
> > > > > > > > > > 
> > > > > > > > > > The other question is of course, to what extend to we want
> > to
> > > > keep
> > > > > > the
> > > > > > > > > > distinction between "primary" and "secondary" data? To me,
> > > > it's a
> > > > > > > > > > question of easy of use?
> > > > > > > > > > 
> > > > > > > > > > Just putting all this out to move the discussion forward.
> > > > Don't
> > > > > > have a
> > > > > > > > > > concrete proposal atm.
> > > > > > > > > > 
> > > > > > > > > > 
> > > > > > > > > > -Matthias
> > > > > > > > > > 
> > > > > > > > > > 
> > > > > > > > > > On 9/14/20 9:24 AM, John Roesler wrote:
> > > > > > > > > > > Thanks for this thought, Matthias!
> > > > > > > > > > > 
> > > > > > > > > > > To be honest, it's bugged me quite a bit that _all_ the
> > > > > > > > > > > record information hasn't been an argument to `process`. I
> > > > > > > > > > > suppose I was trying to be conservative in this proposal,
> > > > > > > > > > > but then again, if we're adding new Processor and
> > > > > > > > > > > ProcessorContext interfaces, then this is the time to make
> > > > > > > > > > > such a change.
> > > > > > > > > > > 
> > > > > > > > > > > To be unambiguous, I think this is what we're talking
> > about:
> > > > > > > > > > > ProcessorContext:
> > > > > > > > > > > * applicationId
> > > > > > > > > > > * taskId
> > > > > > > > > > > * appConfigs
> > > > > > > > > > > * appConfigsWithPrefix
> > > > > > > > > > > * keySerde
> > > > > > > > > > > * valueSerde
> > > > > > > > > > > * stateDir
> > > > > > > > > > > * metrics
> > > > > > > > > > > * schedule
> > > > > > > > > > > * commit
> > > > > > > > > > > * forward
> > > > > > > > > > > 
> > > > > > > > > > > StateStoreContext:
> > > > > > > > > > > * applicationId
> > > > > > > > > > > * taskId
> > > > > > > > > > > * appConfigs
> > > > > > > > > > > * appConfigsWithPrefix
> > > > > > > > > > > * keySerde
> > > > > > > > > > > * valueSerde
> > > > > > > > > > > * stateDir
> > > > > > > > > > > * metrics
> > > > > > > > > > > * register
> > > > > > > > > > > 
> > > > > > > > > > > 
> > > > > > > > > > > RecordContext
> > > > > > > > > > > * topic
> > > > > > > > > > > * partition
> > > > > > > > > > > * offset
> > > > > > > > > > > * timestamp
> > > > > > > > > > > * headers
> > > > > > > > > > > 
> > > > > > > > > > > 
> > > > > > > > > > > Your proposal sounds good to me as-is. Just to cover the
> > > > > > > > > > > bases, though, I'm wondering if we should push the idea
> > just
> > > > > > > > > > > a little farther. Instead of decomposing
> > key,value,context,
> > > > > > > > > > > we could just keep them all in one object, like this:
> > > > > > > > > > > 
> > > > > > > > > > > Record:
> > > > > > > > > > > * key
> > > > > > > > > > > * value
> > > > > > > > > > > * topic
> > > > > > > > > > > * partition
> > > > > > > > > > > * offset
> > > > > > > > > > > * timestamp
> > > > > > > > > > > * headers
> > > > > > > > > > > 
> > > > > > > > > > > Then, we could have:
> > > > > > > > > > > Processor#process(Record)
> > > > > > > > > > > ProcessorContext#forward(Record, To)
> > > > > > > > > > > 
> > > > > > > > > > > Viewed from this perspective, a record has three
> > properties
> > > > > > > > > > > that people may specify in their processors: key, value,
> > and
> > > > > > > > > > > timestamp.
> > > > > > > > > > > 
> > > > > > > > > > > We could deprecate `To#withTimestamp` and enable people to
> > > > > > > > > > > specify the timestamp along with the key and value when
> > they
> > > > > > > > > > > forward a record.
> > > > > > > > > > > 
> > > > > > > > > > > E.g.,
> > > > > > > > > > > RecordBuilder toForward = RecordBuilder.copy(record)
> > > > > > > > > > > toForward.withKey(newKey)
> > > > > > > > > > > toForward.withValue(newValue)
> > > > > > > > > > > toForward.withTimestamp(newTimestamp)
> > > > > > > > > > > Record newRecord = toForward.build()
> > > > > > > > > > > context.forward(newRecord, To.child("child1"))
> > > > > > > > > > > 
> > > > > > > > > > > Or, the more compact common case:
> > > > > > > > > > > current:
> > > > > > > > > > >  context.forward(key, "newValue")
> > > > > > > > > > > proposed:
> > > > > > > > > > > 
> > context.forward(copy(record).withValue("newValue").build())
> > > > > > > > > > > 
> > > > > > > > > > > It's slightly more verbose, but also more extensible. This
> > > > > > > > > > > would give us a clean path to add header support in PAPI
> > as
> > > > > > > > > > > well, simply by adding `withHeaders` in RecordBuilder.
> > > > > > > > > > > 
> > > > > > > > > > > It's also more symmetrical, since the recipient of
> > `forward`
> > > > > > > > > > > would just get the sent `Record`. Whereas today, the
> > sender
> > > > > > > > > > > puts the timestamp in `To`, but the recipient gets in in
> > its
> > > > > > > > > > > own `ProcessorContext`.
> > > > > > > > > > > 
> > > > > > > > > > > WDYT?
> > > > > > > > > > > -John
> > > > > > > > > > > 
> > > > > > > > > > > On Fri, 2020-09-11 at 12:30 -0700, Matthias J. Sax wrote:
> > > > > > > > > > > > I think separating the different contexts make sense.
> > > > > > > > > > > > 
> > > > > > > > > > > > In fact, we could even go one step further and remove
> > the
> > > > > > record
> > > > > > > > > context
> > > > > > > > > > > > from the processor context completely and we add a third
> > > > > > parameter to
> > > > > > > > > > > > `process(key, value, recordContext)`. This would make it
> > > > clear
> > > > > > that
> > > > > > > > > the
> > > > > > > > > > > > context is for the input record only and it's not
> > > > possible to
> > > > > > pass
> > > > > > > > > it to
> > > > > > > > > > > > a `punctuate` callback.
> > > > > > > > > > > > 
> > > > > > > > > > > > For the stores and changelogging: I think there are two
> > > > cases.
> > > > > > (1)
> > > > > > > > > You
> > > > > > > > > > > > use a plain key-value store. For this case, it seems you
> > > > do
> > > > > > not care
> > > > > > > > > > > > about the timestamp and thus does not care what
> > timestamp
> > > > is
> > > > > > set in
> > > > > > > > > the
> > > > > > > > > > > > changelog records. (We can set anything we want, as it's
> > > > not
> > > > > > > > > relevant at
> > > > > > > > > > > > all -- the timestamp is ignored on read anyway.) (2) The
> > > > other
> > > > > > case
> > > > > > > > > is,
> > > > > > > > > > > > that one does care about timestamps, and for this case
> > > > should
> > > > > > use
> > > > > > > > > > > > TimestampedKeyValueStore. The passed timestamp will be
> > > > set on
> > > > > > the
> > > > > > > > > > > > changelog records for this case.
> > > > > > > > > > > > 
> > > > > > > > > > > > Thus, for both cases, accessing the record context does
> > > > not
> > > > > > seems to
> > > > > > > > > be
> > > > > > > > > > > > a requirement. And providing access to the processor
> > > > context
> > > > > > to, eg.,
> > > > > > > > > > > > `forward()` or similar seems safe.
> > > > > > > > > > > > 
> > > > > > > > > > > > 
> > > > > > > > > > > > -Matthias
> > > > > > > > > > > > 
> > > > > > > > > > > > On 9/10/20 7:25 PM, John Roesler wrote:
> > > > > > > > > > > > > Thanks for the reply, Paul!
> > > > > > > > > > > > > 
> > > > > > > > > > > > > I certainly intend to make sure that the changelogging
> > > > layer
> > > > > > > > > > > > > continues to work the way it does now, by hook or by
> > > > crook.
> > > > > > > > > > > > > I think the easiest path for me is to just "cheat" and
> > > > get
> > > > > > > > > > > > > the real ProcessorContext into the ChangeLoggingStore
> > > > > > > > > > > > > implementation somehow. I'll tag you on the PR when I
> > > > create
> > > > > > > > > > > > > it, so you have an opportunity to express a preference
> > > > about
> > > > > > > > > > > > > the implementation choice, and maybe even compile/test
> > > > > > > > > > > > > against it to make sure your stuff still works.
> > > > > > > > > > > > > 
> > > > > > > > > > > > > Regarding this:
> > > > > > > > > > > > > 
> > > > > > > > > > > > > > we have an interest in making a state store with a
> > > > richer
> > > > > > > > > > > > > > way of querying its data (like perhaps getting all
> > > > values
> > > > > > > > > > > > > > associated with a secondary key), while still
> > > > ultimately
> > > > > > > > > > > > > > writing to the changelog topic for later
> > restoration.
> > > > > > > > > > > > > This is very intriguing to me. On the side, I've been
> > > > > > > > > > > > > preparing a couple of ideas related to this topic. I
> > > > don't
> > > > > > > > > > > > > think I have a coherent enough thought to even express
> > > > it in
> > > > > > > > > > > > > a Jira right now, but when I do, I'll tag you on it
> > > > also to
> > > > > > > > > > > > > see what you think.
> > > > > > > > > > > > > 
> > > > > > > > > > > > > Whenever you're ready to share the usability
> > improvement
> > > > > > > > > > > > > ideas, I'm very interested to see what you've come up
> > > > with.
> > > > > > > > > > > > > Thanks,
> > > > > > > > > > > > > -John
> > > > > > > > > > > > > 
> > > > > > > > > > > > > On Thu, 2020-09-10 at 21:02 -0500, Paul Whalen wrote:
> > > > > > > > > > > > > > > when you use a HashMap or RocksDB or other "state
> > > > > > stores", you
> > > > > > > > > don't
> > > > > > > > > > > > > > > expect them to automatically know extra stuff
> > about
> > > > the
> > > > > > record
> > > > > > > > > you're
> > > > > > > > > > > > > > > storing.
> > > > > > > > > > > > > > 
> > > > > > > > > > > > > > So, I don't think there is any reason we *can't*
> > > > retain the
> > > > > > > > > record context
> > > > > > > > > > > > > > > in the StateStoreContext, and if any users came
> > > > along
> > > > > > with a
> > > > > > > > > clear use case
> > > > > > > > > > > > > > > I'd find that convincing.
> > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > 
> > > > > > > > > > > > > > I agree with the principle of being conservative
> > with
> > > > the
> > > > > > > > > StateStoreContext
> > > > > > > > > > > > > > API.  Regarding user expectations or a clear use
> > > > case, the
> > > > > > only
> > > > > > > > > > > > > > counterpoint I would offer is that we sort of have
> > > > that
> > > > > > use case
> > > > > > > > > already,
> > > > > > > > > > > > > > which is the example I gave of the change logging
> > > > store
> > > > > > using the
> > > > > > > > > > > > > > timestamp.  I am curious if this functionality will
> > be
> > > > > > retained
> > > > > > > > > when using
> > > > > > > > > > > > > > built in state stores, or will a low-level processor
> > > > get a
> > > > > > > > > KeyValueStore
> > > > > > > > > > > > > > that no longer writes to the changelog topic with
> > the
> > > > > > record's
> > > > > > > > > timestamp.
> > > > > > > > > > > > > > While I personally don't care much about that
> > > > functionality
> > > > > > > > > specifically, I
> > > > > > > > > > > > > > have a general desire for custom state stores to
> > > > easily do
> > > > > > the
> > > > > > > > > things that
> > > > > > > > > > > > > > built in state stores do.
> > > > > > > > > > > > > > 
> > > > > > > > > > > > > > It genuinely did not occur to me that users might be
> > > > > > looking up
> > > > > > > > > and/or
> > > > > > > > > > > > > > > updating records of other keys from within a
> > > > Processor.
> > > > > > > > > > > > > > 
> > > > > > > > > > > > > > I'm glad you said this Sophie, because it gives me
> > an
> > > > > > > > > opportunity to say
> > > > > > > > > > > > > > that this is actually a *huge* use case for my team.
> > > > The
> > > > > > state
> > > > > > > > > store
> > > > > > > > > > > > > > usability improvements I was referring to in my
> > > > previous
> > > > > > message
> > > > > > > > > were about
> > > > > > > > > > > > > > enabling the user to write custom stores while still
> > > > easily
> > > > > > > > > hooking into
> > > > > > > > > > > > > > the ability to write to a changelog topic.  I think
> > > > that is
> > > > > > > > > technically
> > > > > > > > > > > > > > possible now, but I don't think it's trivial.
> > > > > > Specifically, we
> > > > > > > > > have an
> > > > > > > > > > > > > > interest in making a state store with a richer way
> > of
> > > > > > querying
> > > > > > > > > its data
> > > > > > > > > > > > > > (like perhaps getting all values associated with a
> > > > > > secondary
> > > > > > > > > key), while
> > > > > > > > > > > > > > still ultimately writing to the changelog topic for
> > > > later
> > > > > > > > > restoration.
> > > > > > > > > > > > > > We recognize that this use case throws away some of
> > > > what
> > > > > > kafka
> > > > > > > > > streams
> > > > > > > > > > > > > > (especially the DSL) is good at - easy
> > > > parallelizability by
> > > > > > > > > partitioning
> > > > > > > > > > > > > > all processing by key - and that our business logic
> > > > would
> > > > > > > > > completely fall
> > > > > > > > > > > > > > apart if we were consuming from multi-partition
> > > > topics with
> > > > > > > > > multiple
> > > > > > > > > > > > > > consumers.  But we have found that using the low
> > level
> > > > > > processor
> > > > > > > > > API is
> > > > > > > > > > > > > > good for the very simple stream processing
> > primitives
> > > > it
> > > > > > > > > provides: handling
> > > > > > > > > > > > > > the plumbing of consuming from multiple kafka topics
> > > > and
> > > > > > > > > potentially
> > > > > > > > > > > > > > updating persistent local state in a reliable way.
> > > > That in
> > > > > > > > > itself has
> > > > > > > > > > > > > > proven to be a worthwhile programming model.
> > > > > > > > > > > > > > 
> > > > > > > > > > > > > > Since I got off track a bit, let me summarize: I
> > don't
> > > > > > > > > particularly care
> > > > > > > > > > > > > > about the record context being available to state
> > > > store
> > > > > > > > > implementations,
> > > > > > > > > > > > > > and I think this KIP is headed in the right
> > direction
> > > > in
> > > > > > that
> > > > > > > > > regard.  But
> > > > > > > > > > > > > > more generally, I wanted to express the importance
> > of
> > > > > > > > > maintaining a
> > > > > > > > > > > > > > powerful and flexible StateStore interface.
> > > > > > > > > > > > > > 
> > > > > > > > > > > > > > Thanks!
> > > > > > > > > > > > > > Paul
> > > > > > > > > > > > > > 
> > > > > > > > > > > > > > On Thu, Sep 10, 2020 at 6:11 PM Sophie Blee-Goldman
> > <
> > > > > > > > > sop...@confluent.io>
> > > > > > > > > > > > > > wrote:
> > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > Aha, I did misinterpret the example in your
> > previous
> > > > > > response
> > > > > > > > > regarding the
> > > > > > > > > > > > > > > range query after all. I thought you just meant a
> > > > > > time-range
> > > > > > > > > query inside a
> > > > > > > > > > > > > > > punctuator. It genuinely did not occur to me that
> > > > users
> > > > > > might
> > > > > > > > > be looking up
> > > > > > > > > > > > > > > and/or updating records of other keys from within
> > a
> > > > > > Processor.
> > > > > > > > > Sorry for
> > > > > > > > > > > > > > > being closed minded
> > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > I won't drag out this discussion any further by
> > > > asking
> > > > > > whether
> > > > > > > > > that might
> > > > > > > > > > > > > > > be
> > > > > > > > > > > > > > > a valid use case or just a lurking bug in itself
> > :)
> > > > > > > > > > > > > > > Thanks for humoring me. The current proposal for
> > > > KIP-478
> > > > > > > > > sounds good to me
> > > > > > > > > > > > > > > On Thu, Sep 10, 2020 at 3:43 PM John Roesler <
> > > > > > > > > vvcep...@apache.org> wrote:
> > > > > > > > > > > > > > > > Ah, thanks Sophie,
> > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > I'm sorry for misinterpreting your resonse. Yes,
> > > > we
> > > > > > > > > > > > > > > > absolutely can and should clear the context
> > before
> > > > > > > > > > > > > > > > punctuating.
> > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > My secondary concern is maybe more far-fetched.
> > I
> > > > was
> > > > > > > > > > > > > > > > thinking that inside process(key,value), a
> > > > Processor
> > > > > > might
> > > > > > > > > > > > > > > > do a get/put of a _different_ key. Consider, for
> > > > > > example,
> > > > > > > > > > > > > > > > the way that Suppress processors work. When they
> > > > get a
> > > > > > > > > > > > > > > > record, they add it to the store and then do a
> > > > range
> > > > > > scan
> > > > > > > > > > > > > > > > and possibly forward a _different_ record. Of
> > > > course,
> > > > > > this
> > > > > > > > > > > > > > > > is an operation that is deeply coupled to the
> > > > > > internals, and
> > > > > > > > > > > > > > > > the Suppress processor accordingly actually does
> > > > get
> > > > > > access
> > > > > > > > > > > > > > > > to the internal context so that it can set the
> > > > context
> > > > > > > > > > > > > > > > before forwarding.
> > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > Still, it seems like I've had a handful of
> > > > > > conversations
> > > > > > > > > > > > > > > > with people over the years in which they tell me
> > > > they
> > > > > > are
> > > > > > > > > > > > > > > > using state stores in a way that transcends the
> > > > "get
> > > > > > and put
> > > > > > > > > > > > > > > > the currently processing record" access
> > pattern. I
> > > > > > doubt
> > > > > > > > > > > > > > > > that those folks would even have considered the
> > > > > > possiblity
> > > > > > > > > > > > > > > > that the currently processing record's _context_
> > > > could
> > > > > > > > > > > > > > > > pollute their state store operations, as I
> > myself
> > > > > > never gave
> > > > > > > > > > > > > > > > it a second thought until the current
> > conversation
> > > > > > began. In
> > > > > > > > > > > > > > > > cases like that, we have actually set a trap for
> > > > these
> > > > > > > > > > > > > > > > people, and it seems better to dismantle the
> > trap.
> > > > > > > > > > > > > > > > As you noted, really the only people who would
> > be
> > > > > > negatively
> > > > > > > > > > > > > > > > impacted are people who implement their own
> > state
> > > > > > stores.
> > > > > > > > > > > > > > > > These folks will get the deprecation warning and
> > > > try to
> > > > > > > > > > > > > > > > adapt their stores to the new interface. If they
> > > > needed
> > > > > > > > > > > > > > > > access to the record context, they would find
> > > > it's now
> > > > > > > > > > > > > > > > missing. They'd ask us about it, and we'd have
> > the
> > > > > > ability
> > > > > > > > > > > > > > > > to explain the lurking bug that they have had in
> > > > their
> > > > > > > > > > > > > > > > stores all along, as well as the new recommended
> > > > > > pattern
> > > > > > > > > > > > > > > > (just pass everything you need in the value). If
> > > > that's
> > > > > > > > > > > > > > > > unsatisfying, _then_ we should consider amending
> > > > the
> > > > > > API.
> > > > > > > > > > > > > > > > Thanks,
> > > > > > > > > > > > > > > > -John
> > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > On Thu, 2020-09-10 at 15:21 -0700, Sophie
> > > > Blee-Goldman
> > > > > > > > > > > > > > > > wrote:
> > > > > > > > > > > > > > > > > > Regarding your first sentence, "...the
> > > > processor
> > > > > > would
> > > > > > > > > null
> > > > > > > > > > > > > > > > > > out the record context...", this is not
> > > > possible,
> > > > > > since
> > > > > > > > > the
> > > > > > > > > > > > > > > > > > processor doesn't have write access to the
> > > > > > context. We
> > > > > > > > > could
> > > > > > > > > > > > > > > > > > add it,
> > > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > Sorry, this was poorly phrased, I definitely
> > > > did not
> > > > > > mean
> > > > > > > > > to imply that
> > > > > > > > > > > > > > > > we
> > > > > > > > > > > > > > > > > should make the context modifiable by the
> > > > Processors
> > > > > > > > > themselves. I
> > > > > > > > > > > > > > > meant
> > > > > > > > > > > > > > > > > this should be handled by the internal
> > > > processing
> > > > > > > > > framework that deals
> > > > > > > > > > > > > > > > with
> > > > > > > > > > > > > > > > > passing records from one Processor to the
> > next,
> > > > > > setting
> > > > > > > > > the record
> > > > > > > > > > > > > > > > context
> > > > > > > > > > > > > > > > > when a new record is picked up, invoking the
> > > > > > punctuators,
> > > > > > > > > etc. I
> > > > > > > > > > > > > > > believe
> > > > > > > > > > > > > > > > > this
> > > > > > > > > > > > > > > > > all currently happens in the StreamTask? It
> > > > already
> > > > > > can
> > > > > > > > > and does
> > > > > > > > > > > > > > > > overwrite
> > > > > > > > > > > > > > > > > the record context as new records are
> > > > processed, and
> > > > > > is
> > > > > > > > > also
> > > > > > > > > > > > > > > responsible
> > > > > > > > > > > > > > > > > for calling the punctuators, so it doesn't
> > seem
> > > > like
> > > > > > a
> > > > > > > > > huge leap to
> > > > > > > > > > > > > > > just
> > > > > > > > > > > > > > > > say
> > > > > > > > > > > > > > > > > "null out the current record before
> > punctuating"
> > > > > > > > > > > > > > > > > To clarify, I was never advocating or even
> > > > > > considering to
> > > > > > > > > give the
> > > > > > > > > > > > > > > > > Processors
> > > > > > > > > > > > > > > > > write access to the record context. Sorry if
> > my
> > > > last
> > > > > > > > > message (or all of
> > > > > > > > > > > > > > > > > them)
> > > > > > > > > > > > > > > > > was misleading. I just wanted to point out
> > that
> > > > the
> > > > > > > > > punctuator concern
> > > > > > > > > > > > > > > is
> > > > > > > > > > > > > > > > > orthogonal to the question of whether we
> > should
> > > > > > include
> > > > > > > > > the record
> > > > > > > > > > > > > > > > context
> > > > > > > > > > > > > > > > > in the StateStoreContext. It's definitely a
> > real
> > > > > > problem,
> > > > > > > > > but it's a
> > > > > > > > > > > > > > > > > problem
> > > > > > > > > > > > > > > > > that exists at the Processor level and not
> > just
> > > > the
> > > > > > > > > StateStore.
> > > > > > > > > > > > > > > > > So, I don't think there is any reason we
> > *can't*
> > > > > > retain
> > > > > > > > > the record
> > > > > > > > > > > > > > > > context
> > > > > > > > > > > > > > > > > in the
> > > > > > > > > > > > > > > > > StateStoreContext, and if any users came along
> > > > with a
> > > > > > > > > clear use case
> > > > > > > > > > > > > > > I'd
> > > > > > > > > > > > > > > > > find
> > > > > > > > > > > > > > > > > that convincing. In the absence of any
> > > > examples, the
> > > > > > > > > conservative
> > > > > > > > > > > > > > > > approach
> > > > > > > > > > > > > > > > > sounds good to me.
> > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > If it turns out that someone did need the
> > record
> > > > > > context
> > > > > > > > > in their
> > > > > > > > > > > > > > > custom
> > > > > > > > > > > > > > > > > state
> > > > > > > > > > > > > > > > > store, I'm sure they'll submit a politely
> > > > worded bug
> > > > > > > > > report alerting us
> > > > > > > > > > > > > > > > > that we
> > > > > > > > > > > > > > > > > broke their application.
> > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > On Thu, Sep 10, 2020 at 3:05 PM John Roesler <
> > > > > > > > > vvcep...@apache.org>
> > > > > > > > > > > > > > > > wrote:
> > > > > > > > > > > > > > > > > > Thanks, Sophie,
> > > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > > Yes, now that you point it out, I can see
> > > > that the
> > > > > > record
> > > > > > > > > > > > > > > > > > context itself should be nulled out by
> > Streams
> > > > > > before
> > > > > > > > > > > > > > > > > > invoking punctuators. From that perspective,
> > > > we
> > > > > > don't
> > > > > > > > > need
> > > > > > > > > > > > > > > > > > to think about the second-order problem of
> > > > what's
> > > > > > in the
> > > > > > > > > > > > > > > > > > context for the state store when called
> > from a
> > > > > > > > > punctuator.
> > > > > > > > > > > > > > > > > > Regarding your first sentence, "...the
> > > > processor
> > > > > > would
> > > > > > > > > null
> > > > > > > > > > > > > > > > > > out the record context...", this is not
> > > > possible,
> > > > > > since
> > > > > > > > > the
> > > > > > > > > > > > > > > > > > processor doesn't have write access to the
> > > > > > context. We
> > > > > > > > > could
> > > > > > > > > > > > > > > > > > add it, but then all kinds of strange
> > effects
> > > > > > would ensue
> > > > > > > > > > > > > > > > > > when downstream processors execute but the
> > > > context
> > > > > > is
> > > > > > > > > empty,
> > > > > > > > > > > > > > > > > > etc. Better to just let the framework manage
> > > > the
> > > > > > record
> > > > > > > > > > > > > > > > > > context and keep it read-only for
> > Processors.
> > > > > > > > > > > > > > > > > > Reading between the lines of your last
> > reply,
> > > > it
> > > > > > sounds
> > > > > > > > > that
> > > > > > > > > > > > > > > > > > the disconnect may just have been a mutual
> > > > > > > > > misunderstanding
> > > > > > > > > > > > > > > > > > about whether or not Processors currently
> > have
> > > > > > access to
> > > > > > > > > set
> > > > > > > > > > > > > > > > > > the record context. Since they do not, if we
> > > > > > wanted to
> > > > > > > > > add
> > > > > > > > > > > > > > > > > > the record context to StateStoreContext in a
> > > > > > well-defined
> > > > > > > > > > > > > > > > > > way, we'd also have to add the ability for
> > > > > > Processors to
> > > > > > > > > > > > > > > > > > manipulate it. But then, we're just
> > creating a
> > > > > > > > > side-channel
> > > > > > > > > > > > > > > > > > for Processors to pass some information in
> > > > > > arguments to
> > > > > > > > > > > > > > > > > > "put()" and other information implicitly
> > > > through
> > > > > > the
> > > > > > > > > > > > > > > > > > context. It seems better just to go for a
> > > > single
> > > > > > channel
> > > > > > > > > for
> > > > > > > > > > > > > > > > > > now.
> > > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > > It sounds like you're basically in favor of
> > > > the
> > > > > > > > > conservative
> > > > > > > > > > > > > > > > > > approach, and you just wanted to understand
> > > > the
> > > > > > blockers
> > > > > > > > > > > > > > > > > > that I implied. Does my clarification make
> > > > sense?
> > > > > > > > > > > > > > > > > > Thanks,
> > > > > > > > > > > > > > > > > > -John
> > > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > > On Thu, 2020-09-10 at 10:54 -0700, Sophie
> > > > > > Blee-Goldman
> > > > > > > > > > > > > > > > > > wrote:
> > > > > > > > > > > > > > > > > > > I was just thinking that the processor
> > would
> > > > > > null out
> > > > > > > > > the record
> > > > > > > > > > > > > > > > context
> > > > > > > > > > > > > > > > > > > after it
> > > > > > > > > > > > > > > > > > > finished processing the record, so I'm not
> > > > sure I
> > > > > > > > > follow why this
> > > > > > > > > > > > > > > > would
> > > > > > > > > > > > > > > > > > not
> > > > > > > > > > > > > > > > > > > be
> > > > > > > > > > > > > > > > > > > possible? AFAIK we never call a punctuator
> > > > in the
> > > > > > > > > middle of
> > > > > > > > > > > > > > > > processing a
> > > > > > > > > > > > > > > > > > > record through the topology, and even if
> > we
> > > > did,
> > > > > > we
> > > > > > > > > still know when
> > > > > > > > > > > > > > > > it is
> > > > > > > > > > > > > > > > > > > about
> > > > > > > > > > > > > > > > > > > to be called and could set it to null
> > > > beforehand.
> > > > > > > > > > > > > > > > > > > I'm not trying to advocate for it here,
> > I'm
> > > > in
> > > > > > > > > agreement that
> > > > > > > > > > > > > > > > anything
> > > > > > > > > > > > > > > > > > you
> > > > > > > > > > > > > > > > > > > want
> > > > > > > > > > > > > > > > > > > to access within the store can and should
> > be
> > > > > > accessed
> > > > > > > > > within the
> > > > > > > > > > > > > > > > calling
> > > > > > > > > > > > > > > > > > > Processor/Punctuator before reaching the
> > > > store.
> > > > > > The
> > > > > > > > > "we can always
> > > > > > > > > > > > > > > > add it
> > > > > > > > > > > > > > > > > > > later if necessary" argument is also
> > pretty
> > > > > > > > > convincing. Just trying
> > > > > > > > > > > > > > > > to
> > > > > > > > > > > > > > > > > > > understand
> > > > > > > > > > > > > > > > > > > why this wouldn't be possible.
> > > > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > > > FWIW, the question of "what is the current
> > > > > > record in
> > > > > > > > > the context
> > > > > > > > > > > > > > > of a
> > > > > > > > > > > > > > > > > > > Punctuator"
> > > > > > > > > > > > > > > > > > > exists independently of whether we want to
> > > > add
> > > > > > this to
> > > > > > > > > the
> > > > > > > > > > > > > > > > > > StateStoreContext
> > > > > > > > > > > > > > > > > > > or not. The full ProcessorContext,
> > > > including the
> > > > > > > > > current record
> > > > > > > > > > > > > > > > context,
> > > > > > > > > > > > > > > > > > is
> > > > > > > > > > > > > > > > > > > already available within a Punctuator, so
> > > > > > removing the
> > > > > > > > > current
> > > > > > > > > > > > > > > record
> > > > > > > > > > > > > > > > > > > context
> > > > > > > > > > > > > > > > > > > from the StateStoreContext does not solve
> > > > the
> > > > > > problem.
> > > > > > > > > Users can --
> > > > > > > > > > > > > > > > and
> > > > > > > > > > > > > > > > > > have
> > > > > > > > > > > > > > > > > > > (see KAFKA-9584 <
> > > > > > > > > https://issues.apache.org/jira/browse/KAFKA-9584
> > > > > > > > > > > > > > > > ;;)
> > > > > > > > > > > > > > > > --
> > > > > > > > > > > > > > > > > > hit
> > > > > > > > > > > > > > > > > > > such subtle bugs without ever invoking a
> > > > > > StateStore
> > > > > > > > > > > > > > > > > > > from their punctuator.
> > > > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > > > Again, I think I do agree that we should
> > > > leave
> > > > > > the
> > > > > > > > > current record
> > > > > > > > > > > > > > > > context
> > > > > > > > > > > > > > > > > > > off of
> > > > > > > > > > > > > > > > > > > the StateStoreContext, but I don't think
> > the
> > > > > > > > > Punctuator argument
> > > > > > > > > > > > > > > > against
> > > > > > > > > > > > > > > > > > it
> > > > > > > > > > > > > > > > > > > is
> > > > > > > > > > > > > > > > > > > very convincing. It sounds to me like we
> > > > need to
> > > > > > > > > disallow access to
> > > > > > > > > > > > > > > > the
> > > > > > > > > > > > > > > > > > > current
> > > > > > > > > > > > > > > > > > > record context from within the Punctuator,
> > > > > > independent
> > > > > > > > > of anything
> > > > > > > > > > > > > > > > to do
> > > > > > > > > > > > > > > > > > > with
> > > > > > > > > > > > > > > > > > > state stores
> > > > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > > > On Thu, Sep 10, 2020 at 7:12 AM John
> > > > Roesler <
> > > > > > > > > vvcep...@apache.org>
> > > > > > > > > > > > > > > > > > wrote:
> > > > > > > > > > > > > > > > > > > > Thanks for the thoughts, Sophie.
> > > > > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > > > > I agree that the extra information could
> > > > be
> > > > > > useful.
> > > > > > > > > My only
> > > > > > > > > > > > > > > > concern is
> > > > > > > > > > > > > > > > > > > > that it doesn’t seem like we can
> > actually
> > > > > > supply
> > > > > > > > > that extra
> > > > > > > > > > > > > > > > information
> > > > > > > > > > > > > > > > > > > > correctly. So, then we have a situation
> > > > where
> > > > > > the
> > > > > > > > > system offers
> > > > > > > > > > > > > > > > useful
> > > > > > > > > > > > > > > > > > API
> > > > > > > > > > > > > > > > > > > > calls that are only correct in a narrow
> > > > range
> > > > > > of use
> > > > > > > > > cases.
> > > > > > > > > > > > > > > > Outside of
> > > > > > > > > > > > > > > > > > > > those use cases, you get incorrect
> > > > behavior.
> > > > > > > > > > > > > > > > > > > > If it were possible to null out the
> > > > context
> > > > > > before
> > > > > > > > > you put a
> > > > > > > > > > > > > > > > document
> > > > > > > > > > > > > > > > > > to
> > > > > > > > > > > > > > > > > > > > which the context doesn’t apply, then
> > the
> > > > > > concern
> > > > > > > > > would be
> > > > > > > > > > > > > > > > mitigated.
> > > > > > > > > > > > > > > > > > But
> > > > > > > > > > > > > > > > > > > > it would still be pretty weird from the
> > > > > > perspective
> > > > > > > > > of the store
> > > > > > > > > > > > > > > > that
> > > > > > > > > > > > > > > > > > > > sometimes the context is populated and
> > > > other
> > > > > > times,
> > > > > > > > > it’s null.
> > > > > > > > > > > > > > > > > > > > But that seems moot, since it doesn’t
> > seem
> > > > > > possible
> > > > > > > > > to null out
> > > > > > > > > > > > > > > the
> > > > > > > > > > > > > > > > > > > > context. Only the Processor could know
> > > > whether
> > > > > > it’s
> > > > > > > > > about to put
> > > > > > > > > > > > > > > a
> > > > > > > > > > > > > > > > > > document
> > > > > > > > > > > > > > > > > > > > different from the context or not. And
> > it
> > > > > > would be
> > > > > > > > > inappropriate
> > > > > > > > > > > > > > > to
> > > > > > > > > > > > > > > > > > offer a
> > > > > > > > > > > > > > > > > > > > public ProcessorContext api to manage
> > the
> > > > > > record
> > > > > > > > > context.
> > > > > > > > > > > > > > > > > > > > Ultimately, it still seems like if you
> > > > want to
> > > > > > store
> > > > > > > > > headers, you
> > > > > > > > > > > > > > > > can
> > > > > > > > > > > > > > > > > > > > store them explicitly, right? That
> > > > doesn’t seem
> > > > > > > > > onerous to me,
> > > > > > > > > > > > > > > and
> > > > > > > > > > > > > > > > it
> > > > > > > > > > > > > > > > > > kind
> > > > > > > > > > > > > > > > > > > > of seems better than relying on
> > undefined
> > > > or
> > > > > > > > > asymmetrical
> > > > > > > > > > > > > > > behavior
> > > > > > > > > > > > > > > > in
> > > > > > > > > > > > > > > > > > the
> > > > > > > > > > > > > > > > > > > > store itself.
> > > > > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > > > > Anyway, I’m not saying that we couldn’t
> > > > solve
> > > > > > these
> > > > > > > > > problems.
> > > > > > > > > > > > > > > Just
> > > > > > > > > > > > > > > > > > that it
> > > > > > > > > > > > > > > > > > > > seems a little that we can be
> > > > conservative and
> > > > > > avoid
> > > > > > > > > them for
> > > > > > > > > > > > > > > now.
> > > > > > > > > > > > > > > > If
> > > > > > > > > > > > > > > > > > it
> > > > > > > > > > > > > > > > > > > > turns out we really need to solve them,
> > > > we can
> > > > > > > > > always do it
> > > > > > > > > > > > > > > later.
> > > > > > > > > > > > > > > > > > > > Thanks,
> > > > > > > > > > > > > > > > > > > > John
> > > > > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > > > > On Wed, Sep 9, 2020, at 22:46, Sophie
> > > > > > Blee-Goldman
> > > > > > > > > wrote:
> > > > > > > > > > > > > > > > > > > > > > If you were to call "put" from a
> > > > > > punctuator, or
> > > > > > > > > do a
> > > > > > > > > > > > > > > > > > > > > > `range()` query and then update one
> > of
> > > > > > those
> > > > > > > > > records with
> > > > > > > > > > > > > > > > > > > > > > `put()`, you'd have a very subtle
> > bug
> > > > on
> > > > > > your
> > > > > > > > > hands.
> > > > > > > > > > > > > > > > > > > > > Can you elaborate on this a bit? I
> > agree
> > > > > > that the
> > > > > > > > > punctuator
> > > > > > > > > > > > > > > > case is
> > > > > > > > > > > > > > > > > > an
> > > > > > > > > > > > > > > > > > > > > obvious exemption to the assumption
> > that
> > > > > > store
> > > > > > > > > invocations
> > > > > > > > > > > > > > > always
> > > > > > > > > > > > > > > > > > > > > have a corresponding "current record",
> > > > but I
> > > > > > don't
> > > > > > > > > understand
> > > > > > > > > > > > > > > the
> > > > > > > > > > > > > > > > > > > > > second example. Are you envisioning a
> > > > > > scenario
> > > > > > > > > where the
> > > > > > > > > > > > > > > #process
> > > > > > > > > > > > > > > > > > > > > method performs a range query and then
> > > > > > updates
> > > > > > > > > records? Or were
> > > > > > > > > > > > > > > > > > > > > you just giving another example of the
> > > > > > punctuator
> > > > > > > > > case?
> > > > > > > > > > > > > > > > > > > > > I only bring it up because I agree
> > that
> > > > the
> > > > > > > > > current record
> > > > > > > > > > > > > > > > > > information
> > > > > > > > > > > > > > > > > > > > could
> > > > > > > > > > > > > > > > > > > > > still be useful within the context of
> > > > the
> > > > > > store.
> > > > > > > > > As a non-user
> > > > > > > > > > > > > > > my
> > > > > > > > > > > > > > > > > > input
> > > > > > > > > > > > > > > > > > > > on
> > > > > > > > > > > > > > > > > > > > > this
> > > > > > > > > > > > > > > > > > > > > definitely has limited value, but it
> > > > just
> > > > > > isn't
> > > > > > > > > striking me as
> > > > > > > > > > > > > > > > > > obvious
> > > > > > > > > > > > > > > > > > > > that
> > > > > > > > > > > > > > > > > > > > > we
> > > > > > > > > > > > > > > > > > > > > should remove access to the current
> > > > record
> > > > > > context
> > > > > > > > > from the
> > > > > > > > > > > > > > > state
> > > > > > > > > > > > > > > > > > stores.
> > > > > > > > > > > > > > > > > > > > > If there is no current record, as in
> > the
> > > > > > > > > punctuator case, we
> > > > > > > > > > > > > > > > should
> > > > > > > > > > > > > > > > > > just
> > > > > > > > > > > > > > > > > > > > > set
> > > > > > > > > > > > > > > > > > > > > the record context to null (or
> > > > > > Optional.empty,
> > > > > > > > > etc).
> > > > > > > > > > > > > > > > > > > > > That said, the put() always has to
> > come
> > > > from
> > > > > > > > > somewhere, and
> > > > > > > > > > > > > > > that
> > > > > > > > > > > > > > > > > > > > > somewhere is always going to be
> > either a
> > > > > > Processor
> > > > > > > > > or a
> > > > > > > > > > > > > > > > Punctuator,
> > > > > > > > > > > > > > > > > > both
> > > > > > > > > > > > > > > > > > > > > of which will still have access to the
> > > > full
> > > > > > > > > context. So
> > > > > > > > > > > > > > > > additional
> > > > > > > > > > > > > > > > > > info
> > > > > > > > > > > > > > > > > > > > > such as
> > > > > > > > > > > > > > > > > > > > > the timestamp can and should probably
> > be
> > > > > > supplied
> > > > > > > > > to the store
> > > > > > > > > > > > > > > > before
> > > > > > > > > > > > > > > > > > > > > calling put(), rather than looked up
> > by
> > > > the
> > > > > > store.
> > > > > > > > > But I can
> > > > > > > > > > > > > > > see
> > > > > > > > > > > > > > > > some
> > > > > > > > > > > > > > > > > > > > other
> > > > > > > > > > > > > > > > > > > > > things being useful, for example the
> > > > current
> > > > > > > > > record's headers.
> > > > > > > > > > > > > > > > Maybe
> > > > > > > > > > > > > > > > > > > > if/when
> > > > > > > > > > > > > > > > > > > > > we add better (or any) support for
> > > > headers in
> > > > > > > > > state stores this
> > > > > > > > > > > > > > > > will
> > > > > > > > > > > > > > > > > > be
> > > > > > > > > > > > > > > > > > > > > less true.
> > > > > > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > > > > > Of course as John has made clear, it's
> > > > > > pretty hard
> > > > > > > > > to judge
> > > > > > > > > > > > > > > > without
> > > > > > > > > > > > > > > > > > > > > examples
> > > > > > > > > > > > > > > > > > > > > and more insight as to what actually
> > > > goes on
> > > > > > > > > within a custom
> > > > > > > > > > > > > > > > state
> > > > > > > > > > > > > > > > > > store
> > > > > > > > > > > > > > > > > > > > > On Wed, Sep 9, 2020 at 8:07 PM John
> > > > Roesler <
> > > > > > > > > > > > > > > vvcep...@apache.org
> > > > > > > > > > > > > > > > > > wrote:
> > > > > > > > > > > > > > > > > > > > > > Hi Paul,
> > > > > > > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > > > > > > It's good to hear from you!
> > > > > > > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > > > > > > I'm glad you're in favor of the
> > > > direction.
> > > > > > > > > Especially when
> > > > > > > > > > > > > > > > > > > > > > it comes to public API and usability
> > > > > > concens, I
> > > > > > > > > tend to
> > > > > > > > > > > > > > > > > > > > > > think that "the folks who matter"
> > are
> > > > > > actually
> > > > > > > > > the folks who
> > > > > > > > > > > > > > > > > > > > > > have to use the APIs to accomplish
> > > > real
> > > > > > tasks.
> > > > > > > > > It can be
> > > > > > > > > > > > > > > > > > > > > > hard for me to be sure I'm thinking
> > > > > > clearly from
> > > > > > > > > that
> > > > > > > > > > > > > > > > > > > > > > perspective.
> > > > > > > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > > > > > > Funny story, I also started down
> > this
> > > > road
> > > > > > a
> > > > > > > > > couple of times
> > > > > > > > > > > > > > > > > > > > > > already and backed them out before
> > > > the KIP
> > > > > > > > > because I was
> > > > > > > > > > > > > > > > > > > > > > afraid of the scope of the proposal.
> > > > > > > > > Unfortunately, needing
> > > > > > > > > > > > > > > > > > > > > > to make a new ProcessorContext kind
> > of
> > > > > > forced my
> > > > > > > > > hand.
> > > > > > > > > > > > > > > > > > > > > > I see you've called me out about the
> > > > > > > > > ChangeLogging stores :)
> > > > > > > > > > > > > > > > > > > > > > In fact, I think these are the
> > > > main/only
> > > > > > reason
> > > > > > > > > that stores
> > > > > > > > > > > > > > > > > > > > > > might really need to invoke
> > > > "forward()". My
> > > > > > > > > secret plan was
> > > > > > > > > > > > > > > > > > > > > > to cheat and either accomplish
> > > > > > change-logging by
> > > > > > > > > a different
> > > > > > > > > > > > > > > > > > > > > > mechanism than implementing the
> > store
> > > > > > interface,
> > > > > > > > > or by just
> > > > > > > > > > > > > > > > > > > > > > breaking encapsulation to sneak the
> > > > "real"
> > > > > > > > > ProcessorContext
> > > > > > > > > > > > > > > > > > > > > > into the ChangeLogging stores. But
> > > > those
> > > > > > are all
> > > > > > > > > > > > > > > > > > > > > > implementation details. I think the
> > > > key
> > > > > > question
> > > > > > > > > is whether
> > > > > > > > > > > > > > > > > > > > > > anyone else has a store
> > > > implementation that
> > > > > > > > > needs to call
> > > > > > > > > > > > > > > > > > > > > > "forward()". It's not what you
> > > > mentioned,
> > > > > > but
> > > > > > > > > since you
> > > > > > > > > > > > > > > > > > > > > > spoke up, I'll just ask: if you have
> > > > a use
> > > > > > case
> > > > > > > > > for calling
> > > > > > > > > > > > > > > > > > > > > > "forward()" in a store, please share
> > > > it.
> > > > > > > > > > > > > > > > > > > > > > Regarding the other record-specific
> > > > context
> > > > > > > > > methods, I think
> > > > > > > > > > > > > > > > > > > > > > you have a good point, but I also
> > > > can't
> > > > > > quite
> > > > > > > > > wrap my head
> > > > > > > > > > > > > > > > > > > > > > around how we can actually guarantee
> > > > it to
> > > > > > work
> > > > > > > > > in general.
> > > > > > > > > > > > > > > > > > > > > > For example, the case you cited,
> > > > where the
> > > > > > > > > implementation of
> > > > > > > > > > > > > > > > > > > > > > `KeyValueStore#put(key, value)` uses
> > > > the
> > > > > > context
> > > > > > > > > to augment
> > > > > > > > > > > > > > > > > > > > > > the record with timestamp
> > > > information. This
> > > > > > > > > relies on the
> > > > > > > > > > > > > > > > > > > > > > assumption that you would only call
> > > > > > "put()" from
> > > > > > > > > inside a
> > > > > > > > > > > > > > > > > > > > > > `Processor#process(key, value)` call
> > > > in
> > > > > > which
> > > > > > > > > the record
> > > > > > > > > > > > > > > > > > > > > > being processed is the same record
> > > > that
> > > > > > you're
> > > > > > > > > trying to put
> > > > > > > > > > > > > > > > > > > > > > into the store.
> > > > > > > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > > > > > > If you were to call "put" from a
> > > > > > punctuator, or
> > > > > > > > > do a
> > > > > > > > > > > > > > > > > > > > > > `range()` query and then update one
> > of
> > > > > > those
> > > > > > > > > records with
> > > > > > > > > > > > > > > > > > > > > > `put()`, you'd have a very subtle
> > bug
> > > > on
> > > > > > your
> > > > > > > > > hands. Right
> > > > > > > > > > > > > > > > > > > > > > now, the Streams component that
> > > > actually
> > > > > > calls
> > > > > > > > > the Processor
> > > > > > > > > > > > > > > > > > > > > > takes care to set the right record
> > > > context
> > > > > > > > > before invoking
> > > > > > > > > > > > > > > > > > > > > > the method, and in the case of
> > > > caching,
> > > > > > etc., it
> > > > > > > > > also takes
> > > > > > > > > > > > > > > > > > > > > > care to swap out the old context and
> > > > keep
> > > > > > it
> > > > > > > > > somewhere safe.
> > > > > > > > > > > > > > > > > > > > > > But when it comes to public API
> > > > Processors
> > > > > > > > > calling methods
> > > > > > > > > > > > > > > > > > > > > > on StateStores, there's no
> > > > opportunity for
> > > > > > any
> > > > > > > > > component to
> > > > > > > > > > > > > > > > > > > > > > make sure the context is always
> > > > correct.
> > > > > > > > > > > > > > > > > > > > > > In the face of that situation, it
> > > > seemed
> > > > > > better
> > > > > > > > > to just move
> > > > > > > > > > > > > > > > > > > > > > in the direction of a "normal" data
> > > > store.
> > > > > > I.e.,
> > > > > > > > > when you
> > > > > > > > > > > > > > > > > > > > > > use a HashMap or RocksDB or other
> > > > "state
> > > > > > > > > stores", you don't
> > > > > > > > > > > > > > > > > > > > > > expect them to automatically know
> > > > extra
> > > > > > stuff
> > > > > > > > > about the
> > > > > > > > > > > > > > > > > > > > > > record you're storing. If you need
> > > > them to
> > > > > > know
> > > > > > > > > something,
> > > > > > > > > > > > > > > > > > > > > > you just put it in the value.
> > > > > > > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > > > > > > All of that said, I'm just reasoning
> > > > from
> > > > > > first
> > > > > > > > > principles
> > > > > > > > > > > > > > > > > > > > > > here. To really know if this is a
> > > > mistake
> > > > > > or
> > > > > > > > > not, I need to
> > > > > > > > > > > > > > > > > > > > > > be in your place. So please push
> > back
> > > > if
> > > > > > you
> > > > > > > > > think what I
> > > > > > > > > > > > > > > > > > > > > > said is nonsense. My personal plan
> > > > was to
> > > > > > keep
> > > > > > > > > an eye out
> > > > > > > > > > > > > > > > > > > > > > during the period where the old API
> > > > was
> > > > > > still
> > > > > > > > > present, but
> > > > > > > > > > > > > > > > > > > > > > deprecated, to see if people were
> > > > > > struggling to
> > > > > > > > > use the new
> > > > > > > > > > > > > > > > > > > > > > API. If so, then we'd have a chance
> > to
> > > > > > address
> > > > > > > > > it before
> > > > > > > > > > > > > > > > > > > > > > dropping the old API. But it's even
> > > > better
> > > > > > if
> > > > > > > > > you can help
> > > > > > > > > > > > > > > > > > > > > > think it through now.
> > > > > > > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > > > > > > It did also cross my mind to _not_
> > > > add the
> > > > > > > > > > > > > > > > > > > > > > StateStoreContext, but just to
> > > > continue to
> > > > > > punt
> > > > > > > > > on the
> > > > > > > > > > > > > > > > > > > > > > question by just dropping in the new
> > > > > > > > > ProcessorContext to the
> > > > > > > > > > > > > > > > > > > > > > new init method. If
> > StateStoreContext
> > > > > > seems too
> > > > > > > > > bold, we can
> > > > > > > > > > > > > > > > > > > > > > go that direction. But if we
> > actually
> > > > add
> > > > > > some
> > > > > > > > > methods to
> > > > > > > > > > > > > > > > > > > > > > StateStoreContext, I'd like to be
> > > > able to
> > > > > > ensure
> > > > > > > > > they would
> > > > > > > > > > > > > > > > > > > > > > be well defined. I think the current
> > > > > > situation
> > > > > > > > > was more of
> > > > > > > > > > > > > > > > > > > > > > an oversight than a choice.
> > > > > > > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > > > > > > Thanks again for your reply,
> > > > > > > > > > > > > > > > > > > > > > -John
> > > > > > > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > > > > > > On Wed, 2020-09-09 at 21:23 -0500,
> > > > Paul
> > > > > > Whalen
> > > > > > > > > wrote:
> > > > > > > > > > > > > > > > > > > > > > > John,
> > > > > > > > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > > > > > > > It's exciting to see this KIP head
> > > > in
> > > > > > this
> > > > > > > > > direction!  In
> > > > > > > > > > > > > > > the
> > > > > > > > > > > > > > > > > > last
> > > > > > > > > > > > > > > > > > > > year
> > > > > > > > > > > > > > > > > > > > > > or
> > > > > > > > > > > > > > > > > > > > > > > so I've tried to sketch out some
> > > > > > usability
> > > > > > > > > improvements for
> > > > > > > > > > > > > > > > > > custom
> > > > > > > > > > > > > > > > > > > > state
> > > > > > > > > > > > > > > > > > > > > > > stores, and I also ended up
> > > > splitting
> > > > > > out the
> > > > > > > > > > > > > > > > StateStoreContext
> > > > > > > > > > > > > > > > > > from
> > > > > > > > > > > > > > > > > > > > the
> > > > > > > > > > > > > > > > > > > > > > > ProcessorContext in an attempt to
> > > > > > facilitate
> > > > > > > > > what I was
> > > > > > > > > > > > > > > > doing.  I
> > > > > > > > > > > > > > > > > > > > sort of
> > > > > > > > > > > > > > > > > > > > > > > abandoned it when I realized how
> > > > large
> > > > > > the
> > > > > > > > > ideal change
> > > > > > > > > > > > > > > might
> > > > > > > > > > > > > > > > > > have
> > > > > > > > > > > > > > > > > > > > to be,
> > > > > > > > > > > > > > > > > > > > > > > but it's great to see that there
> > is
> > > > other
> > > > > > > > > interest in
> > > > > > > > > > > > > > > moving
> > > > > > > > > > > > > > > > in
> > > > > > > > > > > > > > > > > > this
> > > > > > > > > > > > > > > > > > > > > > > direction (from the folks that
> > > > matter :)
> > > > > > ).
> > > > > > > > > > > > > > > > > > > > > > > Having taken a stab at it myself,
> > I
> > > > have
> > > > > > a
> > > > > > > > > comment/question
> > > > > > > > > > > > > > > > on
> > > > > > > > > > > > > > > > > > this
> > > > > > > > > > > > > > > > > > > > > > bullet
> > > > > > > > > > > > > > > > > > > > > > > about StateStoreContext:
> > > > > > > > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > > > > > > > It does *not*  include anything
> > > > > > processor- or
> > > > > > > > > record-
> > > > > > > > > > > > > > > > specific,
> > > > > > > > > > > > > > > > > > like
> > > > > > > > > > > > > > > > > > > > > > > > `forward()` or any information
> > > > about
> > > > > > the
> > > > > > > > > "current"
> > > > > > > > > > > > > > > record,
> > > > > > > > > > > > > > > > > > which is
> > > > > > > > > > > > > > > > > > > > > > only a
> > > > > > > > > > > > > > > > > > > > > > > > well-defined in the context of
> > the
> > > > > > > > > Processor. Processors
> > > > > > > > > > > > > > > > > > process
> > > > > > > > > > > > > > > > > > > > one
> > > > > > > > > > > > > > > > > > > > > > record
> > > > > > > > > > > > > > > > > > > > > > > > at a time, but state stores may
> > be
> > > > > > used to
> > > > > > > > > store and
> > > > > > > > > > > > > > > fetch
> > > > > > > > > > > > > > > > many
> > > > > > > > > > > > > > > > > > > > > > records, so
> > > > > > > > > > > > > > > > > > > > > > > > there is no "current record".
> > > > > > > > > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > > > > > > > I totally agree that
> > > > record-specific or
> > > > > > > > > processor-specific
> > > > > > > > > > > > > > > > > > context
> > > > > > > > > > > > > > > > > > > > in a
> > > > > > > > > > > > > > > > > > > > > > > state store is often not
> > > > well-defined
> > > > > > and it
> > > > > > > > > would be good
> > > > > > > > > > > > > > > to
> > > > > > > > > > > > > > > > > > > > separate
> > > > > > > > > > > > > > > > > > > > > > that
> > > > > > > > > > > > > > > > > > > > > > > out, but sometimes it (at least
> > > > > > > > > record-specific context) is
> > > > > > > > > > > > > > > > > > actually
> > > > > > > > > > > > > > > > > > > > > > > useful, for example, passing the
> > > > record's
> > > > > > > > > timestamp through
> > > > > > > > > > > > > > > > to
> > > > > > > > > > > > > > > > > > the
> > > > > > > > > > > > > > > > > > > > > > > underlying storage (or changelog
> > > > topic):
> > https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStore.java#L121
> > > > > > > > > > > > > > > > > > > > > > > You could have the writer client
> > of
> > > > the
> > > > > > state
> > > > > > > > > store pass
> > > > > > > > > > > > > > > this
> > > > > > > > > > > > > > > > > > > > through,
> > > > > > > > > > > > > > > > > > > > > > but
> > > > > > > > > > > > > > > > > > > > > > > it would be nice to be able to
> > write
> > > > > > state
> > > > > > > > > stores where the
> > > > > > > > > > > > > > > > > > client
> > > > > > > > > > > > > > > > > > > > did
> > > > > > > > > > > > > > > > > > > > > > not
> > > > > > > > > > > > > > > > > > > > > > > have this responsibility.  I'm not
> > > > sure
> > > > > > if the
> > > > > > > > > solution is
> > > > > > > > > > > > > > > > to add
> > > > > > > > > > > > > > > > > > > > some
> > > > > > > > > > > > > > > > > > > > > > > things back to StateStoreContext,
> > or
> > > > > > make yet
> > > > > > > > > another
> > > > > > > > > > > > > > > context
> > > > > > > > > > > > > > > > > > that
> > > > > > > > > > > > > > > > > > > > > > > represents record-specific context
> > > > while
> > > > > > > > > inside a state
> > > > > > > > > > > > > > > > store.
> > > > > > > > > > > > > > > > > > > > > > > Best,
> > > > > > > > > > > > > > > > > > > > > > > Paul
> > > > > > > > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > > > > > > > On Wed, Sep 9, 2020 at 5:43 PM
> > John
> > > > > > Roesler <
> > > > > > > > > > > > > > > > j...@vvcephei.org>
> > > > > > > > > > > > > > > > > > > > wrote:
> > > > > > > > > > > > > > > > > > > > > > > > Hello all,
> > > > > > > > > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > > > > > > > > I've been slowly pushing KIP-478
> > > > > > forward
> > > > > > > > > over the last
> > > > > > > > > > > > > > > > year,
> > > > > > > > > > > > > > > > > > > > > > > > and I'm happy to say that we're
> > > > making
> > > > > > good
> > > > > > > > > progress now.
> > > > > > > > > > > > > > > > > > > > > > > > However, several issues with the
> > > > > > original
> > > > > > > > > design have
> > > > > > > > > > > > > > > come
> > > > > > > > > > > > > > > > > > > > > > > > to light.
> > > > > > > > > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > > > > > > > > The major changes:
> > > > > > > > > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > > > > > > > > We discovered that the original
> > > > plan
> > > > > > of just
> > > > > > > > > adding
> > > > > > > > > > > > > > > generic
> > > > > > > > > > > > > > > > > > > > > > > > parameters to ProcessorContext
> > > > was too
> > > > > > > > > disruptive, so we
> > > > > > > > > > > > > > > > are
> > > > > > > > > > > > > > > > > > > > > > > > now adding a new
> > > > api.ProcessorContext.
> > > > > > > > > > > > > > > > > > > > > > > > That choice forces us to add a
> > new
> > > > > > > > > StateStore.init method
> > > > > > > > > > > > > > > > > > > > > > > > for the new context, but
> > > > > > ProcessorContext
> > > > > > > > > really isn't
> > > > > > > > > > > > > > > > ideal
> > > > > > > > > > > > > > > > > > > > > > > > for state stores to begin with,
> > > > so I'm
> > > > > > > > > proposing a new
> > > > > > > > > > > > > > > > > > > > > > > > StateStoreContext for this
> > > > purpose. In
> > > > > > a
> > > > > > > > > nutshell, there
> > > > > > > > > > > > > > > > are
> > > > > > > > > > > > > > > > > > > > > > > > quite a few methods in
> > > > > > ProcessorContext that
> > > > > > > > > actually
> > > > > > > > > > > > > > > > should
> > > > > > > > > > > > > > > > > > > > > > > > never be called from inside a
> > > > > > StateStore.
> > > > > > > > > > > > > > > > > > > > > > > > Also, since there is a new
> > > > > > ProcessorContext
> > > > > > > > > interface, we
> > > > > > > > > > > > > > > > > > > > > > > > need a new MockProcessorContext
> > > > > > > > > implementation in the
> > > > > > > > > > > > > > > test-
> > > > > > > > > > > > > > > > > > > > > > > > utils module.
> > > > > > > > > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > > > > > > > > The changeset for the KIP
> > > > document is
> > > > > > here:
> > https://cwiki.apache.org/confluence/pages/diffpagesbyversion.action?pageId=118172121&selectedPageVersions=14&selectedPageVersions=10
> > > > > > > > > > > > > > > > > > > > > > > > And the KIP itself is here:
> > > > > > > > > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > > > > > > > > 
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-478+-+Strongly+typed+Processor+API
> > > > > > > > > > > > > > > > > > > > > > > > If you have any concerns, please
> > > > let
> > > > > > me know!
> > > > > > > > > > > > > > > > > > > > > > > > Thanks,
> > > > > > > > > > > > > > > > > > > > > > > > -John
> > > > > > > > > > > > > > > > > > > > > > > > 
> > > > > > > > > > > > > > > > > > > > > > > > 

Reply via email to