John, I totally agree that adding a method to Processor is cumbersome and not a good path. I was imagining maybe a separate interface that could be used in the appropriate context, but I don't think that makes much sense; it's just too far away from what Kafka Streams is. I was originally more interested in the "why" of Optional than the "how" (I think my original reply overplayed the "Optional as an argument" concern). But you've convinced me that there is a perfectly legitimate "why". We should make sure that it's clear why it's Optional, but I suppose that goes without saying. It's a nice opportunity to make the API reflect more accurately what is actually going on under the hood.
Thanks! Paul On Tue, Sep 29, 2020 at 10:05 PM Sophie Blee-Goldman <sop...@confluent.io> wrote: > FWIW, while I'm really not a fan of Optional in general, I agree that its > usage > here seems appropriate. Even for those rare software developers who > carefully > read all the docs several times over, I think it wouldn't be too hard to > miss a > note about the RecordMetadata possibly being null. > > Especially because it's not that obvious why at first glance, and takes a > bit of > thinking to realize that records originating from a Punctuator wouldn't > have a > "current record". This is something that has definitely confused users > today. > > It's on us to improve the education here -- and an Optional<RecordMetadata> > would naturally raise awareness of this subtlety > > On Tue, Sep 29, 2020 at 7:40 PM Sophie Blee-Goldman <sop...@confluent.io> > wrote: > > > Does my reply address your concerns? > > > > > > Yes; also, I definitely misread part of the proposal earlier and thought > > you had put > > the timestamp field in RecordMetadata. Sorry for not giving things a > > closer look > > before responding! I'm not sure my original message made much sense given > > the misunderstanding, but thanks for responding anyway :P > > > > Having given the proposal a second pass, I agree, it's very elegant. +1 > > > > On Tue, Sep 29, 2020 at 6:50 PM John Roesler <vvcep...@apache.org> > wrote: > > > >> Thanks for the reply, Sophie, > >> > >> I think I may have summarized too much in my prior reply. > >> > >> In the currently proposed KIP, any caller of forward() must > >> supply a Record, which consists of: > >> * key > >> * value > >> * timestamp > >> * headers (with a convenience constructor that sets empty > >> headers) > >> > >> These aren't what I was referring to as potentially being > >> undefined downstream, since thanks to the introduction of > >> Record, they are, as you're advocating, required to be > >> defined everywhere, even when forwarding from a punctuator. 
> >> > >> So to be clear, the intent of this change is actually to > >> _enforce_ that timestamp would never be undefined (which it > >> currently can be). Also, since punctuators _are_ going to > >> have to "make up" a timestamp going forward, we should note > >> that the "punctuate" method currently passes in a good > >> timestamp that they can use: for system-time punctuations, > >> they receive the current system time, and for stream-time > >> punctuations, they get the current stream time. > >> > >> The potentially undefined RecordMetadata only contains these > >> fields: > >> * topic > >> * partition > >> * offset > >> > >> These fields aren't required (or even used) in a Sink, and > >> it doesn't seem like they would be important to many > >> applications. Furthermore, it doesn't _seem_ like you'd even > >> want to set these fields. They seem purely informational and > >> only useful in the context when you are actually processing > >> a real input record. It doesn't sound like you were asking > >> for it, but just to put it on the record, I think if we were > >> to require values for the metadata from punctuators, people > >> would mostly just make up their own dummy values, to no > >> one's benefit. > >> > >> I should also note that with the current > >> Record/RecordMetadata split, we will have the freedom to > >> move fields into the Record class (or even add new fields) > >> if we want them to become "data" as opposed to "metadata" in > >> the future. > >> > >> Thanks for your reply; I was similarly floored when I > >> realized the true nature of the current situation. Does my > >> reply address your concerns? 
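For readers following the data/metadata split described above, here is a minimal plain-Java sketch. The names `Record`, `RecordMetadata`, and the `process(Record, Optional<RecordMetadata>)` shape follow the thread, but these are hypothetical stand-ins written for illustration, not the actual Kafka Streams classes. Headers are simplified to a `Map`, and `DescribeProcessor` is an invented example processor.

```java
import java.util.Map;
import java.util.Optional;

// Hypothetical stand-ins for the classes discussed in the thread;
// NOT the real Kafka Streams API.

// "Data": always defined, including when forwarded from a punctuator.
final class Record<K, V> {
    final K key;
    final V value;
    final long timestamp;
    final Map<String, String> headers; // simplified stand-in for Kafka Headers

    Record(K key, V value, long timestamp, Map<String, String> headers) {
        this.key = key;
        this.value = value;
        this.timestamp = timestamp;
        this.headers = Map.copyOf(headers);
    }

    // Convenience constructor that sets empty headers.
    Record(K key, V value, long timestamp) {
        this(key, value, timestamp, Map.of());
    }
}

// "Metadata": only defined while processing a real input record.
final class RecordMetadata {
    final String topic;
    final int partition;
    final long offset;

    RecordMetadata(String topic, int partition, long offset) {
        this.topic = topic;
        this.partition = partition;
        this.offset = offset;
    }
}

interface Processor<K, V> {
    void process(Record<K, V> record, Optional<RecordMetadata> metadata);
}

// Hypothetical example processor: the timestamp is always usable, while
// topic/partition/offset must be treated as possibly absent.
final class DescribeProcessor implements Processor<String, String> {
    String lastDescription;

    @Override
    public void process(Record<String, String> record, Optional<RecordMetadata> metadata) {
        String source = metadata
            .map(m -> m.topic + "-" + m.partition + "@" + m.offset)
            .orElse("no input record (e.g. punctuation)");
        lastDescription = record.key + "=" + record.value
            + " ts=" + record.timestamp + " from " + source;
    }
}
```

With `Optional` in the signature, the "no input record" branch becomes part of the type rather than a note in the Javadoc about a possibly null argument.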
> >> > >> Thanks, > >> -John > >> > >> On Tue, 2020-09-29 at 18:34 -0700, Sophie Blee-Goldman > >> wrote: > >> > > However, the record metadata is only defined when the parent > forwards > >> > > while processing a > >> > > >> > real record, not when it calls forward from the punctuator > >> > > >> > > >> > Can we take a step back for a second...why wouldn't you be required to > >> set > >> > the RecordContext > >> > yourself when calling forward from a Punctuator? I think I agree with > >> Paul > >> > here, it seems kind of > >> > absurd not to enforce that the RecordContext be present inside the > >> > process() method. > >> > > >> > The original problem with Punctuators, as I understood it, was that > all > >> of > >> > the RecordContext > >> > fields were exposed automatically to both the Processor and any > >> Punctuator, > >> > due to being > >> > direct methods on the ProcessorContext. We can't control which > >> > ProcessorContext methods > >> > someone will call from within a Punctuator vs from a Processor. The best > >> we > >> > could do was > >> > set these "nonsense" fields to null when inside a Punctuator, or set > >> them > >> > to some dummy > >> > values as you pointed out. > >> > > >> > But then you proposed the solution of a separate RecordContext which > is > >> not > >> > attached to the > >> > ProcessorContext at all. This seemed to solve the above problem very > >> > neatly: we only pass > >> > in the RecordContext to the process() method, so we don't have to > worry > >> > about people trying > >> > to access these fields from within a Punctuator. The fields aren't > >> > accessible unless they're > >> > defined. > >> > > >> > So what happens when someone wants to forward something from within a > >> > Punctuator? I > >> > don't think it's reasonable to let the timestamp field be undefined, > >> ever. > >> > What if the Punctuator > >> > forwards directly to a sink, or directly to some windowing logic.
Are > we > >> > supposed to add > >> > handling for the RecordContext == null case to every processor? Or are > >> we > >> > just going to > >> > assume the implicit restriction that users will only forward records > >> from a > >> > Punctuator to > >> > downstream processors that know how to handle and/or set the > >> RecordContext > >> > if it's > >> > undefined. That seems to throw away a lot of the awesome safety added > in > >> > this KIP > >> > > >> > Apologies for the rant. But I feel pretty strongly that allowing users to > >> forward > >> > records from a > >> > Punctuator without a defined RecordContext would be asking for > trouble. > >> > Imo, if you > >> > want to forward from a Punctuator, you need to store the info you need > >> in > >> > order to > >> > set the timestamp, or make one up yourself > >> > > >> > (the one alternative I can think of here is that maybe we could pass > in > >> the > >> > current > >> > partition time, so users can at least put in a reasonable estimate for > >> the > >> > timestamp > >> > that won't cause it to get dropped and won't potentially lurch the > >> > streamtime far into > >> > the future. This would be similar to what we do in the > >> TimestampExtractor) > >> > > >> > On Tue, Sep 29, 2020 at 6:06 PM John Roesler <vvcep...@apache.org> > >> wrote: > >> > > >> > > Oh, I guess one other thing I should have mentioned is that I’ve > >> recently > >> > > discovered that in cases where the context is undefined, we > currently > >> just > >> > > fill in dummy values for the context. So there’s a good chance that > >> real > >> > > applications in use are depending on undefined context without even > >> > > realizing it. What I’m hoping to do is just make the situation > >> explicit and > >> > > get rid of the dummy values. > >> > > > >> > > Thanks, > >> > > John > >> > > > >> > > On Tue, Sep 29, 2020, at 20:01, John Roesler wrote: > >> > > > Thanks for the review, Paul!
> >> > > > > >> > > > I had read some of that debate before. There seems to be some > >> subtext > >> > > > there, because they advise against using Optional in cases like > >> this, > >> > > > but there doesn’t seem to be a specific reason why it’s > >> inappropriate. > >> > > > I got the impression they were just afraid that people would go > >> > > > overboard and make everything Optional. > >> > > > > >> > > > I could also make two methods, but it seemed like it might be an > >> > > > unfortunate way to handle the issue, since Processor is just > about a > >> > > > Function as-is, but the two-method approach would require people > to > >> > > > implement both methods. > >> > > > > >> > > > To your question, this is something that’s only recently become > >> clear > >> > > > to me. Imagine you have a parent processor that calls forward both > >> from > >> > > > process and a punctuator. The child will have process() invoked in > >> both > >> > > > cases, and won’t be able to distinguish them. However, the record > >> > > > metadata is only defined when the parent forwards while > processing a > >> > > > real record, not when it calls forward from the punctuator. > >> > > > > >> > > > This is why I wanted to make the metadata Optional, to advertise > >> that > >> > > > the metadata might be undefined if any ancestor processor ever > calls > >> > > > forward from a punctuator. We could remove the Optional and > instead > >> > > > just document that the argument might be null. > >> > > > > >> > > > With that context in place, what’s your take? > >> > > > > >> > > > Thanks, > >> > > > John > >> > > > > >> > > > On Tue, Sep 29, 2020, at 19:09, Paul Whalen wrote: > >> > > > > Looks pretty good to me, though the Processor#process(Record, > >> > > > > Optional<RecordMetadata>) signature caught my eye.
There's some > >> > > debate > >> > > > > ( > >> > > > > > >> > > > >> > https://stackoverflow.com/questions/31922866/why-should-java-8s-optional-not-be-used-in-arguments > >> > > ) > >> > > > > about whether to use Optionals in arguments, and while that's a > >> bit of > >> > > a > >> > > > > religious debate in the abstract, it did make me wonder whether > it > >> > > makes > >> > > > > sense in this specific case. When is it actually not present? > I > >> was > >> > > > > under > >> > > > > the impression that we should always have access to it in > >> process(), > >> > > and > >> > > > > that the concern about metadata being undefined was about having > >> > > access > >> > > > > to > >> > > > > record metadata in the ProcessorContext held for use inside a > >> > > > > Punctuator. > >> > > > > > >> > > > > If that's not the case and it is truly optional in process(), is > >> there > >> > > an > >> > > > > opportunity for an alternate interface for the cases when we > >> don't get > >> > > it, > >> > > > > rather than force the branching on implementers of the > interface? > >> > > > > > >> > > > > Apologies if I've missed something, I took a look at the PR and > I > >> > > didn't > >> > > > > see any spots where I thought it would be empty. Perhaps an > >> example > >> > > of a > >> > > > > Punctuator using (and not using) the new API would clear things > >> up. > >> > > > > > >> > > > > Best, > >> > > > > Paul > >> > > > > > >> > > > > On Tue, Sep 29, 2020 at 4:10 PM John Roesler < > vvcep...@apache.org > >> > > >> > > wrote: > >> > > > > > Hello again, all, > >> > > > > > > >> > > > > > Thanks for the latest round of discussion. I've taken the > >> > > > > > recent feedback and come up with an updated KIP that seems > >> > > > > > actually quite a bit nicer than the prior proposal. 
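Paul's request above for an example of a Punctuator using (and not using) the new API can be sketched in plain Java. Everything here (`Rec`, `Meta`, `Parent`, `Child`, and the way `punctuate(long)` is invoked) is a hypothetical simplification of the scenario John describes, not Kafka Streams code. A parent forwards both while processing a real record and from a punctuation, and the child can only tell the two apart by whether metadata is present. The punctuation uses the timestamp it receives as the new record's timestamp.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

// Hypothetical stand-ins for illustration only; not the real Kafka Streams API.
final class Meta {
    final String topic;
    final int partition;
    final long offset;

    Meta(String topic, int partition, long offset) {
        this.topic = topic;
        this.partition = partition;
        this.offset = offset;
    }
}

final class Rec {
    final String key;
    final String value;
    final long timestamp;

    Rec(String key, String value, long timestamp) {
        this.key = key;
        this.value = value;
        this.timestamp = timestamp;
    }
}

// The child only sees process() calls; metadata is empty whenever an
// ancestor forwarded from a punctuator.
final class Child {
    final List<String> seen = new ArrayList<>();

    void process(Rec record, Optional<Meta> metadata) {
        seen.add(record.key + "@" + record.timestamp + ":"
            + metadata.map(m -> m.topic + "/" + m.offset).orElse("no-metadata"));
    }
}

final class Parent {
    private final Child child;

    Parent(Child child) {
        this.child = child;
    }

    // Forwarding while processing a real input record: metadata is defined.
    void process(Rec record, Meta metadata) {
        child.process(record, Optional.of(metadata));
    }

    // Forwarding from a punctuator: there is no current input record, so no
    // metadata, but the punctuation's timestamp gives the record a real timestamp.
    void punctuate(long timestamp) {
        child.process(new Rec("tick", "flush", timestamp), Optional.empty());
    }
}
```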
> >> > > > > > > >> > > > > > The specific diff on the KIP is here: > >> > > > > > > >> > > > > > > >> > > > >> > https://cwiki.apache.org/confluence/pages/diffpagesbyversion.action?pageId=118172121&selectedPageVersions=15&selectedPageVersions=14 > >> > > > > > These changes are implemented in this POC PR: > >> > > > > > https://github.com/apache/kafka/pull/9346 > >> > > > > > > >> > > > > > The basic idea is that, building on the recent conversation, > >> > > > > > we would transition away from the current API where we get > >> > > > > > only key/value in the process() method and other "data" > >> > > > > > comes in the ProcessorContext along with the "metadata". > >> > > > > > > >> > > > > > Instead, we formalize what is "data" and what is "metadata", > >> > > > > > and pass it all in to the process method: > >> > > > > > Processor#process(Record, Optional<RecordMetadata>) > >> > > > > > > >> > > > > > Also, you forward the whole data class instead of mutating > >> > > > > > the ProcessorContext fields and also calling forward: > >> > > > > > ProcessorContext#forward(Record) > >> > > > > > > >> > > > > > The Record class itself ships with methods like > >> > > > > > record#withValue(NewV newValue) > >> > > > > > that make a shallow copy of the input Record, enabling > >> > > > > > Processors to safely handle the record without polluting the > >> > > > > > context of their parents and siblings. > >> > > > > > > >> > > > > > This proposal has a number of key benefits: > >> > > > > > * As we've discovered in KAFKA-9584, it's unsafe to mutate > >> > > > > > the Headers via the ProcessorContext. This proposal offers a > >> > > > > > way to safely forward changes only to downstream processors.
> >> > > > > > * The new API has symmetry (each processor's input is the > >> > > > > > output of its parent processor) > >> > > > > > * The API makes clear that the record metadata isn't always > >> > > > > > defined (for example, in a punctuation, there is no current > >> > > > > > topic/partition/offset) > >> > > > > > * The API enables punctuators to forward well-defined > >> > > > > > headers downstream, which is currently not possible. > >> > > > > > > >> > > > > > Unless there are objections, I'll go ahead and re-finalize > >> > > > > > this KIP and update that PR to a mergeable state. > >> > > > > > > >> > > > > > Thanks, all, > >> > > > > > -John > >> > > > > > > >> > > > > > > >> > > > > > On Thu, 2020-09-24 at 09:41 -0700, Matthias J. Sax wrote: > >> > > > > > > Interesting proposal. However, I am not totally convinced, > >> because > >> > > I see > >> > > > > > > a fundamental difference between "data" and "metadata". > >> > > > > > > > >> > > > > > > Topic/partition/offset are "metadata" in the strong sense > and > >> they > >> > > are > >> > > > > > > immutable. > >> > > > > > > > >> > > > > > > On the other hand there is "primary" data like key and > value, > >> as > >> > > well as > >> > > > > > > "secondary" data like timestamp and headers. The issue seems to be > >> that > >> > > we > >> > > > > > > treat "secondary data" more like metadata atm? > >> > > > > > > > >> > > > > > > Thus, promoting timestamp and headers to a first-class > >> citizen > >> > > role > >> > > > > > > makes sense to me (my original proposal about `RecordContext` > >> would > >> > > still > >> > > > > > > fall short in this regard). However, putting both (data > and > >> > > metadata) > >> > > > > > > into a `Record` abstraction might go too far?
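The `withValue` approach in John's proposal (a shallow copy per forwarded record) can be illustrated with a small immutable class. This is a hypothetical sketch: `withHeader` is an invented helper, and headers are simplified to `Map<String, String>`. It shows only why per-record copies keep header changes from leaking to parents and siblings, which is the KAFKA-9584 problem mentioned above. The key and value objects are still shared rather than deep-copied.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical immutable record sketch; not the real Kafka Streams Record.
final class Record<K, V> {
    private final K key;
    private final V value;
    private final long timestamp;
    private final Map<String, String> headers;

    Record(K key, V value, long timestamp, Map<String, String> headers) {
        this.key = key;
        this.value = value;
        this.timestamp = timestamp;
        this.headers = Map.copyOf(headers); // unmodifiable snapshot
    }

    K key() { return key; }
    V value() { return value; }
    long timestamp() { return timestamp; }
    Map<String, String> headers() { return headers; }

    // Shallow copy with a new value: the key and headers objects are reused, not cloned.
    <NewV> Record<K, NewV> withValue(NewV newValue) {
        return new Record<>(key, newValue, timestamp, headers);
    }

    Record<K, V> withTimestamp(long newTimestamp) {
        return new Record<>(key, value, newTimestamp, headers);
    }

    // Returns a new record with one extra header; this record is untouched,
    // so parent and sibling processors never observe the change.
    Record<K, V> withHeader(String name, String headerValue) {
        Map<String, String> updated = new HashMap<>(headers);
        updated.put(name, headerValue);
        return new Record<>(key, value, timestamp, updated);
    }
}
```

Because `key` and `value` are shared references, a processor that mutates a mutable value object in place would still affect every other holder of that object. Matthias raises this "shallow copy" trap in his reply. Immutable key and value types avoid it.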
> >> > > > > > > > >> > > > > > > I am also a little bit concerned about `Record.copy()` > >> because it > >> > > might > >> > > > > > > be a trap: Users might assume it does a full deep copy of > the > >> > > record; > >> > > > > > > however, it would not. It would only create a new `Record` > >> object > >> > > as a > >> > > > > > > wrapper that points to the same key/value/header objects as > >> the > >> > > input > >> > > > > > > record. > >> > > > > > > > >> > > > > > > With the current `context.forward(key, value)` we don't have > >> this > >> > > "deep > >> > > > > > > copy" issue -- it's pretty clear what is happening. > >> > > > > > > > >> > > > > > > Instead of `To.all().withTimestamp()` we could also add > >> > > > > > > `context.forward(key, value, timestamp)` etc. (just wondering > >> about > >> > > the > >> > > > > > > explosion in overloads)? > >> > > > > > > > >> > > > > > > Also, `Record.withValue` etc. sounds odd? Should a record not > >> be > >> > > > > > > immutable? So, we could have something like > >> > > > > > > > >> > > > > > > > >> > > > >> > `RecordFactory.withKeyValue(...).withTimestamp(...).withHeaders(...).build()`. > >> > > > > > > But it looks rather verbose? > >> > > > > > > > >> > > > > > > The other question is of course, to what extent do we want > to > >> keep > >> > > the > >> > > > > > > distinction between "primary" and "secondary" data? To me, > >> it's a > >> > > > > > > question of ease of use? > >> > > > > > > > >> > > > > > > Just putting all this out to move the discussion forward. > >> Don't > >> > > have a > >> > > > > > > concrete proposal atm. > >> > > > > > > > >> > > > > > > > >> > > > > > > -Matthias > >> > > > > > > > >> > > > > > > > >> > > > > > > On 9/14/20 9:24 AM, John Roesler wrote: > >> > > > > > > > Thanks for this thought, Matthias! > >> > > > > > > > > >> > > > > > > > To be honest, it's bugged me quite a bit that _all_ the > >> > > > > > > > record information hasn't been an argument to `process`.
I > >> > > > > > > > suppose I was trying to be conservative in this proposal, > >> > > > > > > > but then again, if we're adding new Processor and > >> > > > > > > > ProcessorContext interfaces, then this is the time to make > >> > > > > > > > such a change. > >> > > > > > > > > >> > > > > > > > To be unambiguous, I think this is what we're talking > about: > >> > > > > > > > ProcessorContext: > >> > > > > > > > * applicationId > >> > > > > > > > * taskId > >> > > > > > > > * appConfigs > >> > > > > > > > * appConfigsWithPrefix > >> > > > > > > > * keySerde > >> > > > > > > > * valueSerde > >> > > > > > > > * stateDir > >> > > > > > > > * metrics > >> > > > > > > > * schedule > >> > > > > > > > * commit > >> > > > > > > > * forward > >> > > > > > > > > >> > > > > > > > StateStoreContext: > >> > > > > > > > * applicationId > >> > > > > > > > * taskId > >> > > > > > > > * appConfigs > >> > > > > > > > * appConfigsWithPrefix > >> > > > > > > > * keySerde > >> > > > > > > > * valueSerde > >> > > > > > > > * stateDir > >> > > > > > > > * metrics > >> > > > > > > > * register > >> > > > > > > > > >> > > > > > > > > >> > > > > > > > RecordContext > >> > > > > > > > * topic > >> > > > > > > > * partition > >> > > > > > > > * offset > >> > > > > > > > * timestamp > >> > > > > > > > * headers > >> > > > > > > > > >> > > > > > > > > >> > > > > > > > Your proposal sounds good to me as-is. Just to cover the > >> > > > > > > > bases, though, I'm wondering if we should push the idea > just > >> > > > > > > > a little farther. 
Instead of decomposing > key,value,context, > >> > > > > > > > we could just keep them all in one object, like this: > >> > > > > > > > > >> > > > > > > > Record: > >> > > > > > > > * key > >> > > > > > > > * value > >> > > > > > > > * topic > >> > > > > > > > * partition > >> > > > > > > > * offset > >> > > > > > > > * timestamp > >> > > > > > > > * headers > >> > > > > > > > > >> > > > > > > > Then, we could have: > >> > > > > > > > Processor#process(Record) > >> > > > > > > > ProcessorContext#forward(Record, To) > >> > > > > > > > > >> > > > > > > > Viewed from this perspective, a record has three > properties > >> > > > > > > > that people may specify in their processors: key, value, > and > >> > > > > > > > timestamp. > >> > > > > > > > > >> > > > > > > > We could deprecate `To#withTimestamp` and enable people to > >> > > > > > > > specify the timestamp along with the key and value when > they > >> > > > > > > > forward a record. > >> > > > > > > > > >> > > > > > > > E.g., > >> > > > > > > > RecordBuilder toForward = RecordBuilder.copy(record) > >> > > > > > > > toForward.withKey(newKey) > >> > > > > > > > toForward.withValue(newValue) > >> > > > > > > > toForward.withTimestamp(newTimestamp) > >> > > > > > > > Record newRecord = toForward.build() > >> > > > > > > > context.forward(newRecord, To.child("child1")) > >> > > > > > > > > >> > > > > > > > Or, the more compact common case: > >> > > > > > > > current: > >> > > > > > > > context.forward(key, "newValue") > >> > > > > > > > proposed: > >> > > > > > > > > context.forward(copy(record).withValue("newValue").build()) > >> > > > > > > > > >> > > > > > > > > >> > > > > > > > It's slightly more verbose, but also more extensible. This > >> > > > > > > > would give us a clean path to add header support in PAPI > as > >> > > > > > > > well, simply by adding `withHeaders` in RecordBuilder. 
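John's `RecordBuilder` snippet above is pseudocode, with no semicolons and no types. Below is a runnable approximation. It assumes a simple mutable builder whose setters return `this`, so both the multi-statement and the chained styles from the email work. `RecordBuilder` and `Record` are hypothetical names, not taken from the Kafka Streams codebase, and the `To`/`forward` part is omitted.

```java
// Hypothetical sketch of the RecordBuilder idea; not Kafka Streams code.
final class Record<K, V> {
    final K key;
    final V value;
    final long timestamp;

    Record(K key, V value, long timestamp) {
        this.key = key;
        this.value = value;
        this.timestamp = timestamp;
    }
}

final class RecordBuilder<K, V> {
    private K key;
    private V value;
    private long timestamp;

    private RecordBuilder(K key, V value, long timestamp) {
        this.key = key;
        this.value = value;
        this.timestamp = timestamp;
    }

    // Start from an existing record so unchanged fields carry over.
    static <K, V> RecordBuilder<K, V> copy(Record<K, V> record) {
        return new RecordBuilder<>(record.key, record.value, record.timestamp);
    }

    // Setters mutate the builder and return it, allowing chaining.
    RecordBuilder<K, V> withKey(K newKey) {
        this.key = newKey;
        return this;
    }

    RecordBuilder<K, V> withValue(V newValue) {
        this.value = newValue;
        return this;
    }

    RecordBuilder<K, V> withTimestamp(long newTimestamp) {
        this.timestamp = newTimestamp;
        return this;
    }

    // Each build() produces a new Record; the input record is never modified.
    Record<K, V> build() {
        return new Record<>(key, value, timestamp);
    }
}
```

Like `withValue`, this builder copies references only, so it has the same shallow-copy caveat for mutable keys and values.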
> >> > > > > > > > > >> > > > > > > > It's also more symmetrical, since the recipient of > `forward` > >> > > > > > > > would just get the sent `Record`. Whereas today, the > sender > >> > > > > > > > puts the timestamp in `To`, but the recipient gets it in > its > >> > > > > > > > own `ProcessorContext`. > >> > > > > > > > > >> > > > > > > > WDYT? > >> > > > > > > > -John > >> > > > > > > > > >> > > > > > > > On Fri, 2020-09-11 at 12:30 -0700, Matthias J. Sax wrote: > >> > > > > > > > > I think separating the different contexts makes sense. > >> > > > > > > > > > >> > > > > > > > > In fact, we could even go one step further and remove > the > >> > > record > >> > > > > > context > >> > > > > > > > > from the processor context completely and add a third > >> > > parameter to > >> > > > > > > > > `process(key, value, recordContext)`. This would make it > >> clear > >> > > that > >> > > > > > the > >> > > > > > > > > context is for the input record only and it's not > >> possible to > >> > > pass > >> > > > > > it to > >> > > > > > > > > a `punctuate` callback. > >> > > > > > > > > > >> > > > > > > > > For the stores and changelogging: I think there are two > >> cases. > >> > > (1) > >> > > > > > You > >> > > > > > > > > use a plain key-value store. For this case, it seems you > >> do > >> > > not care > >> > > > > > > > > about the timestamp and thus do not care what > timestamp > >> is > >> > > set in > >> > > > > > the > >> > > > > > > > > changelog records. (We can set anything we want, as it's > >> not > >> > > > > > relevant at > >> > > > > > > > > all -- the timestamp is ignored on read anyway.) (2) The > >> other > >> > > case > >> > > > > > is > >> > > > > > > > > that one does care about timestamps, and in this case one > >> should > >> > > use > >> > > > > > > > > TimestampedKeyValueStore. The passed timestamp will be > >> set on > >> > > the > >> > > > > > > > > changelog records for this case.
> >> > > > > > > > > > >> > > > > > > > > Thus, for both cases, accessing the record context does > >> not > >> > > seem to > >> > > > > > be > >> > > > > > > > > a requirement. And providing access to the processor > >> context > >> > > to, e.g., > >> > > > > > > > > `forward()` or similar seems safe. > >> > > > > > > > > > >> > > > > > > > > > >> > > > > > > > > -Matthias > >> > > > > > > > > > >> > > > > > > > > On 9/10/20 7:25 PM, John Roesler wrote: > >> > > > > > > > > > Thanks for the reply, Paul! > >> > > > > > > > > > > >> > > > > > > > > > I certainly intend to make sure that the changelogging > >> layer > >> > > > > > > > > > continues to work the way it does now, by hook or by > >> crook. > >> > > > > > > > > > I think the easiest path for me is to just "cheat" and > >> get > >> > > > > > > > > > the real ProcessorContext into the ChangeLoggingStore > >> > > > > > > > > > implementation somehow. I'll tag you on the PR when I > >> create > >> > > > > > > > > > it, so you have an opportunity to express a preference > >> about > >> > > > > > > > > > the implementation choice, and maybe even compile/test > >> > > > > > > > > > against it to make sure your stuff still works. > >> > > > > > > > > > > >> > > > > > > > > > Regarding this: > >> > > > > > > > > > > >> > > > > > > > > > > we have an interest in making a state store with a > >> richer > >> > > > > > > > > > > way of querying its data (like perhaps getting all > >> values > >> > > > > > > > > > > associated with a secondary key), while still > >> ultimately > >> > > > > > > > > > > writing to the changelog topic for later > restoration. > >> > > > > > > > > > > >> > > > > > > > > > This is very intriguing to me. On the side, I've been > >> > > > > > > > > > preparing a couple of ideas related to this topic.
I > >> don't > >> > > > > > > > > > think I have a coherent enough thought to even express > >> it in > >> > > > > > > > > > a Jira right now, but when I do, I'll tag you on it > >> also to > >> > > > > > > > > > see what you think. > >> > > > > > > > > > > >> > > > > > > > > > Whenever you're ready to share the usability > improvement > >> > > > > > > > > > ideas, I'm very interested to see what you've come up > >> with. > >> > > > > > > > > > > >> > > > > > > > > > Thanks, > >> > > > > > > > > > -John > >> > > > > > > > > > > >> > > > > > > > > > On Thu, 2020-09-10 at 21:02 -0500, Paul Whalen wrote: > >> > > > > > > > > > > > when you use a HashMap or RocksDB or other "state > >> > > stores", you > >> > > > > > don't > >> > > > > > > > > > > > expect them to automatically know extra stuff > about > >> the > >> > > record > >> > > > > > you're > >> > > > > > > > > > > > storing. > >> > > > > > > > > > > > >> > > > > > > > > > > So, I don't think there is any reason we *can't* > >> retain the > >> > > > > > record context > >> > > > > > > > > > > > in the StateStoreContext, and if any users came > >> along > >> > > with a > >> > > > > > clear use case > >> > > > > > > > > > > > I'd find that convincing. > >> > > > > > > > > > > > > >> > > > > > > > > > > > >> > > > > > > > > > > I agree with the principle of being conservative > with > >> the > >> > > > > > StateStoreContext > >> > > > > > > > > > > API. Regarding user expectations or a clear use > >> case, the > >> > > only > >> > > > > > > > > > > counterpoint I would offer is that we sort of have > >> that > >> > > use case > >> > > > > > already, > >> > > > > > > > > > > which is the example I gave of the change logging > >> store > >> > > using the > >> > > > > > > > > > > timestamp. 
I am curious if this functionality will > be > >> > > retained > >> > > > > > when using > >> > > > > > > > > > > built in state stores, or will a low-level processor > >> get a > >> > > > > > KeyValueStore > >> > > > > > > > > > > that no longer writes to the changelog topic with > the > >> > > record's > >> > > > > > timestamp. > >> > > > > > > > > > > While I personally don't care much about that > >> functionality > >> > > > > > specifically, I > >> > > > > > > > > > > have a general desire for custom state stores to > >> easily do > >> > > the > >> > > > > > things that > >> > > > > > > > > > > built in state stores do. > >> > > > > > > > > > > > >> > > > > > > > > > > It genuinely did not occur to me that users might be > >> > > looking up > >> > > > > > and/or > >> > > > > > > > > > > > updating records of other keys from within a > >> Processor. > >> > > > > > > > > > > > > >> > > > > > > > > > > > >> > > > > > > > > > > I'm glad you said this Sophie, because it gives me > an > >> > > > > > opportunity to say > >> > > > > > > > > > > that this is actually a *huge* use case for my team. > >> The > >> > > state > >> > > > > > store > >> > > > > > > > > > > usability improvements I was referring to in my > >> previous > >> > > message > >> > > > > > were about > >> > > > > > > > > > > enabling the user to write custom stores while still > >> easily > >> > > > > > hooking into > >> > > > > > > > > > > the ability to write to a changelog topic. I think > >> that is > >> > > > > > technically > >> > > > > > > > > > > possible now, but I don't think it's trivial. 
> >> > > Specifically, we > >> > > > > > have an > >> > > > > > > > > > > interest in making a state store with a richer way > of > >> > > querying > >> > > > > > its data > >> > > > > > > > > > > (like perhaps getting all values associated with a > >> > > secondary > >> > > > > > key), while > >> > > > > > > > > > > still ultimately writing to the changelog topic for > >> later > >> > > > > > restoration. > >> > > > > > > > > > > We recognize that this use case throws away some of > >> what > >> > > kafka > >> > > > > > streams > >> > > > > > > > > > > (especially the DSL) is good at - easy > >> parallelizability by > >> > > > > > partitioning > >> > > > > > > > > > > all processing by key - and that our business logic > >> would > >> > > > > > completely fall > >> > > > > > > > > > > apart if we were consuming from multi-partition > >> topics with > >> > > > > > multiple > >> > > > > > > > > > > consumers. But we have found that using the low > level > >> > > processor > >> > > > > > API is > >> > > > > > > > > > > good for the very simple stream processing > primitives > >> it > >> > > > > > provides: handling > >> > > > > > > > > > > the plumbing of consuming from multiple kafka topics > >> and > >> > > > > > potentially > >> > > > > > > > > > > updating persistent local state in a reliable way. > >> That in > >> > > > > > itself has > >> > > > > > > > > > > proven to be a worthwhile programming model. > >> > > > > > > > > > > > >> > > > > > > > > > > Since I got off track a bit, let me summarize: I > don't > >> > > > > > particularly care > >> > > > > > > > > > > about the record context being available to state > >> store > >> > > > > > implementations, > >> > > > > > > > > > > and I think this KIP is headed in the right > direction > >> in > >> > > that > >> > > > > > regard. But > >> > > > > > > > > > > more generally, I wanted to express the importance > of > >> > > > > > maintaining a > >> > > > > > > > > > > powerful and flexible StateStore interface. 
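Paul's use case is a store that can be queried by a secondary key and still writes a changelog for restoration. It can be sketched without any Kafka dependency. In this hypothetical example the changelog is an in-memory `List` standing in for a topic, and `SecondaryIndexStore` is an invented class, not an implementation of the real `StateStore` interface. The idea it illustrates is that only primary writes are logged, while the secondary index is derived state that restoration rebuilds by replaying the log.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical sketch: a key-value store with a secondary index whose writes
// are still captured in a changelog. Not an implementation of Kafka's StateStore.
final class SecondaryIndexStore {
    // One changelog entry: the full primary write (the index is derived from it).
    static final class Entry {
        final String key;
        final String secondaryKey;
        final String value;

        Entry(String key, String secondaryKey, String value) {
            this.key = key;
            this.secondaryKey = secondaryKey;
            this.value = value;
        }
    }

    private final Map<String, Entry> primary = new HashMap<>();
    private final Map<String, Set<String>> bySecondary = new HashMap<>();
    // In-memory stand-in for a changelog topic.
    final List<Entry> changelog = new ArrayList<>();

    void put(String key, String secondaryKey, String value) {
        Entry entry = new Entry(key, secondaryKey, value);
        apply(entry);
        changelog.add(entry); // only primary writes are logged
    }

    // Richer query: all values whose secondary key matches, in sorted order.
    Set<String> getBySecondaryKey(String secondaryKey) {
        Set<String> values = new TreeSet<>();
        for (String key : bySecondary.getOrDefault(secondaryKey, Set.of())) {
            values.add(primary.get(key).value);
        }
        return values;
    }

    // Restoration replays the changelog; the index is rebuilt along the way.
    static SecondaryIndexStore restore(List<Entry> log) {
        SecondaryIndexStore store = new SecondaryIndexStore();
        for (Entry entry : log) {
            store.apply(entry);
        }
        return store;
    }

    private void apply(Entry entry) {
        Entry previous = primary.put(entry.key, entry);
        if (previous != null) {
            // Remove the stale index entry left by the overwritten value.
            Set<String> keys = bySecondary.get(previous.secondaryKey);
            keys.remove(entry.key);
            if (keys.isEmpty()) {
                bySecondary.remove(previous.secondaryKey);
            }
        }
        bySecondary.computeIfAbsent(entry.secondaryKey, k -> new HashSet<>()).add(entry.key);
    }
}
```

In this sketch the index never needs the record context. Everything it needs is in the logged value, which matches the direction the thread takes for custom stores.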
> >> > > > > > > > > > > > >> > > > > > > > > > > Thanks! > >> > > > > > > > > > > Paul > >> > > > > > > > > > > > >> > > > > > > > > > > On Thu, Sep 10, 2020 at 6:11 PM Sophie Blee-Goldman > < > >> > > > > > sop...@confluent.io> > >> > > > > > > > > > > wrote: > >> > > > > > > > > > > > >> > > > > > > > > > > > Aha, I did misinterpret the example in your > previous > >> > > response > >> > > > > > regarding the > >> > > > > > > > > > > > range query after all. I thought you just meant a > >> > > time-range > >> > > > > > query inside a > >> > > > > > > > > > > > punctuator. It genuinely did not occur to me that > >> users > >> > > might > >> > > > > > be looking up > >> > > > > > > > > > > > and/or updating records of other keys from within > a > >> > > Processor. > >> > > > > > Sorry for > >> > > > > > > > > > > > being closed-minded. > >> > > > > > > > > > > > > >> > > > > > > > > > > > I won't drag out this discussion any further by > >> asking > >> > > whether > >> > > > > > that might > >> > > > > > > > > > > > be > >> > > > > > > > > > > > a valid use case or just a lurking bug in itself > :) > >> > > > > > > > > > > > > >> > > > > > > > > > > > Thanks for humoring me. The current proposal for > >> KIP-478 > >> > > > > > sounds good to me > >> > > > > > > > > > > > On Thu, Sep 10, 2020 at 3:43 PM John Roesler < > >> > > > > > vvcep...@apache.org> wrote: > >> > > > > > > > > > > > > Ah, thanks Sophie, > >> > > > > > > > > > > > > > >> > > > > > > > > > > > > I'm sorry for misinterpreting your response. Yes, > >> we > >> > > > > > > > > > > > > absolutely can and should clear the context > before > >> > > > > > > > > > > > > punctuating. > >> > > > > > > > > > > > > > >> > > > > > > > > > > > > My secondary concern is maybe more far-fetched. > I > >> was > >> > > > > > > > > > > > > thinking that inside process(key,value), a > >> Processor > >> > > might > >> > > > > > > > > > > > > do a get/put of a _different_ key.
Consider, for > >> > > example, > >> > > > > > > > > > > > > the way that Suppress processors work. When they > >> get a > >> > > > > > > > > > > > > record, they add it to the store and then do a > >> range > >> > > scan > >> > > > > > > > > > > > > and possibly forward a _different_ record. Of > >> course, > >> > > this > >> > > > > > > > > > > > > is an operation that is deeply coupled to the > >> > > internals, and > >> > > > > > > > > > > > > the Suppress processor accordingly actually does > >> get > >> > > access > >> > > > > > > > > > > > > to the internal context so that it can set the > >> context > >> > > > > > > > > > > > > before forwarding. > >> > > > > > > > > > > > > > >> > > > > > > > > > > > > Still, it seems like I've had a handful of > >> > > conversations > >> > > > > > > > > > > > > with people over the years in which they tell me > >> they > >> > > are > >> > > > > > > > > > > > > using state stores in a way that transcends the > >> "get > >> > > and put > >> > > > > > > > > > > > > the currently processing record" access > pattern. I > >> > > doubt > >> > > > > > > > > > > > > that those folks would even have considered the > >> > > possibility > >> > > > > > > > > > > > > that the currently processing record's _context_ > >> could > >> > > > > > > > > > > > > pollute their state store operations, as I > myself > >> > > never gave > >> > > > > > > > > > > > > it a second thought until the current > conversation > >> > > began. In > >> > > > > > > > > > > > > cases like that, we have actually set a trap for > >> these > >> > > > > > > > > > > > > people, and it seems better to dismantle the > trap.
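The trap John describes can be reproduced with a toy model. Here `ImplicitContextStore` stamps changelog entries with a hypothetical "current record timestamp" that the framework sets. `ExplicitStore` follows the "pass everything you need in the value" pattern instead, storing a value-plus-timestamp pair. All names are invented for illustration. Suppose a processor handling key `b` updates a different key `a`, as a suppression-style buffer might. Only the explicit store records `a`'s own timestamp.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical toy model; none of these are Kafka Streams classes.
final class ChangelogEntry {
    final String key;
    final String value;
    final long timestamp;

    ChangelogEntry(String key, String value, long timestamp) {
        this.key = key;
        this.value = value;
        this.timestamp = timestamp;
    }
}

// Store that implicitly uses whatever record context is "current".
final class ImplicitContextStore {
    final List<ChangelogEntry> changelog = new ArrayList<>();
    long currentRecordTimestamp; // set by the framework before process()

    void put(String key, String value) {
        changelog.add(new ChangelogEntry(key, value, currentRecordTimestamp));
    }
}

// Value that carries its own timestamp, so the store needs no record context.
final class ValueWithTimestamp {
    final String value;
    final long timestamp;

    ValueWithTimestamp(String value, long timestamp) {
        this.value = value;
        this.timestamp = timestamp;
    }
}

final class ExplicitStore {
    final List<ChangelogEntry> changelog = new ArrayList<>();

    void put(String key, ValueWithTimestamp v) {
        changelog.add(new ChangelogEntry(key, v.value, v.timestamp));
    }
}
```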
These folks will get the deprecation warning and try to adapt their stores to the new interface. If they needed access to the record context, they would find it's now missing. They'd ask us about it, and we'd have the ability to explain the lurking bug that they have had in their stores all along, as well as the new recommended pattern (just pass everything you need in the value). If that's unsatisfying, _then_ we should consider amending the API.

Thanks,
-John

On Thu, 2020-09-10 at 15:21 -0700, Sophie Blee-Goldman wrote:

> Regarding your first sentence, "...the processor would null out the record context...", this is not possible, since the processor doesn't have write access to the context. We could add it,

Sorry, this was poorly phrased; I definitely did not mean to imply that we should make the context modifiable by the Processors themselves.
I meant this should be handled by the internal processing framework that deals with passing records from one Processor to the next, setting the record context when a new record is picked up, invoking the punctuators, etc. I believe this all currently happens in the StreamTask? It already can and does overwrite the record context as new records are processed, and it is also responsible for calling the punctuators, so it doesn't seem like a huge leap to just say "null out the current record before punctuating."

To clarify, I was never advocating for, or even considering, giving the Processors write access to the record context. Sorry if my last message (or all of them) was misleading. I just wanted to point out that the punctuator concern is orthogonal to the question of whether we should include the record context
in the StateStoreContext. It's definitely a real problem, but it's a problem that exists at the Processor level and not just the StateStore. So, I don't think there is any reason we *can't* retain the record context in the StateStoreContext, and if any users came along with a clear use case I'd find that convincing. In the absence of any examples, the conservative approach sounds good to me.

If it turns out that someone did need the record context in their custom state store, I'm sure they'll submit a politely worded bug report alerting us that we broke their application.

On Thu, Sep 10, 2020 at 3:05 PM John Roesler <vvcep...@apache.org> wrote:

Thanks, Sophie,

Yes, now that you point it out, I can see that the record context itself should be nulled out by Streams before invoking punctuators.
From that perspective, we don't need to think about the second-order problem of what's in the context for the state store when called from a punctuator.

Regarding your first sentence, "...the processor would null out the record context...", this is not possible, since the processor doesn't have write access to the context. We could add it, but then all kinds of strange effects would ensue when downstream processors execute but the context is empty, etc. Better to just let the framework manage the record context and keep it read-only for Processors.

Reading between the lines of your last reply, it sounds like the disconnect may just have been a mutual misunderstanding about whether or not Processors currently have access to set the record context. Since they do not, if we wanted to add the record context to StateStoreContext in a well-defined way, we'd also have to add the ability for Processors to manipulate it.
But then, we're just creating a side-channel for Processors to pass some information in arguments to "put()" and other information implicitly through the context. It seems better just to go for a single channel for now.

It sounds like you're basically in favor of the conservative approach, and you just wanted to understand the blockers that I implied. Does my clarification make sense?

Thanks,
-John

On Thu, 2020-09-10 at 10:54 -0700, Sophie Blee-Goldman wrote:

I was just thinking that the processor would null out the record context after it finished processing the record, so I'm not sure I follow why this would not be possible? AFAIK we never call a punctuator in the middle of processing a record through the topology, and even if we did, we still know when it is about to be called and could set it to null beforehand.
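A minimal, self-contained Java sketch of the idea above: only the framework sets the current-record metadata, user code gets a read-only `Optional`, and the metadata is cleared after each record and before any punctuator runs. `RecordMeta` and `TaskDriver` are hypothetical names used only for illustration; this is not the actual StreamTask code or the KIP-478 API.

```java
import java.util.Optional;

// Hypothetical, read-only metadata for the record currently being processed.
final class RecordMeta {
    final String topic;
    final long offset;

    RecordMeta(String topic, long offset) {
        this.topic = topic;
        this.offset = offset;
    }
}

// Hypothetical task driver: only this framework class can set or clear the
// current record; processors and punctuators can only read it.
final class TaskDriver {
    private Optional<RecordMeta> current = Optional.empty();

    // Read-only accessor handed to user code.
    Optional<RecordMeta> currentRecord() {
        return current;
    }

    // Set the context, run the processor callback, then clear the context again.
    void process(RecordMeta meta, Runnable processor) {
        current = Optional.of(meta);
        try {
            processor.run();
        } finally {
            current = Optional.empty();
        }
    }

    // Punctuators always run with no current record.
    void punctuate(Runnable punctuator) {
        current = Optional.empty();
        punctuator.run();
    }
}
```

With this shape, a punctuator that asks for the current record gets `Optional.empty()` rather than the metadata of whichever record happened to be processed last.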
I'm not trying to advocate for it here; I'm in agreement that anything you want to access within the store can and should be accessed within the calling Processor/Punctuator before reaching the store. The "we can always add it later if necessary" argument is also pretty convincing. Just trying to understand why this wouldn't be possible.

FWIW, the question of "what is the current record in the context of a Punctuator" exists independently of whether we want to add this to the StateStoreContext or not. The full ProcessorContext, including the current record context, is already available within a Punctuator, so removing the current record context from the StateStoreContext does not solve the problem.
Users can, and have (see KAFKA-9584 <https://issues.apache.org/jira/browse/KAFKA-9584>), hit such subtle bugs without ever invoking a StateStore from their punctuator.

Again, I think I do agree that we should leave the current record context off of the StateStoreContext, but I don't think the Punctuator argument against it is very convincing. It sounds to me like we need to disallow access to the current record context from within the Punctuator, independent of anything to do with state stores.

On Thu, Sep 10, 2020 at 7:12 AM John Roesler <vvcep...@apache.org> wrote:

Thanks for the thoughts, Sophie.

I agree that the extra information could be useful.
My only concern is that it doesn’t seem like we can actually supply that extra information correctly. So, then we have a situation where the system offers useful API calls that are only correct in a narrow range of use cases. Outside of those use cases, you get incorrect behavior.

If it were possible to null out the context before you put a document to which the context doesn’t apply, then the concern would be mitigated. But it would still be pretty weird from the perspective of the store that sometimes the context is populated and other times, it’s null.

But that seems moot, since it doesn’t seem possible to null out the context. Only the Processor could know whether it’s about to put a document different from the context or not.
And it would be inappropriate to offer a public ProcessorContext API to manage the record context.

Ultimately, it still seems like if you want to store headers, you can store them explicitly, right? That doesn’t seem onerous to me, and it kind of seems better than relying on undefined or asymmetrical behavior in the store itself.

Anyway, I’m not saying that we couldn’t solve these problems. Just that it seems like we can afford to be conservative and avoid them for now. If it turns out we really need to solve them, we can always do it later.
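A minimal sketch of storing headers (and the timestamp) explicitly, assuming a plain map-backed store that never consults a record context. `StampedValue`, `PlainStore`, and `ExplicitMetadataProcessor` are hypothetical names, not Kafka Streams classes, and `traceId` stands in for whatever header value an application might want to keep.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical value type: the processor copies the metadata it cares about
// (a timestamp and one header value) into the stored value itself.
final class StampedValue {
    final String value;
    final long timestamp;
    final String traceId; // hypothetical header value the application wants to keep

    StampedValue(String value, long timestamp, String traceId) {
        this.value = value;
        this.timestamp = timestamp;
        this.traceId = traceId;
    }
}

// A plain store, like a HashMap: it never looks at a "current record".
final class PlainStore {
    private final Map<String, StampedValue> entries = new HashMap<>();

    void put(String key, StampedValue v) { entries.put(key, v); }

    StampedValue get(String key) { return entries.get(key); }
}

// Hypothetical processor logic: the caller decides which metadata belongs to which key.
final class ExplicitMetadataProcessor {
    private final PlainStore store;

    ExplicitMetadataProcessor(PlainStore store) { this.store = store; }

    // Store the incoming record together with its own metadata.
    void process(String key, String value, long timestamp, String traceId) {
        store.put(key, new StampedValue(value, timestamp, traceId));
    }

    // Update a different key: keep that entry's own trace id and use
    // whatever timestamp the application considers correct.
    void touchOtherKey(String otherKey, String newValue, long chosenTimestamp) {
        StampedValue old = store.get(otherKey);
        String traceId = old == null ? null : old.traceId;
        store.put(otherKey, new StampedValue(newValue, chosenTimestamp, traceId));
    }
}
```

Because the processor supplies the metadata with each `put()`, updating a different key (or writing from a punctuator) cannot accidentally pick up the metadata of an unrelated record.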
Thanks,
John

On Wed, Sep 9, 2020, at 22:46, Sophie Blee-Goldman wrote:

> If you were to call "put" from a punctuator, or do a `range()` query and then update one of those records with `put()`, you'd have a very subtle bug on your hands.

Can you elaborate on this a bit? I agree that the punctuator case is an obvious exception to the assumption that store invocations always have a corresponding "current record", but I don't understand the second example. Are you envisioning a scenario where the #process method performs a range query and then updates records? Or were you just giving another example of the punctuator case?

I only bring it up because I agree that the current record information could still be useful within the context of the store.
As a non-user, my input on this definitely has limited value, but it just isn't striking me as obvious that we should remove access to the current record context from the state stores. If there is no current record, as in the punctuator case, we should just set the record context to null (or Optional.empty, etc.).

That said, the put() always has to come from somewhere, and that somewhere is always going to be either a Processor or a Punctuator, both of which will still have access to the full context. So additional info such as the timestamp can and should probably be supplied to the store before calling put(), rather than looked up by the store.
But I can see some other things being useful, for example the current record's headers. Maybe if/when we add better (or any) support for headers in state stores this will be less true.

Of course, as John has made clear, it's pretty hard to judge without examples and more insight as to what actually goes on within a custom state store.

On Wed, Sep 9, 2020 at 8:07 PM John Roesler <vvcep...@apache.org> wrote:

Hi Paul,

It's good to hear from you!

I'm glad you're in favor of the direction. Especially when it comes to public API and usability concerns, I tend to think that "the folks who matter" are actually the folks who have to use the APIs to accomplish real tasks.
It can be hard for me to be sure I'm thinking clearly from that perspective.

Funny story, I also started down this road a couple of times already and backed them out before the KIP because I was afraid of the scope of the proposal. Unfortunately, needing to make a new ProcessorContext kind of forced my hand.

I see you've called me out about the ChangeLogging stores :) In fact, I think these are the main/only reason that stores might really need to invoke "forward()". My secret plan was to cheat and either accomplish change-logging by a different mechanism than implementing the store interface, or by just breaking encapsulation to sneak the "real" ProcessorContext into the ChangeLogging stores. But those are all implementation details. I think the key question is whether anyone else has a store implementation that needs to call "forward()".
It's not what you mentioned, but since you spoke up, I'll just ask: if you have a use case for calling "forward()" in a store, please share it.

Regarding the other record-specific context methods, I think you have a good point, but I also can't quite wrap my head around how we can actually guarantee it to work in general. For example, the case you cited, where the implementation of `KeyValueStore#put(key, value)` uses the context to augment the record with timestamp information. This relies on the assumption that you would only call "put()" from inside a `Processor#process(key, value)` call in which the record being processed is the same record that you're trying to put into the store.

If you were to call "put" from a punctuator, or do a `range()` query and then update one of those records with `put()`, you'd have a very subtle bug on your hands.
Right now, the Streams component that actually calls the Processor takes care to set the right record context before invoking the method, and in the case of caching, etc., it also takes care to swap out the old context and keep it somewhere safe. But when it comes to public API Processors calling methods on StateStores, there's no opportunity for any component to make sure the context is always correct.

In the face of that situation, it seemed better to just move in the direction of a "normal" data store. I.e., when you use a HashMap or RocksDB or other "state stores", you don't expect them to automatically know extra stuff about the record you're storing. If you need them to know something, you just put it in the value.

All of that said, I'm just reasoning from first principles here. To really know if this is a mistake or not, I need to be in your place.
So please push back if you think what I said is nonsense. My personal plan was to keep an eye out during the period where the old API was still present, but deprecated, to see if people were struggling to use the new API. If so, then we'd have a chance to address it before dropping the old API. But it's even better if you can help think it through now.

It did also cross my mind to _not_ add the StateStoreContext, but just to continue to punt on the question by just dropping in the new ProcessorContext to the new init method. If StateStoreContext seems too bold, we can go that direction. But if we actually add some methods to StateStoreContext, I'd like to be able to ensure they would be well defined. I think the current situation was more of an oversight than a choice.
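To make the "very subtle bug" above concrete, here is a minimal sketch under assumed, hypothetical names (`CurrentRecordContext`, `ContextStampingStore`, `SubtleBugDemo`; none of them are Kafka Streams classes). The store implicitly stamps each `put()` with the timestamp of the record currently in context, and the processor range-scans and rewrites a key other than the one being processed.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

// Hypothetical stand-in for the framework-managed record context.
final class CurrentRecordContext {
    private long timestamp;
    void set(long timestamp) { this.timestamp = timestamp; }
    long timestamp() { return timestamp; }
}

// Hypothetical custom store that implicitly stamps every put() with the
// timestamp of whichever record is "current" in the shared context.
final class ContextStampingStore {
    private final CurrentRecordContext context;
    private final TreeMap<String, String> values = new TreeMap<>();
    private final TreeMap<String, Long> stamps = new TreeMap<>();

    ContextStampingStore(CurrentRecordContext context) { this.context = context; }

    void put(String key, String value) {
        values.put(key, value);
        // Implicit: the store cannot tell whether this key belongs to the current record.
        stamps.put(key, context.timestamp());
    }

    // Copy of the keys in [from, to], so callers can update while iterating.
    List<String> rangeKeys(String from, String to) {
        return new ArrayList<>(values.subMap(from, true, to, true).keySet());
    }

    long stampOf(String key) { return stamps.get(key); }
}

final class SubtleBugDemo {
    static ContextStampingStore run() {
        CurrentRecordContext ctx = new CurrentRecordContext();
        ContextStampingStore store = new ContextStampingStore(ctx);

        ctx.set(100L);            // framework: now processing record "a" (timestamp 100)
        store.put("a", "first");  // processor stores the current record

        ctx.set(500L);            // framework: now processing record "z" (timestamp 500)
        store.put("z", "second");
        // The processor range-scans and rewrites an older, different key.
        for (String key : store.rangeKeys("a", "m")) {
            store.put(key, "recomputed");
        }
        return store;
    }
}
```

After `run()`, key `a` carries timestamp 500, which belongs to record `z`, even though `a` was originally written from a record with timestamp 100. The store has no way to tell whether that was intended; only the calling processor knows.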
Thanks again for your reply,
-John

On Wed, 2020-09-09 at 21:23 -0500, Paul Whalen wrote:

John,

It's exciting to see this KIP head in this direction! In the last year or so I've tried to sketch out some usability improvements for custom state stores, and I also ended up splitting out the StateStoreContext from the ProcessorContext in an attempt to facilitate what I was doing. I sort of abandoned it when I realized how large the ideal change might have to be, but it's great to see that there is other interest in moving in this direction (from the folks that matter :) ).
Having taken a stab at it myself, I have a comment/question on this bullet about StateStoreContext:

> It does *not* include anything processor- or record-specific, like `forward()` or any information about the "current" record, which is only well-defined in the context of the Processor. Processors process one record at a time, but state stores may be used to store and fetch many records, so there is no "current record".
I totally agree that record-specific or processor-specific context in a state store is often not well-defined and it would be good to separate that out, but sometimes it (at least record-specific context) is actually useful, for example for passing the record's timestamp through to the underlying storage (or changelog topic):
https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStore.java#L121

You could have the writer client of the state store pass this through, but it would be nice to be able to write state stores where the client did not have this responsibility.
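The two options Paul describes can be contrasted in a small sketch. Everything here is hypothetical: the classes are not part of Kafka Streams, and this is not how `ChangeLoggingKeyValueBytesStore` is implemented. In the first store the writer client must supply the timestamp on every `put`; in the second, the store reads the current record's timestamp from a record-scoped source, modelled here as a plain `java.util.function.LongSupplier`.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.LongSupplier;

// A changelog entry: key, value, and the timestamp written alongside them.
final class ChangelogEntry {
    final String key;
    final String value;
    final long timestamp;

    ChangelogEntry(String key, String value, long timestamp) {
        this.key = key;
        this.value = value;
        this.timestamp = timestamp;
    }
}

// Option 1: the writer client is responsible for passing the timestamp.
class ClientTimestampStore {
    final Map<String, String> data = new HashMap<>();
    final List<ChangelogEntry> changelog = new ArrayList<>();

    void put(String key, String value, long timestamp) {
        data.put(key, value);
        changelog.add(new ChangelogEntry(key, value, timestamp));
    }
}

// Option 2: the store pulls the current record's timestamp from a
// hypothetical record-scoped source, so callers pass only key and value.
class ContextTimestampStore {
    final Map<String, String> data = new HashMap<>();
    final List<ChangelogEntry> changelog = new ArrayList<>();
    private final LongSupplier currentRecordTimestamp;

    ContextTimestampStore(LongSupplier currentRecordTimestamp) {
        this.currentRecordTimestamp = currentRecordTimestamp;
    }

    void put(String key, String value) {
        data.put(key, value);
        changelog.add(new ChangelogEntry(key, value, currentRecordTimestamp.getAsLong()));
    }
}
```

With the second design a caller writes only `store.put(key, value)`; the open question in the thread is which context object, if any, should provide that record-scoped timestamp to the store.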
I'm not sure if the solution is to add some things back to StateStoreContext, or to make yet another context that represents record-specific context while inside a state store.

Best,
Paul

On Wed, Sep 9, 2020 at 5:43 PM John Roesler <j...@vvcephei.org> wrote:

Hello all,

I've been slowly pushing KIP-478 forward over the last year, and I'm happy to say that we're making good progress now. However, several issues with the original design have come to light.

The major changes:

We discovered that the original plan of just adding generic parameters to ProcessorContext was too disruptive, so we are now adding a new api.ProcessorContext.
That choice forces us to add a new StateStore.init method for the new context, but ProcessorContext really isn't ideal for state stores to begin with, so I'm proposing a new StateStoreContext for this purpose. In a nutshell, there are quite a few methods in ProcessorContext that should never be called from inside a StateStore.

Also, since there is a new ProcessorContext interface, we need a new MockProcessorContext implementation in the test-utils module.
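A rough illustration of what generic parameters on the context buy, and of why tests need a matching mock. `TypedContext`, `TypedProcessor`, `WordLengthProcessor`, and `CollectingContext` are hypothetical, simplified stand-ins (the KIP's actual interfaces differ, for example in how records are passed), shown only to demonstrate compile-time checking of `forward()` and a mock context that captures forwarded output.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical typed context: the output key/value types are fixed by generics.
interface TypedContext<KOut, VOut> {
    void forward(KOut key, VOut value);
}

// Hypothetical processor with separate input and output type parameters.
interface TypedProcessor<KIn, VIn, KOut, VOut> {
    void init(TypedContext<KOut, VOut> context);
    void process(KIn key, VIn value);
}

// Emits each word as the key and its length as the value.
class WordLengthProcessor implements TypedProcessor<String, String, String, Integer> {
    private TypedContext<String, Integer> context;

    @Override
    public void init(TypedContext<String, Integer> context) {
        this.context = context;
    }

    @Override
    public void process(String key, String value) {
        // context.forward(key, value) would not compile: value is a String,
        // but this context only accepts Integer values.
        context.forward(value, value.length());
    }
}

// Mock-style context that records forwarded pairs for inspection in tests.
class CollectingContext<K, V> implements TypedContext<K, V> {
    final List<String> forwarded = new ArrayList<>();

    @Override
    public void forward(K key, V value) {
        forwarded.add(key + "->" + value);
    }
}
```

Because the mock is itself generic, a test can only hand the processor a context whose types match its declared output, which is the kind of checking a typed MockProcessorContext would carry over to unit tests.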
The changeset for the KIP document is here:
https://cwiki.apache.org/confluence/pages/diffpagesbyversion.action?pageId=118172121&selectedPageVersions=14&selectedPageVersions=10

And the KIP itself is here:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-478+-+Strongly+typed+Processor+API

If you have any concerns, please let me know!

Thanks,
-John