FWIW, while I'm really not a fan of Optional in general, I agree that its
usage
here seems appropriate. Even for those rare software developers who
carefully
read all the docs several times over, I think it wouldn't be too hard to
miss a
note about the RecordMetadata possibly being null.

Especially because it's not that obvious why at first glance, and takes a
bit of
thinking to realize that records originating from a Punctuator wouldn't
have a
"current record". This  is something that has definitely confused users
today.

It's on us to improve the education here -- and an Optional<RecordMetadata>
would naturally raise awareness of this subtlety

On Tue, Sep 29, 2020 at 7:40 PM Sophie Blee-Goldman <sop...@confluent.io>
wrote:

> Does my reply address your concerns?
>
>
> Yes; also, I definitely misread part of the proposal earlier and thought
> you had put
> the timestamp field in RecordMetadata. Sorry for not giving things a
> closer look
> before responding! I'm not sure my original message made much sense given
> the misunderstanding, but thanks for responding anyway :P
>
> Having given the proposal a second pass, I agree, it's very elegant. +1
>
> On Tue, Sep 29, 2020 at 6:50 PM John Roesler <vvcep...@apache.org> wrote:
>
>> Thanks for the reply, Sophie,
>>
>> I think I may have summarized too much in my prior reply.
>>
>> In the currently proposed KIP, any caller of forward() must
>> supply a Record, which consists of:
>> * key
>> * value
>> * timestamp
>> * headers (with a convenience constructor that sets empty
>> headers)
>>
>> These aren't what I was referring to as potentially being
>> undefined downstream, since thanks to the introduction of
>> Record, they are, as you're advocating, required to be
>> defined everywhere, even when forwarding from a punctuator.
>>
>> So to be clear, the intent of this change is actually to
>> _enforce_ that timestamp would never be undefined (which it
>> currently can be). Also, since punctuators _are_ going to
>> have to "make up" a timestamp going forward, we should note
>> that the "punctuate" method currently passes in a good
>> timestamp that they can use: for system-time punctuations,
>> they receive the current system time, and for stream-time
>> punctuations, they get the current stream time.
>>
>> The potentially undefined RecordMetadata only contains these
>> fields:
>> * topic
>> * partition
>> * offset
>>
>> These fields aren't required (or even used) in a Sink, and
>> it doesn't seem like they would be important to many
>> applications. Furthermore, it doesn't _seem_ like you'd even
>> want to set these fields. They seem purely informational and
>> only useful in the context when you are actually processing
>> a real input record. It doesn't sound like you were asking
>> for it, but just to put it on the record, I think if we were
>> to require values for the metadata from punctuators, people
>> would mostly just make up their own dummy values, to no
>> one's benefit.
>>
>> I should also note that with the current
>> Record/RecordMetadata split, we will have the freedom to
>> move fields into the Record class (or even add new fields)
>> if we want them to become "data" as opposed to "metadata" in
>> the future.
>>
>> Thanks for your reply; I was similarly floored when I
>> realized the true nature of the current situation. Does my
>> reply address your concerns?
>>
>> Thanks,
>> -John
>>
>> On Tue, 2020-09-29 at 18:34 -0700, Sophie Blee-Goldman
>> wrote:
>> > > However, the record metadata is only defined when the parent forwards
>> > > while processing a
>> >
>> > real record, not when it calls forward from the punctuator
>> >
>> >
>> > Can we take a step back for a second...why wouldn't you be required to
>> set
>> > the RecordContext
>> > yourself when calling forward from a Punctuator? I think I agree with
>> Paul
>> > here, it seems kind of
>> > absurd not to enforce that the RecordContext be present inside the
>> > process() method.
>> >
>> > The original problem with Punctuators, as I understood it, was that all
>> of
>> > the RecordContext
>> > fields were exposed automatically to both the Processor and any
>> Punctuator,
>> > due to being
>> > direct methods on the ProcessorContext. We can't control which
>> > ProcessorContext methods
>> > someone will call from with a Punctuator vs from a Processor. The best
>> we
>> > could do was
>> > set these "nonsense" fields to null when inside a Punctuator, or set
>> them
>> > to some dummy
>> > values as you pointed out.
>> >
>> > But then you proposed the solution of a separate RecordContext which is
>> not
>> > attached to the
>> > ProcessorContext at all. This seemed to solve the above problem very
>> > neatly: we only pass
>> > in the RecordContext to the process() method, so we don't have to worry
>> > about people trying
>> > to access these fields from within a Punctuator. The fields aren't
>> > accessible unless they're
>> > defined.
>> >
>> > So what happens when someone wants to forward something from within a
>> > Punctuator? I
>> > don't think it's reasonable to let the timestamp field be undefined,
>> ever.
>> > What if the Punctuator
>> > forwards directly to a sink, or directly to some windowing logic. Are we
>> > supposed to add
>> > handling for the RecordContext == null case to every processor? Or are
>> we
>> > just going to
>> > assume the implicit restriction that users will only forward records
>> from a
>> > Punctuator to
>> > downstream processors that know how to handle and/or set the
>> RecordContext
>> > if it's
>> > undefined. That seems to throw away a lot of the awesome safety added in
>> > this KIP
>> >
>> > Apologies for the rant. But I feel pretty strongly that allowing to
>> forward
>> > records from a
>> > Punctuator without a defined RecordContext would be asking for trouble.
>> > Imo, if you
>> > want to forward from a Punctuator, you need to store the info you need
>> in
>> > order to
>> > set the timestamp, or make one up yourself
>> >
>> > (the one alternative I can think of here is that maybe we could pass in
>> the
>> > current
>> > partition time, so users can at least put in a reasonable estimate for
>> the
>> > timestamp
>> > that won't cause it to get dropped and won't potentially lurch the
>> > streamtime far into
>> > the future. This would be similar to what we do in the
>> TimestampExtractor)
>> >
>> > On Tue, Sep 29, 2020 at 6:06 PM John Roesler <vvcep...@apache.org>
>> wrote:
>> >
>> > > Oh, I guess one other thing I should have mentioned is that I’ve
>> recently
>> > > discovered that in cases where the context is undefined, we currently
>> just
>> > > fill in dummy values for the context. So there’s a good chance that
>> real
>> > > applications in use are depending on undefined context without even
>> > > realizing it. What I’m hoping to do is just make the situation
>> explicit and
>> > > get rid of the dummy values.
>> > >
>> > > Thanks,
>> > > John
>> > >
>> > > On Tue, Sep 29, 2020, at 20:01, John Roesler wrote:
>> > > > Thanks for the review, Paul!
>> > > >
>> > > > I had read some of that debate before. There seems to be some
>> subtext
>> > > > there, because they advise against using Optional in cases like
>> this,
>> > > > but there doesn’t seem to be a specific reason why it’s
>> inappropriate.
>> > > > I got the impression they were just afraid that people would go
>> > > > overboard and make everything Optional.
>> > > >
>> > > > I could also make two methods, but it seemed like it might be an
>> > > > unfortunate way to handle the issue, since Processor is just about a
>> > > > Function as-is, but the two-method approach would require people to
>> > > > implement both methods.
>> > > >
>> > > > To your question, this is something that’s only recently became
>> clear
>> > > > to me. Imagine you have a parent processor that calls forward both
>> from
>> > > > process and a punctuator. The child will have process() invoked in
>> both
>> > > > cases, and won’t be able to distinguish them. However, the record
>> > > > metadata is only defined when the parent forwards while processing a
>> > > > real record, not when it calls forward from the punctuator.
>> > > >
>> > > > This is why I wanted to make the metadata Optional, to advertise
>> that
>> > > > the metadata might be undefined if any ancestor processor ever calls
>> > > > forward from a punctuator. We could remove the Optional and instead
>> > > > just document that the argument might be null.
>> > > >
>> > > > With that context in place, what’s your take?
>> > > >
>> > > > Thanks,
>> > > > John
>> > > >
>> > > > On Tue, Sep 29, 2020, at 19:09, Paul Whalen wrote:
>> > > > > Looks pretty good to me, though the Processor#process(Record,
>> > > > > Optional<RecordMetadata>) signature caught my eye.  There's some
>> > > debate
>> > > > > (
>> > > > >
>> > >
>> https://stackoverflow.com/questions/31922866/why-should-java-8s-optional-not-be-used-in-arguments
>> > > )
>> > > > > about whether to use Optionals in arguments, and while that's a
>> bit of
>> > > a
>> > > > > religious debate in the abstract, it did make me wonder whether it
>> > > makes
>> > > > > sense in this specific case.  When is it actually not present?  I
>> was
>> > > > > under
>> > > > > the impression that we should always have access to it in
>> process(),
>> > > and
>> > > > > that the concern about metadata being undefined was about having
>> > > access
>> > > > > to
>> > > > > record metadata in the ProcessorContext held for use inside a
>> > > > > Punctuator.
>> > > > >
>> > > > > If that's not the case and it is truly optional in process(), is
>> there
>> > > an
>> > > > > opportunity for an alternate interface for the cases when we
>> don't get
>> > > it,
>> > > > > rather than force the branching on implementers of the interface?
>> > > > >
>> > > > > Apologies if I've missed something, I took a look at the PR and I
>> > > didn't
>> > > > > see any spots where I thought it would be empty.  Perhaps an
>> example
>> > > of a
>> > > > > Punctuator using (and not using) the new API would clear things
>> up.
>> > > > >
>> > > > > Best,
>> > > > > Paul
>> > > > >
>> > > > > On Tue, Sep 29, 2020 at 4:10 PM John Roesler <vvcep...@apache.org
>> >
>> > > wrote:
>> > > > > > Hello again, all,
>> > > > > >
>> > > > > > Thanks for the latest round of discussion. I've taken the
>> > > > > > recent feedback and come up with an updated KIP that seems
>> > > > > > actually quite a bit nicer than the prior proposal.
>> > > > > >
>> > > > > > The specific diff on the KIP is here:
>> > > > > >
>> > > > > >
>> > >
>> https://cwiki.apache.org/confluence/pages/diffpagesbyversion.action?pageId=118172121&selectedPageVersions=15&selectedPageVersions=14
>> > > > > > These changes are implemented in this POC PR:
>> > > > > > https://github.com/apache/kafka/pull/9346
>> > > > > >
>> > > > > > The basic idea is that, building on the recent conversaion,
>> > > > > > we would transition away from the current API where we get
>> > > > > > only key/value in the process() method and other "data"
>> > > > > > comes in the ProcessorContext along with the "metadata".
>> > > > > >
>> > > > > > Instead, we formalize what is "data" and what is "metadata",
>> > > > > > and pass it all in to the process method:
>> > > > > > Processor#process(Record, Optional<RecordMetadata>)
>> > > > > >
>> > > > > > Also, you forward the whole data class instead of mutating
>> > > > > > the ProcessorContext fields and also calling forward:
>> > > > > > ProcessorContext#forward(Record)
>> > > > > >
>> > > > > > The Record class itself ships with methods like
>> > > > > > record#withValue(NewV newValue)
>> > > > > > that make a shallow copy of the input Record, enabling
>> > > > > > Processors to safely handle the record without polluting the
>> > > > > > context of their parents and siblings.
>> > > > > >
>> > > > > > This proposal has a number of key benefits:
>> > > > > > * As we've discovered in KAFKA-9584, it's unsafe to mutate
>> > > > > > the Headers via the ProcessorContext. This proposal offers a
>> > > > > > way to safely forward changes only to downstream processors.
>> > > > > > * The new API has symmetry (each processor's input is the
>> > > > > > output of its parent processor)
>> > > > > > * The API makes clear that the record metadata isn't always
>> > > > > > defined (for example, in a punctuation, there is no current
>> > > > > > topic/partition/offset)
>> > > > > > * The API enables punctuators to forward well defined
>> > > > > > headers downstream, which is currently not possible.
>> > > > > >
>> > > > > > Unless their are objections, I'll go ahead and re-finalize
>> > > > > > this KIP and update that PR to a mergeable state.
>> > > > > >
>> > > > > > Thanks, all,
>> > > > > > -John
>> > > > > >
>> > > > > >
>> > > > > > On Thu, 2020-09-24 at 09:41 -0700, Matthias J. Sax wrote:
>> > > > > > > Interesting proposal. However, I am not totally convinced,
>> because
>> > > I see
>> > > > > > > a fundamental difference between "data" and "metadata".
>> > > > > > >
>> > > > > > > Topic/partition/offset are "metadata" in the strong sense and
>> they
>> > > are
>> > > > > > > immutable.
>> > > > > > >
>> > > > > > > On the other hand there is "primary" data like key and value,
>> as
>> > > well as
>> > > > > > > "secondary" data like timestamp and headers. The issue seems
>> that
>> > > we
>> > > > > > > treat "secondary data" more like metadata atm?
>> > > > > > >
>> > > > > > > Thus, promoting timestamp and headers into a first class
>> citizen
>> > > roll
>> > > > > > > make sense to me (my original proposal about `RecordContext`
>> would
>> > > still
>> > > > > > > fall short with this regard). However, putting both (data and
>> > > metadata)
>> > > > > > > into a `Record` abstraction might go too far?
>> > > > > > >
>> > > > > > > I am also a little bit concerned about `Record.copy()`
>> because it
>> > > might
>> > > > > > > be a trap: Users might assume it does a full deep copy of the
>> > > record,
>> > > > > > > however, it would not. It would only create a new `Record`
>> object
>> > > as
>> > > > > > > wrapper that points to the same key/value/header objects as
>> the
>> > > input
>> > > > > > > record.
>> > > > > > >
>> > > > > > > With the current `context.forward(key, value)` we don't have
>> this
>> > > "deep
>> > > > > > > copy" issue -- it's pretty clear what is happening.
>> > > > > > >
>> > > > > > > Instead of `To.all().withTimestamp()` we could also add
>> > > > > > > `context.forward(key, value, timestamp)` etc (just wondering
>> about
>> > > the
>> > > > > > > exposition in overload)?
>> > > > > > >
>> > > > > > > Also, `Record.withValue` etc sounds odd? Should a record not
>> be
>> > > > > > > immutable? So, we could have something like
>> > > > > > >
>> > > > > > >
>> > >
>> `RecordFactory.withKeyValue(...).withTimestamp(...).withHeaders(...).build()`.
>> > > > > > > But it looks rather verbose?
>> > > > > > >
>> > > > > > > The other question is of course, to what extend to we want to
>> keep
>> > > the
>> > > > > > > distinction between "primary" and "secondary" data? To me,
>> it's a
>> > > > > > > question of easy of use?
>> > > > > > >
>> > > > > > > Just putting all this out to move the discussion forward.
>> Don't
>> > > have a
>> > > > > > > concrete proposal atm.
>> > > > > > >
>> > > > > > >
>> > > > > > > -Matthias
>> > > > > > >
>> > > > > > >
>> > > > > > > On 9/14/20 9:24 AM, John Roesler wrote:
>> > > > > > > > Thanks for this thought, Matthias!
>> > > > > > > >
>> > > > > > > > To be honest, it's bugged me quite a bit that _all_ the
>> > > > > > > > record information hasn't been an argument to `process`. I
>> > > > > > > > suppose I was trying to be conservative in this proposal,
>> > > > > > > > but then again, if we're adding new Processor and
>> > > > > > > > ProcessorContext interfaces, then this is the time to make
>> > > > > > > > such a change.
>> > > > > > > >
>> > > > > > > > To be unambiguous, I think this is what we're talking about:
>> > > > > > > > ProcessorContext:
>> > > > > > > > * applicationId
>> > > > > > > > * taskId
>> > > > > > > > * appConfigs
>> > > > > > > > * appConfigsWithPrefix
>> > > > > > > > * keySerde
>> > > > > > > > * valueSerde
>> > > > > > > > * stateDir
>> > > > > > > > * metrics
>> > > > > > > > * schedule
>> > > > > > > > * commit
>> > > > > > > > * forward
>> > > > > > > >
>> > > > > > > > StateStoreContext:
>> > > > > > > > * applicationId
>> > > > > > > > * taskId
>> > > > > > > > * appConfigs
>> > > > > > > > * appConfigsWithPrefix
>> > > > > > > > * keySerde
>> > > > > > > > * valueSerde
>> > > > > > > > * stateDir
>> > > > > > > > * metrics
>> > > > > > > > * register
>> > > > > > > >
>> > > > > > > >
>> > > > > > > > RecordContext
>> > > > > > > > * topic
>> > > > > > > > * partition
>> > > > > > > > * offset
>> > > > > > > > * timestamp
>> > > > > > > > * headers
>> > > > > > > >
>> > > > > > > >
>> > > > > > > > Your proposal sounds good to me as-is. Just to cover the
>> > > > > > > > bases, though, I'm wondering if we should push the idea just
>> > > > > > > > a little farther. Instead of decomposing key,value,context,
>> > > > > > > > we could just keep them all in one object, like this:
>> > > > > > > >
>> > > > > > > > Record:
>> > > > > > > > * key
>> > > > > > > > * value
>> > > > > > > > * topic
>> > > > > > > > * partition
>> > > > > > > > * offset
>> > > > > > > > * timestamp
>> > > > > > > > * headers
>> > > > > > > >
>> > > > > > > > Then, we could have:
>> > > > > > > > Processor#process(Record)
>> > > > > > > > ProcessorContext#forward(Record, To)
>> > > > > > > >
>> > > > > > > > Viewed from this perspective, a record has three properties
>> > > > > > > > that people may specify in their processors: key, value, and
>> > > > > > > > timestamp.
>> > > > > > > >
>> > > > > > > > We could deprecate `To#withTimestamp` and enable people to
>> > > > > > > > specify the timestamp along with the key and value when they
>> > > > > > > > forward a record.
>> > > > > > > >
>> > > > > > > > E.g.,
>> > > > > > > > RecordBuilder toForward = RecordBuilder.copy(record)
>> > > > > > > > toForward.withKey(newKey)
>> > > > > > > > toForward.withValue(newValue)
>> > > > > > > > toForward.withTimestamp(newTimestamp)
>> > > > > > > > Record newRecord = toForward.build()
>> > > > > > > > context.forward(newRecord, To.child("child1"))
>> > > > > > > >
>> > > > > > > > Or, the more compact common case:
>> > > > > > > > current:
>> > > > > > > >  context.forward(key, "newValue")
>> > > > > > > > proposed:
>> > > > > > > >  context.forward(copy(record).withValue("newValue").build())
>> > > > > > > >
>> > > > > > > >
>> > > > > > > > It's slightly more verbose, but also more extensible. This
>> > > > > > > > would give us a clean path to add header support in PAPI as
>> > > > > > > > well, simply by adding `withHeaders` in RecordBuilder.
>> > > > > > > >
>> > > > > > > > It's also more symmetrical, since the recipient of `forward`
>> > > > > > > > would just get the sent `Record`. Whereas today, the sender
>> > > > > > > > puts the timestamp in `To`, but the recipient gets in in its
>> > > > > > > > own `ProcessorContext`.
>> > > > > > > >
>> > > > > > > > WDYT?
>> > > > > > > > -John
>> > > > > > > >
>> > > > > > > > On Fri, 2020-09-11 at 12:30 -0700, Matthias J. Sax wrote:
>> > > > > > > > > I think separating the different contexts make sense.
>> > > > > > > > >
>> > > > > > > > > In fact, we could even go one step further and remove the
>> > > record
>> > > > > > context
>> > > > > > > > > from the processor context completely and we add a third
>> > > parameter to
>> > > > > > > > > `process(key, value, recordContext)`. This would make it
>> clear
>> > > that
>> > > > > > the
>> > > > > > > > > context is for the input record only and it's not
>> possible to
>> > > pass
>> > > > > > it to
>> > > > > > > > > a `punctuate` callback.
>> > > > > > > > >
>> > > > > > > > > For the stores and changelogging: I think there are two
>> cases.
>> > > (1)
>> > > > > > You
>> > > > > > > > > use a plain key-value store. For this case, it seems you
>> do
>> > > not care
>> > > > > > > > > about the timestamp and thus does not care what timestamp
>> is
>> > > set in
>> > > > > > the
>> > > > > > > > > changelog records. (We can set anything we want, as it's
>> not
>> > > > > > relevant at
>> > > > > > > > > all -- the timestamp is ignored on read anyway.) (2) The
>> other
>> > > case
>> > > > > > is,
>> > > > > > > > > that one does care about timestamps, and for this case
>> should
>> > > use
>> > > > > > > > > TimestampedKeyValueStore. The passed timestamp will be
>> set on
>> > > the
>> > > > > > > > > changelog records for this case.
>> > > > > > > > >
>> > > > > > > > > Thus, for both cases, accessing the record context does
>> not
>> > > seems to
>> > > > > > be
>> > > > > > > > > a requirement. And providing access to the processor
>> context
>> > > to, eg.,
>> > > > > > > > > `forward()` or similar seems safe.
>> > > > > > > > >
>> > > > > > > > >
>> > > > > > > > > -Matthias
>> > > > > > > > >
>> > > > > > > > > On 9/10/20 7:25 PM, John Roesler wrote:
>> > > > > > > > > > Thanks for the reply, Paul!
>> > > > > > > > > >
>> > > > > > > > > > I certainly intend to make sure that the changelogging
>> layer
>> > > > > > > > > > continues to work the way it does now, by hook or by
>> crook.
>> > > > > > > > > > I think the easiest path for me is to just "cheat" and
>> get
>> > > > > > > > > > the real ProcessorContext into the ChangeLoggingStore
>> > > > > > > > > > implementation somehow. I'll tag you on the PR when I
>> create
>> > > > > > > > > > it, so you have an opportunity to express a preference
>> about
>> > > > > > > > > > the implementation choice, and maybe even compile/test
>> > > > > > > > > > against it to make sure your stuff still works.
>> > > > > > > > > >
>> > > > > > > > > > Regarding this:
>> > > > > > > > > >
>> > > > > > > > > > > we have an interest in making a state store with a
>> richer
>> > > > > > > > > > > way of querying its data (like perhaps getting all
>> values
>> > > > > > > > > > > associated with a secondary key), while still
>> ultimately
>> > > > > > > > > > > writing to the changelog topic for later restoration.
>> > > > > > > > > >
>> > > > > > > > > > This is very intriguing to me. On the side, I've been
>> > > > > > > > > > preparing a couple of ideas related to this topic. I
>> don't
>> > > > > > > > > > think I have a coherent enough thought to even express
>> it in
>> > > > > > > > > > a Jira right now, but when I do, I'll tag you on it
>> also to
>> > > > > > > > > > see what you think.
>> > > > > > > > > >
>> > > > > > > > > > Whenever you're ready to share the usability improvement
>> > > > > > > > > > ideas, I'm very interested to see what you've come up
>> with.
>> > > > > > > > > >
>> > > > > > > > > > Thanks,
>> > > > > > > > > > -John
>> > > > > > > > > >
>> > > > > > > > > > On Thu, 2020-09-10 at 21:02 -0500, Paul Whalen wrote:
>> > > > > > > > > > > > when you use a HashMap or RocksDB or other "state
>> > > stores", you
>> > > > > > don't
>> > > > > > > > > > > > expect them to automatically know extra stuff about
>> the
>> > > record
>> > > > > > you're
>> > > > > > > > > > > > storing.
>> > > > > > > > > > >
>> > > > > > > > > > > So, I don't think there is any reason we *can't*
>> retain the
>> > > > > > record context
>> > > > > > > > > > > > in the StateStoreContext, and if any users came
>> along
>> > > with a
>> > > > > > clear use case
>> > > > > > > > > > > > I'd find that convincing.
>> > > > > > > > > > > >
>> > > > > > > > > > >
>> > > > > > > > > > > I agree with the principle of being conservative with
>> the
>> > > > > > StateStoreContext
>> > > > > > > > > > > API.  Regarding user expectations or a clear use
>> case, the
>> > > only
>> > > > > > > > > > > counterpoint I would offer is that we sort of have
>> that
>> > > use case
>> > > > > > already,
>> > > > > > > > > > > which is the example I gave of the change logging
>> store
>> > > using the
>> > > > > > > > > > > timestamp.  I am curious if this functionality will be
>> > > retained
>> > > > > > when using
>> > > > > > > > > > > built in state stores, or will a low-level processor
>> get a
>> > > > > > KeyValueStore
>> > > > > > > > > > > that no longer writes to the changelog topic with the
>> > > record's
>> > > > > > timestamp.
>> > > > > > > > > > > While I personally don't care much about that
>> functionality
>> > > > > > specifically, I
>> > > > > > > > > > > have a general desire for custom state stores to
>> easily do
>> > > the
>> > > > > > things that
>> > > > > > > > > > > built in state stores do.
>> > > > > > > > > > >
>> > > > > > > > > > > It genuinely did not occur to me that users might be
>> > > looking up
>> > > > > > and/or
>> > > > > > > > > > > > updating records of other keys from within a
>> Processor.
>> > > > > > > > > > > >
>> > > > > > > > > > >
>> > > > > > > > > > > I'm glad you said this Sophie, because it gives me an
>> > > > > > opportunity to say
>> > > > > > > > > > > that this is actually a *huge* use case for my team.
>> The
>> > > state
>> > > > > > store
>> > > > > > > > > > > usability improvements I was referring to in my
>> previous
>> > > message
>> > > > > > were about
>> > > > > > > > > > > enabling the user to write custom stores while still
>> easily
>> > > > > > hooking into
>> > > > > > > > > > > the ability to write to a changelog topic.  I think
>> that is
>> > > > > > technically
>> > > > > > > > > > > possible now, but I don't think it's trivial.
>> > > Specifically, we
>> > > > > > have an
>> > > > > > > > > > > interest in making a state store with a richer way of
>> > > querying
>> > > > > > its data
>> > > > > > > > > > > (like perhaps getting all values associated with a
>> > > secondary
>> > > > > > key), while
>> > > > > > > > > > > still ultimately writing to the changelog topic for
>> later
>> > > > > > restoration.
>> > > > > > > > > > > We recognize that this use case throws away some of
>> what
>> > > kafka
>> > > > > > streams
>> > > > > > > > > > > (especially the DSL) is good at - easy
>> parallelizability by
>> > > > > > partitioning
>> > > > > > > > > > > all processing by key - and that our business logic
>> would
>> > > > > > completely fall
>> > > > > > > > > > > apart if we were consuming from multi-partition
>> topics with
>> > > > > > multiple
>> > > > > > > > > > > consumers.  But we have found that using the low level
>> > > processor
>> > > > > > API is
>> > > > > > > > > > > good for the very simple stream processing primitives
>> it
>> > > > > > provides: handling
>> > > > > > > > > > > the plumbing of consuming from multiple kafka topics
>> and
>> > > > > > potentially
>> > > > > > > > > > > updating persistent local state in a reliable way.
>> That in
>> > > > > > itself has
>> > > > > > > > > > > proven to be a worthwhile programming model.
>> > > > > > > > > > >
>> > > > > > > > > > > Since I got off track a bit, let me summarize: I don't
>> > > > > > particularly care
>> > > > > > > > > > > about the record context being available to state
>> store
>> > > > > > implementations,
>> > > > > > > > > > > and I think this KIP is headed in the right direction
>> in
>> > > that
>> > > > > > regard.  But
>> > > > > > > > > > > more generally, I wanted to express the importance of
>> > > > > > maintaining a
>> > > > > > > > > > > powerful and flexible StateStore interface.
>> > > > > > > > > > >
>> > > > > > > > > > > Thanks!
>> > > > > > > > > > > Paul
>> > > > > > > > > > >
>> > > > > > > > > > > On Thu, Sep 10, 2020 at 6:11 PM Sophie Blee-Goldman <
>> > > > > > sop...@confluent.io>
>> > > > > > > > > > > wrote:
>> > > > > > > > > > >
>> > > > > > > > > > > > Aha, I did misinterpret the example in your previous
>> > > response
>> > > > > > regarding the
>> > > > > > > > > > > > range query after all. I thought you just meant a
>> > > time-range
>> > > > > > query inside a
>> > > > > > > > > > > > punctuator. It genuinely did not occur to me that
>> users
>> > > might
>> > > > > > be looking up
>> > > > > > > > > > > > and/or updating records of other keys from within a
>> > > Processor.
>> > > > > > Sorry for
>> > > > > > > > > > > > being closed minded
>> > > > > > > > > > > >
>> > > > > > > > > > > > I won't drag out this discussion any further by
>> asking
>> > > whether
>> > > > > > that might
>> > > > > > > > > > > > be
>> > > > > > > > > > > > a valid use case or just a lurking bug in itself :)
>> > > > > > > > > > > >
>> > > > > > > > > > > > Thanks for humoring me. The current proposal for
>> KIP-478
>> > > > > > sounds good to me
>> > > > > > > > > > > > On Thu, Sep 10, 2020 at 3:43 PM John Roesler <
>> > > > > > vvcep...@apache.org> wrote:
>> > > > > > > > > > > > > Ah, thanks Sophie,
>> > > > > > > > > > > > >
>> > > > > > > > > > > > > I'm sorry for misinterpreting your resonse. Yes,
>> we
>> > > > > > > > > > > > > absolutely can and should clear the context before
>> > > > > > > > > > > > > punctuating.
>> > > > > > > > > > > > >
>> > > > > > > > > > > > > My secondary concern is maybe more far-fetched. I
>> was
>> > > > > > > > > > > > > thinking that inside process(key,value), a
>> Processor
>> > > might
>> > > > > > > > > > > > > do a get/put of a _different_ key. Consider, for
>> > > example,
>> > > > > > > > > > > > > the way that Suppress processors work. When they
>> get a
>> > > > > > > > > > > > > record, they add it to the store and then do a
>> range
>> > > scan
>> > > > > > > > > > > > > and possibly forward a _different_ record. Of
>> course,
>> > > this
>> > > > > > > > > > > > > is an operation that is deeply coupled to the
>> > > internals, and
>> > > > > > > > > > > > > the Suppress processor accordingly actually does
>> get
>> > > access
>> > > > > > > > > > > > > to the internal context so that it can set the
>> context
>> > > > > > > > > > > > > before forwarding.
>> > > > > > > > > > > > >
>> > > > > > > > > > > > > Still, it seems like I've had a handful of
>> > > conversations
>> > > > > > > > > > > > > with people over the years in which they tell me
>> they
>> > > are
>> > > > > > > > > > > > > using state stores in a way that transcends the
>> "get
>> > > and put
>> > > > > > > > > > > > > the currently processing record" access pattern. I
>> > > doubt
>> > > > > > > > > > > > > that those folks would even have considered the
>> > > possiblity
>> > > > > > > > > > > > > that the currently processing record's _context_
>> could
>> > > > > > > > > > > > > pollute their state store operations, as I myself
>> > > never gave
>> > > > > > > > > > > > > it a second thought until the current conversation
>> > > began. In
>> > > > > > > > > > > > > cases like that, we have actually set a trap for
>> these
>> > > > > > > > > > > > > people, and it seems better to dismantle the trap.
>> > > > > > > > > > > > >
>> > > > > > > > > > > > > As you noted, really the only people who would be
>> > > negatively
>> > > > > > > > > > > > > impacted are people who implement their own state
>> > > stores.
>> > > > > > > > > > > > > These folks will get the deprecation warning and
>> try to
>> > > > > > > > > > > > > adapt their stores to the new interface. If they
>> needed
>> > > > > > > > > > > > > access to the record context, they would find
>> it's now
>> > > > > > > > > > > > > missing. They'd ask us about it, and we'd have the
>> > > ability
>> > > > > > > > > > > > > to explain the lurking bug that they have had in
>> their
>> > > > > > > > > > > > > stores all along, as well as the new recommended
>> > > pattern
>> > > > > > > > > > > > > (just pass everything you need in the value). If
>> that's
>> > > > > > > > > > > > > unsatisfying, _then_ we should consider amending
>> the
>> > > API.
>> > > > > > > > > > > > > Thanks,
>> > > > > > > > > > > > > -John
>> > > > > > > > > > > > >
>> > > > > > > > > > > > > On Thu, 2020-09-10 at 15:21 -0700, Sophie
>> Blee-Goldman
>> > > > > > > > > > > > > wrote:
>> > > > > > > > > > > > > > > Regarding your first sentence, "...the
>> processor
>> > > would
>> > > > > > null
>> > > > > > > > > > > > > > > out the record context...", this is not
>> possible,
>> > > since
>> > > > > > the
>> > > > > > > > > > > > > > > processor doesn't have write access to the
>> > > context. We
>> > > > > > could
>> > > > > > > > > > > > > > > add it,
>> > > > > > > > > > > > > > >
>> > > > > > > > > > > > > >
>> > > > > > > > > > > > > > Sorry, this was poorly phrased, I definitely
>> did not
>> > > mean
>> > > > > > to imply that
>> > > > > > > > > > > > > we
>> > > > > > > > > > > > > > should make the context modifiable by the
>> Processors
>> > > > > > themselves. I
>> > > > > > > > > > > > meant
>> > > > > > > > > > > > > > this should be handled by the internal
>> processing
>> > > > > > framework that deals
>> > > > > > > > > > > > > with
>> > > > > > > > > > > > > > passing records from one Processor to the next,
>> > > setting
>> > > > > > the record
>> > > > > > > > > > > > > context
>> > > > > > > > > > > > > > when a new record is picked up, invoking the
>> > > punctuators,
>> > > > > > etc. I
>> > > > > > > > > > > > believe
>> > > > > > > > > > > > > > this
>> > > > > > > > > > > > > > all currently happens in the StreamTask? It
>> already
>> > > can
>> > > > > > and does
>> > > > > > > > > > > > > overwrite
>> > > > > > > > > > > > > > the record context as new records are
>> processed, and
>> > > is
>> > > > > > also
>> > > > > > > > > > > > responsible
>> > > > > > > > > > > > > > for calling the punctuators, so it doesn't seem
>> like
>> > > a
>> > > > > > huge leap to
>> > > > > > > > > > > > just
>> > > > > > > > > > > > > say
>> > > > > > > > > > > > > > "null out the current record before punctuating"
>> > > > > > > > > > > > > >
>> > > > > > > > > > > > > > To clarify, I was never advocating or even
>> > > considering to
>> > > > > > give the
>> > > > > > > > > > > > > > Processors
>> > > > > > > > > > > > > > write access to the record context. Sorry if my
>> last
>> > > > > > message (or all of
>> > > > > > > > > > > > > > them)
>> > > > > > > > > > > > > > was misleading. I just wanted to point out that
>> the
>> > > > > > punctuator concern
>> > > > > > > > > > > > is
>> > > > > > > > > > > > > > orthogonal to the question of whether we should
>> > > include
>> > > > > > the record
>> > > > > > > > > > > > > context
>> > > > > > > > > > > > > > in the StateStoreContext. It's definitely a real
>> > > problem,
>> > > > > > but it's a
>> > > > > > > > > > > > > > problem
>> > > > > > > > > > > > > > that exists at the Processor level and not just
>> the
>> > > > > > StateStore.
>> > > > > > > > > > > > > > So, I don't think there is any reason we *can't*
>> > > retain
>> > > > > > the record
>> > > > > > > > > > > > > context
>> > > > > > > > > > > > > > in the
>> > > > > > > > > > > > > > StateStoreContext, and if any users came along
>> with a
>> > > > > > clear use case
>> > > > > > > > > > > > I'd
>> > > > > > > > > > > > > > find
>> > > > > > > > > > > > > > that convincing. In the absence of any
>> examples, the
>> > > > > > conservative
>> > > > > > > > > > > > > approach
>> > > > > > > > > > > > > > sounds good to me.
>> > > > > > > > > > > > > >
>> > > > > > > > > > > > > > If it turns out that someone did need the record
>> > > context
>> > > > > > in their
>> > > > > > > > > > > > custom
>> > > > > > > > > > > > > > state
>> > > > > > > > > > > > > > store, I'm sure they'll submit a politely
>> worded bug
>> > > > > > report alerting us
>> > > > > > > > > > > > > > that we
>> > > > > > > > > > > > > > broke their application.
>> > > > > > > > > > > > > >
>> > > > > > > > > > > > > > On Thu, Sep 10, 2020 at 3:05 PM John Roesler <
>> > > > > > vvcep...@apache.org>
>> > > > > > > > > > > > > wrote:
>> > > > > > > > > > > > > > > Thanks, Sophie,
>> > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > Yes, now that you point it out, I can see
>> that the
>> > > record
>> > > > > > > > > > > > > > > context itself should be nulled out by Streams
>> > > before
>> > > > > > > > > > > > > > > invoking punctuators. From that perspective,
>> we
>> > > don't
>> > > > > > need
>> > > > > > > > > > > > > > > to think about the second-order problem of
>> what's
>> > > in the
>> > > > > > > > > > > > > > > context for the state store when called from a
>> > > > > > punctuator.
>> > > > > > > > > > > > > > > Regarding your first sentence, "...the
>> processor
>> > > would
>> > > > > > null
>> > > > > > > > > > > > > > > out the record context...", this is not
>> possible,
>> > > since
>> > > > > > the
>> > > > > > > > > > > > > > > processor doesn't have write access to the
>> > > context. We
>> > > > > > could
>> > > > > > > > > > > > > > > add it, but then all kinds of strange effects
>> > > would ensue
>> > > > > > > > > > > > > > > when downstream processors execute but the
>> context
>> > > is
>> > > > > > empty,
>> > > > > > > > > > > > > > > etc. Better to just let the framework manage
>> the
>> > > record
>> > > > > > > > > > > > > > > context and keep it read-only for Processors.
>> > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > Reading between the lines of your last reply,
>> it
>> > > sounds
>> > > > > > that
>> > > > > > > > > > > > > > > the disconnect may just have been a mutual
>> > > > > > misunderstanding
>> > > > > > > > > > > > > > > about whether or not Processors currently have
>> > > access to
>> > > > > > set
>> > > > > > > > > > > > > > > the record context. Since they do not, if we
>> > > wanted to
>> > > > > > add
>> > > > > > > > > > > > > > > the record context to StateStoreContext in a
>> > > well-defined
>> > > > > > > > > > > > > > > way, we'd also have to add the ability for
>> > > Processors to
>> > > > > > > > > > > > > > > manipulate it. But then, we're just creating a
>> > > > > > side-channel
>> > > > > > > > > > > > > > > for Processors to pass some information in
>> > > arguments to
>> > > > > > > > > > > > > > > "put()" and other information implicitly
>> through
>> > > the
>> > > > > > > > > > > > > > > context. It seems better just to go for a
>> single
>> > > channel
>> > > > > > for
>> > > > > > > > > > > > > > > now.
>> > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > It sounds like you're basically in favor of
>> the
>> > > > > > conservative
>> > > > > > > > > > > > > > > approach, and you just wanted to understand
>> the
>> > > blockers
>> > > > > > > > > > > > > > > that I implied. Does my clarification make
>> sense?
>> > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > Thanks,
>> > > > > > > > > > > > > > > -John
>> > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > On Thu, 2020-09-10 at 10:54 -0700, Sophie
>> > > Blee-Goldman
>> > > > > > > > > > > > > > > wrote:
>> > > > > > > > > > > > > > > > I was just thinking that the processor would
>> > > null out
>> > > > > > the record
>> > > > > > > > > > > > > context
>> > > > > > > > > > > > > > > > after it
>> > > > > > > > > > > > > > > > finished processing the record, so I'm not
>> sure I
>> > > > > > follow why this
>> > > > > > > > > > > > > would
>> > > > > > > > > > > > > > > not
>> > > > > > > > > > > > > > > > be
>> > > > > > > > > > > > > > > > possible? AFAIK we never call a punctuator
>> in the
>> > > > > > middle of
>> > > > > > > > > > > > > processing a
>> > > > > > > > > > > > > > > > record through the topology, and even if we
>> did,
>> > > we
>> > > > > > still know when
>> > > > > > > > > > > > > it is
>> > > > > > > > > > > > > > > > about
>> > > > > > > > > > > > > > > > to be called and could set it to null
>> beforehand.
>> > > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > > I'm not trying to advocate for it here, I'm
>> in
>> > > > > > agreement that
>> > > > > > > > > > > > > anything
>> > > > > > > > > > > > > > > you
>> > > > > > > > > > > > > > > > want
>> > > > > > > > > > > > > > > > to access within the store can and should be
>> > > accessed
>> > > > > > within the
>> > > > > > > > > > > > > calling
>> > > > > > > > > > > > > > > > Processor/Punctuator before reaching the
>> store.
>> > > The
>> > > > > > "we can always
>> > > > > > > > > > > > > add it
>> > > > > > > > > > > > > > > > later if necessary" argument is also pretty
>> > > > > > convincing. Just trying
>> > > > > > > > > > > > > to
>> > > > > > > > > > > > > > > > understand
>> > > > > > > > > > > > > > > > why this wouldn't be possible.
>> > > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > > FWIW, the question of "what is the current
>> > > record in
>> > > > > > the context
>> > > > > > > > > > > > of a
>> > > > > > > > > > > > > > > > Punctuator"
>> > > > > > > > > > > > > > > > exists independently of whether we want to
>> add
>> > > this to
>> > > > > > the
>> > > > > > > > > > > > > > > StateStoreContext
>> > > > > > > > > > > > > > > > or not. The full ProcessorContext,
>> including the
>> > > > > > current record
>> > > > > > > > > > > > > context,
>> > > > > > > > > > > > > > > is
>> > > > > > > > > > > > > > > > already available within a Punctuator, so
>> > > removing the
>> > > > > > current
>> > > > > > > > > > > > record
>> > > > > > > > > > > > > > > > context
>> > > > > > > > > > > > > > > > from the StateStoreContext does not solve
>> the
>> > > problem.
>> > > > > > Users can --
>> > > > > > > > > > > > > and
>> > > > > > > > > > > > > > > have
>> > > > > > > > > > > > > > > > (see KAFKA-9584 <
>> > > > > > https://issues.apache.org/jira/browse/KAFKA-9584
>> > > > > > > > > > > > > ;;)
>> > > > > > > > > > > > > --
>> > > > > > > > > > > > > > > hit
>> > > > > > > > > > > > > > > > such subtle bugs without ever invoking a
>> > > StateStore
>> > > > > > > > > > > > > > > > from their punctuator.
>> > > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > > Again, I think I do agree that we should
>> leave
>> > > the
>> > > > > > current record
>> > > > > > > > > > > > > context
>> > > > > > > > > > > > > > > > off of
>> > > > > > > > > > > > > > > > the StateStoreContext, but I don't think the
>> > > > > > Punctuator argument
>> > > > > > > > > > > > > against
>> > > > > > > > > > > > > > > it
>> > > > > > > > > > > > > > > > is
>> > > > > > > > > > > > > > > > very convincing. It sounds to me like we
>> need to
>> > > > > > disallow access to
>> > > > > > > > > > > > > the
>> > > > > > > > > > > > > > > > current
>> > > > > > > > > > > > > > > > record context from within the Punctuator,
>> > > independent
>> > > > > > of anything
>> > > > > > > > > > > > > to do
>> > > > > > > > > > > > > > > > with
>> > > > > > > > > > > > > > > > state stores
>> > > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > > On Thu, Sep 10, 2020 at 7:12 AM John
>> Roesler <
>> > > > > > vvcep...@apache.org>
>> > > > > > > > > > > > > > > wrote:
>> > > > > > > > > > > > > > > > > Thanks for the thoughts, Sophie.
>> > > > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > > > I agree that the extra information could
>> be
>> > > useful.
>> > > > > > My only
>> > > > > > > > > > > > > concern is
>> > > > > > > > > > > > > > > > > that it doesn’t seem like we can actually
>> > > supply
>> > > > > > that extra
>> > > > > > > > > > > > > information
>> > > > > > > > > > > > > > > > > correctly. So, then we have a situation
>> where
>> > > the
>> > > > > > system offers
>> > > > > > > > > > > > > useful
>> > > > > > > > > > > > > > > API
>> > > > > > > > > > > > > > > > > calls that are only correct in a narrow
>> range
>> > > of use
>> > > > > > cases.
>> > > > > > > > > > > > > Outside of
>> > > > > > > > > > > > > > > > > those use cases, you get incorrect
>> behavior.
>> > > > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > > > If it were possible to null out the
>> context
>> > > before
>> > > > > > you put a
>> > > > > > > > > > > > > document
>> > > > > > > > > > > > > > > to
>> > > > > > > > > > > > > > > > > which the context doesn’t apply, then the
>> > > concern
>> > > > > > would be
>> > > > > > > > > > > > > mitigated.
>> > > > > > > > > > > > > > > But
>> > > > > > > > > > > > > > > > > it would still be pretty weird from the
>> > > perspective
>> > > > > > of the store
>> > > > > > > > > > > > > that
>> > > > > > > > > > > > > > > > > sometimes the context is populated and
>> other
>> > > times,
>> > > > > > it’s null.
>> > > > > > > > > > > > > > > > > But that seems moot, since it doesn’t seem
>> > > possible
>> > > > > > to null out
>> > > > > > > > > > > > the
>> > > > > > > > > > > > > > > > > context. Only the Processor could know
>> whether
>> > > it’s
>> > > > > > about to put
>> > > > > > > > > > > > a
>> > > > > > > > > > > > > > > document
>> > > > > > > > > > > > > > > > > different from the context or not. And it
>> > > would be
>> > > > > > inappropriate
>> > > > > > > > > > > > to
>> > > > > > > > > > > > > > > offer a
>> > > > > > > > > > > > > > > > > public ProcessorContext api to manage the
>> > > record
>> > > > > > context.
>> > > > > > > > > > > > > > > > > Ultimately, it still seems like if you
>> want to
>> > > store
>> > > > > > headers, you
>> > > > > > > > > > > > > can
>> > > > > > > > > > > > > > > > > store them explicitly, right? That
>> doesn’t seem
>> > > > > > onerous to me,
>> > > > > > > > > > > > and
>> > > > > > > > > > > > > it
>> > > > > > > > > > > > > > > kind
>> > > > > > > > > > > > > > > > > of seems better than relying on undefined
>> or
>> > > > > > asymmetrical
>> > > > > > > > > > > > behavior
>> > > > > > > > > > > > > in
>> > > > > > > > > > > > > > > the
>> > > > > > > > > > > > > > > > > store itself.
>> > > > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > > > Anyway, I’m not saying that we couldn’t
>> solve
>> > > these
>> > > > > > problems.
>> > > > > > > > > > > > Just
>> > > > > > > > > > > > > > > that it
>> > > > > > > > > > > > > > > > > seems a little that we can be
>> conservative and
>> > > avoid
>> > > > > > them for
>> > > > > > > > > > > > now.
>> > > > > > > > > > > > > If
>> > > > > > > > > > > > > > > it
>> > > > > > > > > > > > > > > > > turns out we really need to solve them,
>> we can
>> > > > > > always do it
>> > > > > > > > > > > > later.
>> > > > > > > > > > > > > > > > > Thanks,
>> > > > > > > > > > > > > > > > > John
>> > > > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > > > On Wed, Sep 9, 2020, at 22:46, Sophie
>> > > Blee-Goldman
>> > > > > > wrote:
>> > > > > > > > > > > > > > > > > > > If you were to call "put" from a
>> > > punctuator, or
>> > > > > > do a
>> > > > > > > > > > > > > > > > > > > `range()` query and then update one of
>> > > those
>> > > > > > records with
>> > > > > > > > > > > > > > > > > > > `put()`, you'd have a very subtle bug
>> on
>> > > your
>> > > > > > hands.
>> > > > > > > > > > > > > > > > > > Can you elaborate on this a bit? I agree
>> > > that the
>> > > > > > punctuator
>> > > > > > > > > > > > > case is
>> > > > > > > > > > > > > > > an
>> > > > > > > > > > > > > > > > > > obvious exemption to the assumption that
>> > > store
>> > > > > > invocations
>> > > > > > > > > > > > always
>> > > > > > > > > > > > > > > > > > have a corresponding "current record",
>> but I
>> > > don't
>> > > > > > understand
>> > > > > > > > > > > > the
>> > > > > > > > > > > > > > > > > > second example. Are you envisioning a
>> > > scenario
>> > > > > > where the
>> > > > > > > > > > > > #process
>> > > > > > > > > > > > > > > > > > method performs a range query and then
>> > > updates
>> > > > > > records? Or were
>> > > > > > > > > > > > > > > > > > you just giving another example of the
>> > > punctuator
>> > > > > > case?
>> > > > > > > > > > > > > > > > > > I only bring it up because I agree that
>> the
>> > > > > > current record
>> > > > > > > > > > > > > > > information
>> > > > > > > > > > > > > > > > > could
>> > > > > > > > > > > > > > > > > > still be useful within the context of
>> the
>> > > store.
>> > > > > > As a non-user
>> > > > > > > > > > > > my
>> > > > > > > > > > > > > > > input
>> > > > > > > > > > > > > > > > > on
>> > > > > > > > > > > > > > > > > > this
>> > > > > > > > > > > > > > > > > > definitely has limited value, but it
>> just
>> > > isn't
>> > > > > > striking me as
>> > > > > > > > > > > > > > > obvious
>> > > > > > > > > > > > > > > > > that
>> > > > > > > > > > > > > > > > > > we
>> > > > > > > > > > > > > > > > > > should remove access to the current
>> record
>> > > context
>> > > > > > from the
>> > > > > > > > > > > > state
>> > > > > > > > > > > > > > > stores.
>> > > > > > > > > > > > > > > > > > If there is no current record, as in the
>> > > > > > punctuator case, we
>> > > > > > > > > > > > > should
>> > > > > > > > > > > > > > > just
>> > > > > > > > > > > > > > > > > > set
>> > > > > > > > > > > > > > > > > > the record context to null (or
>> > > Optional.empty,
>> > > > > > etc).
>> > > > > > > > > > > > > > > > > > That said, the put() always has to come
>> from
>> > > > > > somewhere, and
>> > > > > > > > > > > > that
>> > > > > > > > > > > > > > > > > > somewhere is always going to be either a
>> > > Processor
>> > > > > > or a
>> > > > > > > > > > > > > Punctuator,
>> > > > > > > > > > > > > > > both
>> > > > > > > > > > > > > > > > > > of which will still have access to the
>> full
>> > > > > > context. So
>> > > > > > > > > > > > > additional
>> > > > > > > > > > > > > > > info
>> > > > > > > > > > > > > > > > > > such as
>> > > > > > > > > > > > > > > > > > the timestamp can and should probably be
>> > > supplied
>> > > > > > to the store
>> > > > > > > > > > > > > before
>> > > > > > > > > > > > > > > > > > calling put(), rather than looked up by
>> the
>> > > store.
>> > > > > > But I can
>> > > > > > > > > > > > see
>> > > > > > > > > > > > > some
>> > > > > > > > > > > > > > > > > other
>> > > > > > > > > > > > > > > > > > things being useful, for example the
>> current
>> > > > > > record's headers.
>> > > > > > > > > > > > > Maybe
>> > > > > > > > > > > > > > > > > if/when
>> > > > > > > > > > > > > > > > > > we add better (or any) support for
>> headers in
>> > > > > > state stores this
>> > > > > > > > > > > > > will
>> > > > > > > > > > > > > > > be
>> > > > > > > > > > > > > > > > > > less true.
>> > > > > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > > > > Of course as John has made clear, it's
>> > > pretty hard
>> > > > > > to judge
>> > > > > > > > > > > > > without
>> > > > > > > > > > > > > > > > > > examples
>> > > > > > > > > > > > > > > > > > and more insight as to what actually
>> goes on
>> > > > > > within a custom
>> > > > > > > > > > > > > state
>> > > > > > > > > > > > > > > store
>> > > > > > > > > > > > > > > > > > On Wed, Sep 9, 2020 at 8:07 PM John
>> Roesler <
>> > > > > > > > > > > > vvcep...@apache.org
>> > > > > > > > > > > > > > > wrote:
>> > > > > > > > > > > > > > > > > > > Hi Paul,
>> > > > > > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > > > > > It's good to hear from you!
>> > > > > > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > > > > > I'm glad you're in favor of the
>> direction.
>> > > > > > Especially when
>> > > > > > > > > > > > > > > > > > > it comes to public API and usability
>> > > concens, I
>> > > > > > tend to
>> > > > > > > > > > > > > > > > > > > think that "the folks who matter" are
>> > > actually
>> > > > > > the folks who
>> > > > > > > > > > > > > > > > > > > have to use the APIs to accomplish
>> real
>> > > tasks.
>> > > > > > It can be
>> > > > > > > > > > > > > > > > > > > hard for me to be sure I'm thinking
>> > > clearly from
>> > > > > > that
>> > > > > > > > > > > > > > > > > > > perspective.
>> > > > > > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > > > > > Funny story, I also started down this
>> road
>> > > a
>> > > > > > couple of times
>> > > > > > > > > > > > > > > > > > > already and backed them out before
>> the KIP
>> > > > > > because I was
>> > > > > > > > > > > > > > > > > > > afraid of the scope of the proposal.
>> > > > > > Unfortunately, needing
>> > > > > > > > > > > > > > > > > > > to make a new ProcessorContext kind of
>> > > forced my
>> > > > > > hand.
>> > > > > > > > > > > > > > > > > > > I see you've called me out about the
>> > > > > > ChangeLogging stores :)
>> > > > > > > > > > > > > > > > > > > In fact, I think these are the
>> main/only
>> > > reason
>> > > > > > that stores
>> > > > > > > > > > > > > > > > > > > might really need to invoke
>> "forward()". My
>> > > > > > secret plan was
>> > > > > > > > > > > > > > > > > > > to cheat and either accomplish
>> > > change-logging by
>> > > > > > a different
>> > > > > > > > > > > > > > > > > > > mechanism than implementing the store
>> > > interface,
>> > > > > > or by just
>> > > > > > > > > > > > > > > > > > > breaking encapsulation to sneak the
>> "real"
>> > > > > > ProcessorContext
>> > > > > > > > > > > > > > > > > > > into the ChangeLogging stores. But
>> those
>> > > are all
>> > > > > > > > > > > > > > > > > > > implementation details. I think the
>> key
>> > > question
>> > > > > > is whether
>> > > > > > > > > > > > > > > > > > > anyone else has a store
>> implementation that
>> > > > > > needs to call
>> > > > > > > > > > > > > > > > > > > "forward()". It's not what you
>> mentioned,
>> > > but
>> > > > > > since you
>> > > > > > > > > > > > > > > > > > > spoke up, I'll just ask: if you have
>> a use
>> > > case
>> > > > > > for calling
>> > > > > > > > > > > > > > > > > > > "forward()" in a store, please share
>> it.
>> > > > > > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > > > > > Regarding the other record-specific
>> context
>> > > > > > methods, I think
>> > > > > > > > > > > > > > > > > > > you have a good point, but I also
>> can't
>> > > quite
>> > > > > > wrap my head
>> > > > > > > > > > > > > > > > > > > around how we can actually guarantee
>> it to
>> > > work
>> > > > > > in general.
>> > > > > > > > > > > > > > > > > > > For example, the case you cited,
>> where the
>> > > > > > implementation of
>> > > > > > > > > > > > > > > > > > > `KeyValueStore#put(key, value)` uses
>> the
>> > > context
>> > > > > > to augment
>> > > > > > > > > > > > > > > > > > > the record with timestamp
>> information. This
>> > > > > > relies on the
>> > > > > > > > > > > > > > > > > > > assumption that you would only call
>> > > "put()" from
>> > > > > > inside a
>> > > > > > > > > > > > > > > > > > > `Processor#process(key, value)` call
>> in
>> > > which
>> > > > > > the record
>> > > > > > > > > > > > > > > > > > > being processed is the same record
>> that
>> > > you're
>> > > > > > trying to put
>> > > > > > > > > > > > > > > > > > > into the store.
>> > > > > > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > > > > > If you were to call "put" from a
>> > > punctuator, or
>> > > > > > do a
>> > > > > > > > > > > > > > > > > > > `range()` query and then update one of
>> > > those
>> > > > > > records with
>> > > > > > > > > > > > > > > > > > > `put()`, you'd have a very subtle bug
>> on
>> > > your
>> > > > > > hands. Right
>> > > > > > > > > > > > > > > > > > > now, the Streams component that
>> actually
>> > > calls
>> > > > > > the Processor
>> > > > > > > > > > > > > > > > > > > takes care to set the right record
>> context
>> > > > > > before invoking
>> > > > > > > > > > > > > > > > > > > the method, and in the case of
>> caching,
>> > > etc., it
>> > > > > > also takes
>> > > > > > > > > > > > > > > > > > > care to swap out the old context and
>> keep
>> > > it
>> > > > > > somewhere safe.
>> > > > > > > > > > > > > > > > > > > But when it comes to public API
>> Processors
>> > > > > > calling methods
>> > > > > > > > > > > > > > > > > > > on StateStores, there's no
>> opportunity for
>> > > any
>> > > > > > component to
>> > > > > > > > > > > > > > > > > > > make sure the context is always
>> correct.
>> > > > > > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > > > > > In the face of that situation, it
>> seemed
>> > > better
>> > > > > > to just move
>> > > > > > > > > > > > > > > > > > > in the direction of a "normal" data
>> store.
>> > > I.e.,
>> > > > > > when you
>> > > > > > > > > > > > > > > > > > > use a HashMap or RocksDB or other
>> "state
>> > > > > > stores", you don't
>> > > > > > > > > > > > > > > > > > > expect them to automatically know
>> extra
>> > > stuff
>> > > > > > about the
>> > > > > > > > > > > > > > > > > > > record you're storing. If you need
>> them to
>> > > know
>> > > > > > something,
>> > > > > > > > > > > > > > > > > > > you just put it in the value.
>> > > > > > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > > > > > All of that said, I'm just reasoning
>> from
>> > > first
>> > > > > > principles
>> > > > > > > > > > > > > > > > > > > here. To really know if this is a
>> mistake
>> > > or
>> > > > > > not, I need to
>> > > > > > > > > > > > > > > > > > > be in your place. So please push back
>> if
>> > > you
>> > > > > > think what I
>> > > > > > > > > > > > > > > > > > > said is nonsense. My personal plan
>> was to
>> > > keep
>> > > > > > an eye out
>> > > > > > > > > > > > > > > > > > > during the period where the old API
>> was
>> > > still
>> > > > > > present, but
>> > > > > > > > > > > > > > > > > > > deprecated, to see if people were
>> > > struggling to
>> > > > > > use the new
>> > > > > > > > > > > > > > > > > > > API. If so, then we'd have a chance to
>> > > address
>> > > > > > it before
>> > > > > > > > > > > > > > > > > > > dropping the old API. But it's even
>> better
>> > > if
>> > > > > > you can help
>> > > > > > > > > > > > > > > > > > > think it through now.
>> > > > > > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > > > > > It did also cross my mind to _not_
>> add the
>> > > > > > > > > > > > > > > > > > > StateStoreContext, but just to
>> continue to
>> > > punt
>> > > > > > on the
>> > > > > > > > > > > > > > > > > > > question by just dropping in the new
>> > > > > > ProcessorContext to the
>> > > > > > > > > > > > > > > > > > > new init method. If StateStoreContext
>> > > seems too
>> > > > > > bold, we can
>> > > > > > > > > > > > > > > > > > > go that direction. But if we actually
>> add
>> > > some
>> > > > > > methods to
>> > > > > > > > > > > > > > > > > > > StateStoreContext, I'd like to be
>> able to
>> > > ensure
>> > > > > > they would
>> > > > > > > > > > > > > > > > > > > be well defined. I think the current
>> > > situation
>> > > > > > was more of
>> > > > > > > > > > > > > > > > > > > an oversight than a choice.
>> > > > > > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > > > > > Thanks again for your reply,
>> > > > > > > > > > > > > > > > > > > -John
>> > > > > > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > > > > > On Wed, 2020-09-09 at 21:23 -0500,
>> Paul
>> > > Whalen
>> > > > > > wrote:
>> > > > > > > > > > > > > > > > > > > > John,
>> > > > > > > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > > > > > > It's exciting to see this KIP head
>> in
>> > > this
>> > > > > > direction!  In
>> > > > > > > > > > > > the
>> > > > > > > > > > > > > > > last
>> > > > > > > > > > > > > > > > > year
>> > > > > > > > > > > > > > > > > > > or
>> > > > > > > > > > > > > > > > > > > > so I've tried to sketch out some
>> > > usability
>> > > > > > improvements for
>> > > > > > > > > > > > > > > custom
>> > > > > > > > > > > > > > > > > state
>> > > > > > > > > > > > > > > > > > > > stores, and I also ended up
>> splitting
>> > > out the
>> > > > > > > > > > > > > StateStoreContext
>> > > > > > > > > > > > > > > from
>> > > > > > > > > > > > > > > > > the
>> > > > > > > > > > > > > > > > > > > > ProcessorContext in an attempt to
>> > > facilitate
>> > > > > > what I was
>> > > > > > > > > > > > > doing.  I
>> > > > > > > > > > > > > > > > > sort of
>> > > > > > > > > > > > > > > > > > > > abandoned it when I realized how
>> large
>> > > the
>> > > > > > ideal change
>> > > > > > > > > > > > might
>> > > > > > > > > > > > > > > have
>> > > > > > > > > > > > > > > > > to be,
>> > > > > > > > > > > > > > > > > > > > but it's great to see that there is
>> other
>> > > > > > interest in
>> > > > > > > > > > > > moving
>> > > > > > > > > > > > > in
>> > > > > > > > > > > > > > > this
>> > > > > > > > > > > > > > > > > > > > direction (from the folks that
>> matter :)
>> > > ).
>> > > > > > > > > > > > > > > > > > > > Having taken a stab at it myself, I
>> have
>> > > a
>> > > > > > comment/question
>> > > > > > > > > > > > > on
>> > > > > > > > > > > > > > > this
>> > > > > > > > > > > > > > > > > > > bullet
>> > > > > > > > > > > > > > > > > > > > about StateStoreContext:
>> > > > > > > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > > > > > > It does *not*  include anything
>> > > processor- or
>> > > > > > record-
>> > > > > > > > > > > > > specific,
>> > > > > > > > > > > > > > > like
>> > > > > > > > > > > > > > > > > > > > > `forward()` or any information
>> about
>> > > the
>> > > > > > "current"
>> > > > > > > > > > > > record,
>> > > > > > > > > > > > > > > which is
>> > > > > > > > > > > > > > > > > > > only a
>> > > > > > > > > > > > > > > > > > > > > well-defined in the context of the
>> > > > > > Processor. Processors
>> > > > > > > > > > > > > > > process
>> > > > > > > > > > > > > > > > > one
>> > > > > > > > > > > > > > > > > > > record
>> > > > > > > > > > > > > > > > > > > > > at a time, but state stores may be
>> > > used to
>> > > > > > store and
>> > > > > > > > > > > > fetch
>> > > > > > > > > > > > > many
>> > > > > > > > > > > > > > > > > > > records, so
>> > > > > > > > > > > > > > > > > > > > > there is no "current record".
>> > > > > > > > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > > > > > > I totally agree that
>> record-specific or
>> > > > > > processor-specific
>> > > > > > > > > > > > > > > context
>> > > > > > > > > > > > > > > > > in a
>> > > > > > > > > > > > > > > > > > > > state store is often not
>> well-defined
>> > > and it
>> > > > > > would be good
>> > > > > > > > > > > > to
>> > > > > > > > > > > > > > > > > separate
>> > > > > > > > > > > > > > > > > > > that
>> > > > > > > > > > > > > > > > > > > > out, but sometimes it (at least
>> > > > > > record-specific context) is
>> > > > > > > > > > > > > > > actually
>> > > > > > > > > > > > > > > > > > > > useful, for example, passing the
>> record's
>> > > > > > timestamp through
>> > > > > > > > > > > > > to
>> > > > > > > > > > > > > > > the
>> > > > > > > > > > > > > > > > > > > > underlying storage (or changelog
>> topic):
>> > > > > > > > > > > > > > > > > > > >
>> > >
>> https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStore.java#L121
>> > > > > > > > > > > > > > > > > > > > You could have the writer client of
>> the
>> > > state
>> > > > > > store pass
>> > > > > > > > > > > > this
>> > > > > > > > > > > > > > > > > through,
>> > > > > > > > > > > > > > > > > > > but
>> > > > > > > > > > > > > > > > > > > > it would be nice to be able to write
>> > > state
>> > > > > > stores where the
>> > > > > > > > > > > > > > > client
>> > > > > > > > > > > > > > > > > did
>> > > > > > > > > > > > > > > > > > > not
>> > > > > > > > > > > > > > > > > > > > have this responsibility.  I'm not
>> sure
>> > > if the
>> > > > > > solution is
>> > > > > > > > > > > > > to add
>> > > > > > > > > > > > > > > > > some
>> > > > > > > > > > > > > > > > > > > > things back to StateStoreContext, or
>> > > make yet
>> > > > > > another
>> > > > > > > > > > > > context
>> > > > > > > > > > > > > > > that
>> > > > > > > > > > > > > > > > > > > > represents record-specific context
>> while
>> > > > > > inside a state
>> > > > > > > > > > > > > store.
>> > > > > > > > > > > > > > > > > > > > Best,
>> > > > > > > > > > > > > > > > > > > > Paul
>> > > > > > > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > > > > > > On Wed, Sep 9, 2020 at 5:43 PM John
>> > > Roesler <
>> > > > > > > > > > > > > j...@vvcephei.org>
>> > > > > > > > > > > > > > > > > wrote:
>> > > > > > > > > > > > > > > > > > > > > Hello all,
>> > > > > > > > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > > > > > > > I've been slowly pushing KIP-478
>> > > forward
>> > > > > > over the last
>> > > > > > > > > > > > > year,
>> > > > > > > > > > > > > > > > > > > > > and I'm happy to say that we're
>> making
>> > > good
>> > > > > > progress now.
>> > > > > > > > > > > > > > > > > > > > > However, several issues with the
>> > > original
>> > > > > > design have
>> > > > > > > > > > > > come
>> > > > > > > > > > > > > > > > > > > > > to light.
>> > > > > > > > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > > > > > > > The major changes:
>> > > > > > > > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > > > > > > > We discovered that the original
>> plan
>> > > of just
>> > > > > > adding
>> > > > > > > > > > > > generic
>> > > > > > > > > > > > > > > > > > > > > parameters to ProcessorContext
>> was too
>> > > > > > disruptive, so we
>> > > > > > > > > > > > > are
>> > > > > > > > > > > > > > > > > > > > > now adding a new
>> api.ProcessorContext.
>> > > > > > > > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > > > > > > > That choice forces us to add a new
>> > > > > > StateStore.init method
>> > > > > > > > > > > > > > > > > > > > > for the new context, but
>> > > ProcessorContext
>> > > > > > really isn't
>> > > > > > > > > > > > > ideal
>> > > > > > > > > > > > > > > > > > > > > for state stores to begin with,
>> so I'm
>> > > > > > proposing a new
>> > > > > > > > > > > > > > > > > > > > > StateStoreContext for this
>> purpose. In
>> > > a
>> > > > > > nutshell, there
>> > > > > > > > > > > > > are
>> > > > > > > > > > > > > > > > > > > > > quite a few methods in
>> > > ProcessorContext that
>> > > > > > actually
>> > > > > > > > > > > > > should
>> > > > > > > > > > > > > > > > > > > > > never be called from inside a
>> > > StateStore.
>> > > > > > > > > > > > > > > > > > > > > Also, since there is a new
>> > > ProcessorContext
>> > > > > > interface, we
>> > > > > > > > > > > > > > > > > > > > > need a new MockProcessorContext
>> > > > > > implementation in the
>> > > > > > > > > > > > test-
>> > > > > > > > > > > > > > > > > > > > > utils module.
>> > > > > > > > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > > > > > > > The changeset for the KIP
>> document is
>> > > here:
>> > > > > > > > > > > > > > > > > > > > >
>> > >
>> https://cwiki.apache.org/confluence/pages/diffpagesbyversion.action?pageId=118172121&selectedPageVersions=14&selectedPageVersions=10
>> > > > > > > > > > > > > > > > > > > > > And the KIP itself is here:
>> > > > > > > > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > > > > > > >
>> > >
>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-478+-+Strongly+typed+Processor+API
>> > > > > > > > > > > > > > > > > > > > > If you have any concerns, please
>> let
>> > > me know!
>> > > > > > > > > > > > > > > > > > > > > Thanks,
>> > > > > > > > > > > > > > > > > > > > > -John
>> > > > > > > > > > > > > > > > > > > > >
>> > > > > > > > > > > > > > > > > > > > >
>>
>>

Reply via email to