FWIW, while I'm really not a fan of Optional in general, I agree that its usage here seems appropriate. Even for those rare software developers who carefully read all the docs several times over, I think it wouldn't be too hard to miss a note about the RecordMetadata possibly being null.

Especially because it's not that obvious why at first glance, and it takes a bit of thinking to realize that records originating from a Punctuator wouldn't have a "current record". This is something that definitely confuses users today. It's on us to improve the education here -- and an Optional<RecordMetadata> would naturally raise awareness of this subtlety.

On Tue, Sep 29, 2020 at 7:40 PM Sophie Blee-Goldman <sop...@confluent.io> wrote:

> Does my reply address your concerns?
>
>
> Yes; also, I definitely misread part of the proposal earlier and thought
> you had put the timestamp field in RecordMetadata. Sorry for not giving
> things a closer look before responding! I'm not sure my original message
> made much sense given the misunderstanding, but thanks for responding
> anyway :P
>
> Having given the proposal a second pass, I agree, it's very elegant. +1
>
> On Tue, Sep 29, 2020 at 6:50 PM John Roesler <vvcep...@apache.org> wrote:
>
>> Thanks for the reply, Sophie,
>>
>> I think I may have summarized too much in my prior reply.
>>
>> In the currently proposed KIP, any caller of forward() must
>> supply a Record, which consists of:
>> * key
>> * value
>> * timestamp
>> * headers (with a convenience constructor that sets empty
>> headers)
>>
>> These aren't what I was referring to as potentially being
>> undefined downstream, since thanks to the introduction of
>> Record, they are, as you're advocating, required to be
>> defined everywhere, even when forwarding from a punctuator.
>>
>> So to be clear, the intent of this change is actually to
>> _enforce_ that timestamp would never be undefined (which it
>> currently can be). Also, since punctuators _are_ going to
>> have to "make up" a timestamp going forward, we should note
>> that the "punctuate" method currently passes in a good
>> timestamp that they can use: for system-time punctuations,
>> they receive the current system time, and for stream-time
>> punctuations, they get the current stream time.
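To make the point about punctuators "making up" a timestamp concrete, here is a minimal sketch of reusing the timestamp handed to the punctuation as the timestamp of the forwarded record. The `Record` class and the `onPunctuate` method are hypothetical stand-ins written for this illustration; they are not the classes from the KIP or the POC PR, and the real signatures may differ.

```java
// Hypothetical stand-ins for illustration only; not the Kafka Streams classes.
class PunctuationTimestampSketch {

    /** Stand-in for the proposed Record: key, value, and timestamp are always defined. */
    static final class Record<K, V> {
        final K key;
        final V value;
        final long timestamp;

        Record(K key, V value, long timestamp) {
            this.key = key;
            this.value = value;
            this.timestamp = timestamp;
        }
    }

    /**
     * Mimics the body of a punctuation callback. The first argument plays the
     * role of the timestamp handed to punctuate (system time or stream time),
     * and it is reused as the timestamp of the record to forward.
     */
    static Record<String, Long> onPunctuate(long punctuationTimestamp, long currentCount) {
        return new Record<>("count", currentCount, punctuationTimestamp);
    }

    public static void main(String[] args) {
        Record<String, Long> toForward = onPunctuate(1_601_430_000_000L, 7L);
        System.out.println(toForward.key + "=" + toForward.value + " @ " + toForward.timestamp);
    }
}
```

The only point of the sketch is that the forwarded record never carries an undefined timestamp, even though no input record is being processed.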
>> >> The potentially undefined RecordMetadata only contains these >> fields: >> * topic >> * partition >> * offset >> >> These fields aren't required (or even used) in a Sink, and >> it doesn't seem like they would be important to many >> applications. Furthermore, it doesn't _seem_ like you'd even >> want to set these fields. They seem purely informational and >> only useful in the context when you are actually processing >> a real input record. It doesn't sound like you were asking >> for it, but just to put it on the record, I think if we were >> to require values for the metadata from punctuators, people >> would mostly just make up their own dummy values, to no >> one's benefit. >> >> I should also note that with the current >> Record/RecordMetadata split, we will have the freedom to >> move fields into the Record class (or even add new fields) >> if we want them to become "data" as opposed to "metadata" in >> the future. >> >> Thanks for your reply; I was similarly floored when I >> realized the true nature of the current situation. Does my >> reply address your concerns? >> >> Thanks, >> -John >> >> On Tue, 2020-09-29 at 18:34 -0700, Sophie Blee-Goldman >> wrote: >> > > However, the record metadata is only defined when the parent forwards >> > > while processing a >> > >> > real record, not when it calls forward from the punctuator >> > >> > >> > Can we take a step back for a second...why wouldn't you be required to >> set >> > the RecordContext >> > yourself when calling forward from a Punctuator? I think I agree with >> Paul >> > here, it seems kind of >> > absurd not to enforce that the RecordContext be present inside the >> > process() method. >> > >> > The original problem with Punctuators, as I understood it, was that all >> of >> > the RecordContext >> > fields were exposed automatically to both the Processor and any >> Punctuator, >> > due to being >> > direct methods on the ProcessorContext. 
We can't control which >> > ProcessorContext methods >> > someone will call from with a Punctuator vs from a Processor. The best >> we >> > could do was >> > set these "nonsense" fields to null when inside a Punctuator, or set >> them >> > to some dummy >> > values as you pointed out. >> > >> > But then you proposed the solution of a separate RecordContext which is >> not >> > attached to the >> > ProcessorContext at all. This seemed to solve the above problem very >> > neatly: we only pass >> > in the RecordContext to the process() method, so we don't have to worry >> > about people trying >> > to access these fields from within a Punctuator. The fields aren't >> > accessible unless they're >> > defined. >> > >> > So what happens when someone wants to forward something from within a >> > Punctuator? I >> > don't think it's reasonable to let the timestamp field be undefined, >> ever. >> > What if the Punctuator >> > forwards directly to a sink, or directly to some windowing logic. Are we >> > supposed to add >> > handling for the RecordContext == null case to every processor? Or are >> we >> > just going to >> > assume the implicit restriction that users will only forward records >> from a >> > Punctuator to >> > downstream processors that know how to handle and/or set the >> RecordContext >> > if it's >> > undefined. That seems to throw away a lot of the awesome safety added in >> > this KIP >> > >> > Apologies for the rant. But I feel pretty strongly that allowing to >> forward >> > records from a >> > Punctuator without a defined RecordContext would be asking for trouble. 
>> > Imo, if you >> > want to forward from a Punctuator, you need to store the info you need >> in >> > order to >> > set the timestamp, or make one up yourself >> > >> > (the one alternative I can think of here is that maybe we could pass in >> the >> > current >> > partition time, so users can at least put in a reasonable estimate for >> the >> > timestamp >> > that won't cause it to get dropped and won't potentially lurch the >> > streamtime far into >> > the future. This would be similar to what we do in the >> TimestampExtractor) >> > >> > On Tue, Sep 29, 2020 at 6:06 PM John Roesler <vvcep...@apache.org> >> wrote: >> > >> > > Oh, I guess one other thing I should have mentioned is that I’ve >> recently >> > > discovered that in cases where the context is undefined, we currently >> just >> > > fill in dummy values for the context. So there’s a good chance that >> real >> > > applications in use are depending on undefined context without even >> > > realizing it. What I’m hoping to do is just make the situation >> explicit and >> > > get rid of the dummy values. >> > > >> > > Thanks, >> > > John >> > > >> > > On Tue, Sep 29, 2020, at 20:01, John Roesler wrote: >> > > > Thanks for the review, Paul! >> > > > >> > > > I had read some of that debate before. There seems to be some >> subtext >> > > > there, because they advise against using Optional in cases like >> this, >> > > > but there doesn’t seem to be a specific reason why it’s >> inappropriate. >> > > > I got the impression they were just afraid that people would go >> > > > overboard and make everything Optional. >> > > > >> > > > I could also make two methods, but it seemed like it might be an >> > > > unfortunate way to handle the issue, since Processor is just about a >> > > > Function as-is, but the two-method approach would require people to >> > > > implement both methods. >> > > > >> > > > To your question, this is something that’s only recently became >> clear >> > > > to me. 
Imagine you have a parent processor that calls forward both >> from >> > > > process and a punctuator. The child will have process() invoked in >> both >> > > > cases, and won’t be able to distinguish them. However, the record >> > > > metadata is only defined when the parent forwards while processing a >> > > > real record, not when it calls forward from the punctuator. >> > > > >> > > > This is why I wanted to make the metadata Optional, to advertise >> that >> > > > the metadata might be undefined if any ancestor processor ever calls >> > > > forward from a punctuator. We could remove the Optional and instead >> > > > just document that the argument might be null. >> > > > >> > > > With that context in place, what’s your take? >> > > > >> > > > Thanks, >> > > > John >> > > > >> > > > On Tue, Sep 29, 2020, at 19:09, Paul Whalen wrote: >> > > > > Looks pretty good to me, though the Processor#process(Record, >> > > > > Optional<RecordMetadata>) signature caught my eye. There's some >> > > debate >> > > > > ( >> > > > > >> > > >> https://stackoverflow.com/questions/31922866/why-should-java-8s-optional-not-be-used-in-arguments >> > > ) >> > > > > about whether to use Optionals in arguments, and while that's a >> bit of >> > > a >> > > > > religious debate in the abstract, it did make me wonder whether it >> > > makes >> > > > > sense in this specific case. When is it actually not present? I >> was >> > > > > under >> > > > > the impression that we should always have access to it in >> process(), >> > > and >> > > > > that the concern about metadata being undefined was about having >> > > access >> > > > > to >> > > > > record metadata in the ProcessorContext held for use inside a >> > > > > Punctuator. 
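As a rough answer to the "when is it actually not present?" question, the sketch below shows a child processor that cannot tell whether its parent forwarded from process() or from a punctuator, and so branches on the Optional. `Record`, `RecordMetadata`, and the static `process` method are simplified stand-ins assumed for this example, not the actual interfaces under discussion.

```java
import java.util.Optional;

// Hypothetical stand-ins for illustration only; not the Kafka Streams classes.
class OptionalMetadataSketch {

    /** Stand-in for the proposed Record (headers omitted for brevity). */
    static final class Record<K, V> {
        final K key;
        final V value;
        final long timestamp;

        Record(K key, V value, long timestamp) {
            this.key = key;
            this.value = value;
            this.timestamp = timestamp;
        }
    }

    /** Stand-in for RecordMetadata: only meaningful for a real input record. */
    static final class RecordMetadata {
        final String topic;
        final int partition;
        final long offset;

        RecordMetadata(String topic, int partition, long offset) {
            this.topic = topic;
            this.partition = partition;
            this.offset = offset;
        }
    }

    /** Child-side logic: the same method runs no matter how the parent forwarded. */
    static String process(Record<String, String> record, Optional<RecordMetadata> metadata) {
        String origin = metadata
                .map(m -> m.topic + "-" + m.partition + "@" + m.offset)
                .orElse("<no current record>");
        return record.key + "=" + record.value + " from " + origin;
    }

    public static void main(String[] args) {
        // Parent forwarded while processing a real input record.
        System.out.println(process(new Record<>("k", "v", 10L),
                Optional.of(new RecordMetadata("input", 0, 42L))));
        // Parent forwarded from a punctuator: there is no topic/partition/offset.
        System.out.println(process(new Record<>("k", "v", 20L), Optional.empty()));
    }
}
```

With a nullable argument instead of an Optional, the second call would compile just as easily, but nothing in the signature would hint that the empty case exists.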
>> > > > >
>> > > > > If that's not the case and it is truly optional in process(), is there an
>> > > > > opportunity for an alternate interface for the cases when we don't get it,
>> > > > > rather than force the branching on implementers of the interface?
>> > > > >
>> > > > > Apologies if I've missed something, I took a look at the PR and I didn't
>> > > > > see any spots where I thought it would be empty. Perhaps an example of a
>> > > > > Punctuator using (and not using) the new API would clear things up.
>> > > > >
>> > > > > Best,
>> > > > > Paul
>> > > > >
>> > > > > On Tue, Sep 29, 2020 at 4:10 PM John Roesler <vvcep...@apache.org> wrote:
>> > > > > > Hello again, all,
>> > > > > >
>> > > > > > Thanks for the latest round of discussion. I've taken the
>> > > > > > recent feedback and come up with an updated KIP that seems
>> > > > > > actually quite a bit nicer than the prior proposal.
>> > > > > >
>> > > > > > The specific diff on the KIP is here:
>> > > > > > https://cwiki.apache.org/confluence/pages/diffpagesbyversion.action?pageId=118172121&selectedPageVersions=15&selectedPageVersions=14
>> > > > > >
>> > > > > > These changes are implemented in this POC PR:
>> > > > > > https://github.com/apache/kafka/pull/9346
>> > > > > >
>> > > > > > The basic idea is that, building on the recent conversation,
>> > > > > > we would transition away from the current API where we get
>> > > > > > only key/value in the process() method and other "data"
>> > > > > > comes in the ProcessorContext along with the "metadata".
>> > > > > >
>> > > > > > Instead, we formalize what is "data" and what is "metadata",
>> > > > > > and pass it all in to the process method:
>> > > > > > Processor#process(Record, Optional<RecordMetadata>)
>> > > > > >
>> > > > > > Also, you forward the whole data class instead of mutating
>> > > > > > the ProcessorContext fields and also calling forward:
>> > > > > > ProcessorContext#forward(Record)
>> > > > > >
>> > > > > > The Record class itself ships with methods like
>> > > > > > record#withValue(NewV newValue)
>> > > > > > that make a shallow copy of the input Record, enabling
>> > > > > > Processors to safely handle the record without polluting the
>> > > > > > context of their parents and siblings.
>> > > > > >
>> > > > > > This proposal has a number of key benefits:
>> > > > > > * As we've discovered in KAFKA-9584, it's unsafe to mutate
>> > > > > > the Headers via the ProcessorContext. This proposal offers a
>> > > > > > way to safely forward changes only to downstream processors.
>> > > > > > * The new API has symmetry (each processor's input is the
>> > > > > > output of its parent processor)
>> > > > > > * The API makes clear that the record metadata isn't always
>> > > > > > defined (for example, in a punctuation, there is no current
>> > > > > > topic/partition/offset)
>> > > > > > * The API enables punctuators to forward well defined
>> > > > > > headers downstream, which is currently not possible.
>> > > > > >
>> > > > > > Unless there are objections, I'll go ahead and re-finalize
>> > > > > > this KIP and update that PR to a mergeable state.
>> > > > > >
>> > > > > > Thanks, all,
>> > > > > > -John
>> > > > > >
>> > > > > >
>> > > > > > On Thu, 2020-09-24 at 09:41 -0700, Matthias J. Sax wrote:
>> > > > > > > Interesting proposal. However, I am not totally convinced, because I see
>> > > > > > > a fundamental difference between "data" and "metadata".
>> > > > > > >
>> > > > > > > Topic/partition/offset are "metadata" in the strong sense and they are
>> > > > > > > immutable.
>> > > > > > >
>> > > > > > > On the other hand there is "primary" data like key and value, as well as
>> > > > > > > "secondary" data like timestamp and headers. The issue seems to be that we
>> > > > > > > treat "secondary data" more like metadata atm?
>> > > > > > >
>> > > > > > > Thus, promoting timestamp and headers into a first class citizen role
>> > > > > > > makes sense to me (my original proposal about `RecordContext` would still
>> > > > > > > fall short in this regard). However, putting both (data and metadata)
>> > > > > > > into a `Record` abstraction might go too far?
>> > > > > > >
>> > > > > > > I am also a little bit concerned about `Record.copy()` because it might
>> > > > > > > be a trap: Users might assume it does a full deep copy of the record,
>> > > > > > > however, it would not. It would only create a new `Record` object as
>> > > > > > > wrapper that points to the same key/value/header objects as the input
>> > > > > > > record.
>> > > > > > >
>> > > > > > > With the current `context.forward(key, value)` we don't have this "deep
>> > > > > > > copy" issue -- it's pretty clear what is happening.
>> > > > > > >
>> > > > > > > Instead of `To.all().withTimestamp()` we could also add
>> > > > > > > `context.forward(key, value, timestamp)` etc (just wondering about the
>> > > > > > > explosion in overloads)?
>> > > > > > >
>> > > > > > > Also, `Record.withValue` etc sounds odd? Should a record not be
>> > > > > > > immutable? So, we could have something like
>> > > > > > >
>> > > > > > > `RecordFactory.withKeyValue(...).withTimestamp(...).withHeaders(...).build()`.
>> > > > > > > But it looks rather verbose?
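A small sketch of the shallow-copy trap described above, assuming a simplified `Record` whose headers are a plain mutable `Map` (a stand-in chosen for this example; the real header type and copy methods may look different). The copy returned by `withValue` is a new wrapper, but it shares the headers object with the input record.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-ins for illustration only; not the Kafka Streams classes.
class ShallowCopySketch {

    /** Stand-in Record; headers are modelled as a mutable Map for simplicity. */
    static final class Record<K, V> {
        final K key;
        final V value;
        final long timestamp;
        final Map<String, String> headers;

        Record(K key, V value, long timestamp, Map<String, String> headers) {
            this.key = key;
            this.value = value;
            this.timestamp = timestamp;
            this.headers = headers;
        }

        /** Shallow copy: a new wrapper that reuses the same key and headers objects. */
        <NewV> Record<K, NewV> withValue(NewV newValue) {
            return new Record<>(key, newValue, timestamp, headers);
        }
    }

    public static void main(String[] args) {
        Record<String, String> input = new Record<>("k", "v", 0L, new HashMap<>());
        Record<String, Integer> copy = input.withValue(1);

        // Mutating the copy's headers is visible through the input record too.
        copy.headers.put("trace", "abc");
        System.out.println(input.headers.containsKey("trace"));
    }
}
```

Whether this sharing is a problem depends on whether the header object is mutable; a defensive copy of the headers inside `withValue` would be one way to avoid it, at the cost of an extra allocation per forward.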
>> > > > > > >
>> > > > > > > The other question is of course, to what extent do we want to keep the
>> > > > > > > distinction between "primary" and "secondary" data? To me, it's a
>> > > > > > > question of ease of use?
>> > > > > > >
>> > > > > > > Just putting all this out to move the discussion forward. Don't have a
>> > > > > > > concrete proposal atm.
>> > > > > > >
>> > > > > > >
>> > > > > > > -Matthias
>> > > > > > >
>> > > > > > >
>> > > > > > > On 9/14/20 9:24 AM, John Roesler wrote:
>> > > > > > > > Thanks for this thought, Matthias!
>> > > > > > > >
>> > > > > > > > To be honest, it's bugged me quite a bit that _all_ the
>> > > > > > > > record information hasn't been an argument to `process`. I
>> > > > > > > > suppose I was trying to be conservative in this proposal,
>> > > > > > > > but then again, if we're adding new Processor and
>> > > > > > > > ProcessorContext interfaces, then this is the time to make
>> > > > > > > > such a change.
>> > > > > > > >
>> > > > > > > > To be unambiguous, I think this is what we're talking about:
>> > > > > > > > ProcessorContext:
>> > > > > > > > * applicationId
>> > > > > > > > * taskId
>> > > > > > > > * appConfigs
>> > > > > > > > * appConfigsWithPrefix
>> > > > > > > > * keySerde
>> > > > > > > > * valueSerde
>> > > > > > > > * stateDir
>> > > > > > > > * metrics
>> > > > > > > > * schedule
>> > > > > > > > * commit
>> > > > > > > > * forward
>> > > > > > > >
>> > > > > > > > StateStoreContext:
>> > > > > > > > * applicationId
>> > > > > > > > * taskId
>> > > > > > > > * appConfigs
>> > > > > > > > * appConfigsWithPrefix
>> > > > > > > > * keySerde
>> > > > > > > > * valueSerde
>> > > > > > > > * stateDir
>> > > > > > > > * metrics
>> > > > > > > > * register
>> > > > > > > >
>> > > > > > > >
>> > > > > > > > RecordContext
>> > > > > > > > * topic
>> > > > > > > > * partition
>> > > > > > > > * offset
>> > > > > > > > * timestamp
>> > > > > > > > * headers
>> > > > > > > >
>> > > > > > > >
>> > > > > > > > Your proposal sounds good to me as-is. Just to cover the
>> > > > > > > > bases, though, I'm wondering if we should push the idea just
>> > > > > > > > a little farther. Instead of decomposing key,value,context,
>> > > > > > > > we could just keep them all in one object, like this:
>> > > > > > > >
>> > > > > > > > Record:
>> > > > > > > > * key
>> > > > > > > > * value
>> > > > > > > > * topic
>> > > > > > > > * partition
>> > > > > > > > * offset
>> > > > > > > > * timestamp
>> > > > > > > > * headers
>> > > > > > > >
>> > > > > > > > Then, we could have:
>> > > > > > > > Processor#process(Record)
>> > > > > > > > ProcessorContext#forward(Record, To)
>> > > > > > > >
>> > > > > > > > Viewed from this perspective, a record has three properties
>> > > > > > > > that people may specify in their processors: key, value, and
>> > > > > > > > timestamp.
>> > > > > > > >
>> > > > > > > > We could deprecate `To#withTimestamp` and enable people to
>> > > > > > > > specify the timestamp along with the key and value when they
>> > > > > > > > forward a record.
>> > > > > > > >
>> > > > > > > > E.g.,
>> > > > > > > > RecordBuilder toForward = RecordBuilder.copy(record)
>> > > > > > > > toForward.withKey(newKey)
>> > > > > > > > toForward.withValue(newValue)
>> > > > > > > > toForward.withTimestamp(newTimestamp)
>> > > > > > > > Record newRecord = toForward.build()
>> > > > > > > > context.forward(newRecord, To.child("child1"))
>> > > > > > > >
>> > > > > > > > Or, the more compact common case:
>> > > > > > > > current:
>> > > > > > > > context.forward(key, "newValue")
>> > > > > > > > proposed:
>> > > > > > > > context.forward(copy(record).withValue("newValue").build())
>> > > > > > > >
>> > > > > > > >
>> > > > > > > > It's slightly more verbose, but also more extensible. This
>> > > > > > > > would give us a clean path to add header support in PAPI as
>> > > > > > > > well, simply by adding `withHeaders` in RecordBuilder.
>> > > > > > > > >> > > > > > > > It's also more symmetrical, since the recipient of `forward` >> > > > > > > > would just get the sent `Record`. Whereas today, the sender >> > > > > > > > puts the timestamp in `To`, but the recipient gets in in its >> > > > > > > > own `ProcessorContext`. >> > > > > > > > >> > > > > > > > WDYT? >> > > > > > > > -John >> > > > > > > > >> > > > > > > > On Fri, 2020-09-11 at 12:30 -0700, Matthias J. Sax wrote: >> > > > > > > > > I think separating the different contexts make sense. >> > > > > > > > > >> > > > > > > > > In fact, we could even go one step further and remove the >> > > record >> > > > > > context >> > > > > > > > > from the processor context completely and we add a third >> > > parameter to >> > > > > > > > > `process(key, value, recordContext)`. This would make it >> clear >> > > that >> > > > > > the >> > > > > > > > > context is for the input record only and it's not >> possible to >> > > pass >> > > > > > it to >> > > > > > > > > a `punctuate` callback. >> > > > > > > > > >> > > > > > > > > For the stores and changelogging: I think there are two >> cases. >> > > (1) >> > > > > > You >> > > > > > > > > use a plain key-value store. For this case, it seems you >> do >> > > not care >> > > > > > > > > about the timestamp and thus does not care what timestamp >> is >> > > set in >> > > > > > the >> > > > > > > > > changelog records. (We can set anything we want, as it's >> not >> > > > > > relevant at >> > > > > > > > > all -- the timestamp is ignored on read anyway.) (2) The >> other >> > > case >> > > > > > is, >> > > > > > > > > that one does care about timestamps, and for this case >> should >> > > use >> > > > > > > > > TimestampedKeyValueStore. The passed timestamp will be >> set on >> > > the >> > > > > > > > > changelog records for this case. >> > > > > > > > > >> > > > > > > > > Thus, for both cases, accessing the record context does >> not >> > > seems to >> > > > > > be >> > > > > > > > > a requirement. 
And providing access to the processor >> context >> > > to, eg., >> > > > > > > > > `forward()` or similar seems safe. >> > > > > > > > > >> > > > > > > > > >> > > > > > > > > -Matthias >> > > > > > > > > >> > > > > > > > > On 9/10/20 7:25 PM, John Roesler wrote: >> > > > > > > > > > Thanks for the reply, Paul! >> > > > > > > > > > >> > > > > > > > > > I certainly intend to make sure that the changelogging >> layer >> > > > > > > > > > continues to work the way it does now, by hook or by >> crook. >> > > > > > > > > > I think the easiest path for me is to just "cheat" and >> get >> > > > > > > > > > the real ProcessorContext into the ChangeLoggingStore >> > > > > > > > > > implementation somehow. I'll tag you on the PR when I >> create >> > > > > > > > > > it, so you have an opportunity to express a preference >> about >> > > > > > > > > > the implementation choice, and maybe even compile/test >> > > > > > > > > > against it to make sure your stuff still works. >> > > > > > > > > > >> > > > > > > > > > Regarding this: >> > > > > > > > > > >> > > > > > > > > > > we have an interest in making a state store with a >> richer >> > > > > > > > > > > way of querying its data (like perhaps getting all >> values >> > > > > > > > > > > associated with a secondary key), while still >> ultimately >> > > > > > > > > > > writing to the changelog topic for later restoration. >> > > > > > > > > > >> > > > > > > > > > This is very intriguing to me. On the side, I've been >> > > > > > > > > > preparing a couple of ideas related to this topic. I >> don't >> > > > > > > > > > think I have a coherent enough thought to even express >> it in >> > > > > > > > > > a Jira right now, but when I do, I'll tag you on it >> also to >> > > > > > > > > > see what you think. >> > > > > > > > > > >> > > > > > > > > > Whenever you're ready to share the usability improvement >> > > > > > > > > > ideas, I'm very interested to see what you've come up >> with. 
>> > > > > > > > > > >> > > > > > > > > > Thanks, >> > > > > > > > > > -John >> > > > > > > > > > >> > > > > > > > > > On Thu, 2020-09-10 at 21:02 -0500, Paul Whalen wrote: >> > > > > > > > > > > > when you use a HashMap or RocksDB or other "state >> > > stores", you >> > > > > > don't >> > > > > > > > > > > > expect them to automatically know extra stuff about >> the >> > > record >> > > > > > you're >> > > > > > > > > > > > storing. >> > > > > > > > > > > >> > > > > > > > > > > So, I don't think there is any reason we *can't* >> retain the >> > > > > > record context >> > > > > > > > > > > > in the StateStoreContext, and if any users came >> along >> > > with a >> > > > > > clear use case >> > > > > > > > > > > > I'd find that convincing. >> > > > > > > > > > > > >> > > > > > > > > > > >> > > > > > > > > > > I agree with the principle of being conservative with >> the >> > > > > > StateStoreContext >> > > > > > > > > > > API. Regarding user expectations or a clear use >> case, the >> > > only >> > > > > > > > > > > counterpoint I would offer is that we sort of have >> that >> > > use case >> > > > > > already, >> > > > > > > > > > > which is the example I gave of the change logging >> store >> > > using the >> > > > > > > > > > > timestamp. I am curious if this functionality will be >> > > retained >> > > > > > when using >> > > > > > > > > > > built in state stores, or will a low-level processor >> get a >> > > > > > KeyValueStore >> > > > > > > > > > > that no longer writes to the changelog topic with the >> > > record's >> > > > > > timestamp. >> > > > > > > > > > > While I personally don't care much about that >> functionality >> > > > > > specifically, I >> > > > > > > > > > > have a general desire for custom state stores to >> easily do >> > > the >> > > > > > things that >> > > > > > > > > > > built in state stores do. 
>> > > > > > > > > > > >> > > > > > > > > > > It genuinely did not occur to me that users might be >> > > looking up >> > > > > > and/or >> > > > > > > > > > > > updating records of other keys from within a >> Processor. >> > > > > > > > > > > > >> > > > > > > > > > > >> > > > > > > > > > > I'm glad you said this Sophie, because it gives me an >> > > > > > opportunity to say >> > > > > > > > > > > that this is actually a *huge* use case for my team. >> The >> > > state >> > > > > > store >> > > > > > > > > > > usability improvements I was referring to in my >> previous >> > > message >> > > > > > were about >> > > > > > > > > > > enabling the user to write custom stores while still >> easily >> > > > > > hooking into >> > > > > > > > > > > the ability to write to a changelog topic. I think >> that is >> > > > > > technically >> > > > > > > > > > > possible now, but I don't think it's trivial. >> > > Specifically, we >> > > > > > have an >> > > > > > > > > > > interest in making a state store with a richer way of >> > > querying >> > > > > > its data >> > > > > > > > > > > (like perhaps getting all values associated with a >> > > secondary >> > > > > > key), while >> > > > > > > > > > > still ultimately writing to the changelog topic for >> later >> > > > > > restoration. >> > > > > > > > > > > We recognize that this use case throws away some of >> what >> > > kafka >> > > > > > streams >> > > > > > > > > > > (especially the DSL) is good at - easy >> parallelizability by >> > > > > > partitioning >> > > > > > > > > > > all processing by key - and that our business logic >> would >> > > > > > completely fall >> > > > > > > > > > > apart if we were consuming from multi-partition >> topics with >> > > > > > multiple >> > > > > > > > > > > consumers. 
But we have found that using the low level >> > > processor >> > > > > > API is >> > > > > > > > > > > good for the very simple stream processing primitives >> it >> > > > > > provides: handling >> > > > > > > > > > > the plumbing of consuming from multiple kafka topics >> and >> > > > > > potentially >> > > > > > > > > > > updating persistent local state in a reliable way. >> That in >> > > > > > itself has >> > > > > > > > > > > proven to be a worthwhile programming model. >> > > > > > > > > > > >> > > > > > > > > > > Since I got off track a bit, let me summarize: I don't >> > > > > > particularly care >> > > > > > > > > > > about the record context being available to state >> store >> > > > > > implementations, >> > > > > > > > > > > and I think this KIP is headed in the right direction >> in >> > > that >> > > > > > regard. But >> > > > > > > > > > > more generally, I wanted to express the importance of >> > > > > > maintaining a >> > > > > > > > > > > powerful and flexible StateStore interface. >> > > > > > > > > > > >> > > > > > > > > > > Thanks! >> > > > > > > > > > > Paul >> > > > > > > > > > > >> > > > > > > > > > > On Thu, Sep 10, 2020 at 6:11 PM Sophie Blee-Goldman < >> > > > > > sop...@confluent.io> >> > > > > > > > > > > wrote: >> > > > > > > > > > > >> > > > > > > > > > > > Aha, I did misinterpret the example in your previous >> > > response >> > > > > > regarding the >> > > > > > > > > > > > range query after all. I thought you just meant a >> > > time-range >> > > > > > query inside a >> > > > > > > > > > > > punctuator. It genuinely did not occur to me that >> users >> > > might >> > > > > > be looking up >> > > > > > > > > > > > and/or updating records of other keys from within a >> > > Processor. 
>> > > > > > Sorry for >> > > > > > > > > > > > being closed minded >> > > > > > > > > > > > >> > > > > > > > > > > > I won't drag out this discussion any further by >> asking >> > > whether >> > > > > > that might >> > > > > > > > > > > > be >> > > > > > > > > > > > a valid use case or just a lurking bug in itself :) >> > > > > > > > > > > > >> > > > > > > > > > > > Thanks for humoring me. The current proposal for >> KIP-478 >> > > > > > sounds good to me >> > > > > > > > > > > > On Thu, Sep 10, 2020 at 3:43 PM John Roesler < >> > > > > > vvcep...@apache.org> wrote: >> > > > > > > > > > > > > Ah, thanks Sophie, >> > > > > > > > > > > > > >> > > > > > > > > > > > > I'm sorry for misinterpreting your resonse. Yes, >> we >> > > > > > > > > > > > > absolutely can and should clear the context before >> > > > > > > > > > > > > punctuating. >> > > > > > > > > > > > > >> > > > > > > > > > > > > My secondary concern is maybe more far-fetched. I >> was >> > > > > > > > > > > > > thinking that inside process(key,value), a >> Processor >> > > might >> > > > > > > > > > > > > do a get/put of a _different_ key. Consider, for >> > > example, >> > > > > > > > > > > > > the way that Suppress processors work. When they >> get a >> > > > > > > > > > > > > record, they add it to the store and then do a >> range >> > > scan >> > > > > > > > > > > > > and possibly forward a _different_ record. Of >> course, >> > > this >> > > > > > > > > > > > > is an operation that is deeply coupled to the >> > > internals, and >> > > > > > > > > > > > > the Suppress processor accordingly actually does >> get >> > > access >> > > > > > > > > > > > > to the internal context so that it can set the >> context >> > > > > > > > > > > > > before forwarding. 
>> > > > > > > > > > > > > >> > > > > > > > > > > > > Still, it seems like I've had a handful of >> > > conversations >> > > > > > > > > > > > > with people over the years in which they tell me >> they >> > > are >> > > > > > > > > > > > > using state stores in a way that transcends the >> "get >> > > and put >> > > > > > > > > > > > > the currently processing record" access pattern. I >> > > doubt >> > > > > > > > > > > > > that those folks would even have considered the >> > > possiblity >> > > > > > > > > > > > > that the currently processing record's _context_ >> could >> > > > > > > > > > > > > pollute their state store operations, as I myself >> > > never gave >> > > > > > > > > > > > > it a second thought until the current conversation >> > > began. In >> > > > > > > > > > > > > cases like that, we have actually set a trap for >> these >> > > > > > > > > > > > > people, and it seems better to dismantle the trap. >> > > > > > > > > > > > > >> > > > > > > > > > > > > As you noted, really the only people who would be >> > > negatively >> > > > > > > > > > > > > impacted are people who implement their own state >> > > stores. >> > > > > > > > > > > > > These folks will get the deprecation warning and >> try to >> > > > > > > > > > > > > adapt their stores to the new interface. If they >> needed >> > > > > > > > > > > > > access to the record context, they would find >> it's now >> > > > > > > > > > > > > missing. They'd ask us about it, and we'd have the >> > > ability >> > > > > > > > > > > > > to explain the lurking bug that they have had in >> their >> > > > > > > > > > > > > stores all along, as well as the new recommended >> > > pattern >> > > > > > > > > > > > > (just pass everything you need in the value). If >> that's >> > > > > > > > > > > > > unsatisfying, _then_ we should consider amending >> the >> > > API. 
>> > > > > > > > > > > > > Thanks, >> > > > > > > > > > > > > -John >> > > > > > > > > > > > > >> > > > > > > > > > > > > On Thu, 2020-09-10 at 15:21 -0700, Sophie >> Blee-Goldman >> > > > > > > > > > > > > wrote: >> > > > > > > > > > > > > > > Regarding your first sentence, "...the >> processor >> > > would >> > > > > > null >> > > > > > > > > > > > > > > out the record context...", this is not >> possible, >> > > since >> > > > > > the >> > > > > > > > > > > > > > > processor doesn't have write access to the >> > > context. We >> > > > > > could >> > > > > > > > > > > > > > > add it, >> > > > > > > > > > > > > > > >> > > > > > > > > > > > > > >> > > > > > > > > > > > > > Sorry, this was poorly phrased, I definitely >> did not >> > > mean >> > > > > > to imply that >> > > > > > > > > > > > > we >> > > > > > > > > > > > > > should make the context modifiable by the >> Processors >> > > > > > themselves. I >> > > > > > > > > > > > meant >> > > > > > > > > > > > > > this should be handled by the internal >> processing >> > > > > > framework that deals >> > > > > > > > > > > > > with >> > > > > > > > > > > > > > passing records from one Processor to the next, >> > > setting >> > > > > > the record >> > > > > > > > > > > > > context >> > > > > > > > > > > > > > when a new record is picked up, invoking the >> > > punctuators, >> > > > > > etc. I >> > > > > > > > > > > > believe >> > > > > > > > > > > > > > this >> > > > > > > > > > > > > > all currently happens in the StreamTask? 
It already can and does overwrite the record context as new records are
processed, and is also responsible for calling the punctuators, so it
doesn't seem like a huge leap to just say "null out the current record
before punctuating".

To clarify, I was never advocating or even considering giving the
Processors write access to the record context. Sorry if my last message
(or all of them) was misleading. I just wanted to point out that the
punctuator concern is orthogonal to the question of whether we should
include the record context in the StateStoreContext. It's definitely a
real problem, but it's a problem that exists at the Processor level and
not just the StateStore.

So, I don't think there is any reason we *can't* retain the record
context in the StateStoreContext, and if any users came along with a
clear use case I'd find that convincing. In the absence of any examples,
the conservative approach sounds good to me.

If it turns out that someone did need the record context in their custom
state store, I'm sure they'll submit a politely worded bug report
alerting us that we broke their application.

On Thu, Sep 10, 2020 at 3:05 PM John Roesler <vvcep...@apache.org> wrote:
Thanks, Sophie,

Yes, now that you point it out, I can see that the record context itself
should be nulled out by Streams before invoking punctuators. From that
perspective, we don't need to think about the second-order problem of
what's in the context for the state store when called from a punctuator.

Regarding your first sentence, "...the processor would null out the
record context...", this is not possible, since the processor doesn't
have write access to the context. We could add it, but then all kinds of
strange effects would ensue when downstream processors execute but the
context is empty, etc. Better to just let the framework manage the
record context and keep it read-only for Processors.
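
As a rough illustration of "the framework manages the record context and Processors only read it", here is a small sketch in plain Java. It is not the Kafka Streams implementation: `RecordMeta`, `ReadOnlyContext`, `TaskContext`, and `runOnce` are hypothetical names, and the `Optional` return type is just one assumed way to express "no current record" during a punctuation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

// Hypothetical names throughout; this is an illustration, not the Kafka Streams API.
final class FrameworkOwnedContextSketch {

    /** Metadata about the record being processed (illustrative fields only). */
    static final class RecordMeta {
        final String topic;
        final long offset;

        RecordMeta(String topic, long offset) {
            this.topic = topic;
            this.offset = offset;
        }
    }

    /** What user code sees: a getter and no setter. */
    interface ReadOnlyContext {
        Optional<RecordMeta> recordMeta();
    }

    /** Framework-side holder; only the task loop below mutates it. */
    static final class TaskContext implements ReadOnlyContext {
        private RecordMeta current; // null means "no current record"

        @Override
        public Optional<RecordMeta> recordMeta() {
            return Optional.ofNullable(current);
        }

        void set(RecordMeta meta) {
            current = meta;
        }

        void clear() {
            current = null;
        }
    }

    /** User-side code: it can only read through the read-only view. */
    static String describe(ReadOnlyContext ctx) {
        return ctx.recordMeta()
                  .map(m -> m.topic + "@" + m.offset)
                  .orElse("no current record");
    }

    /** Simulates one process call followed by one punctuation. */
    static List<String> runOnce() {
        TaskContext ctx = new TaskContext();
        List<String> seen = new ArrayList<>();

        // The framework sets the context before handing a record to the processor.
        ctx.set(new RecordMeta("input", 42L));
        seen.add("process: " + describe(ctx));

        // The framework clears the context before invoking a punctuator.
        ctx.clear();
        seen.add("punctuate: " + describe(ctx));
        return seen;
    }

    public static void main(String[] args) {
        runOnce().forEach(System.out::println);
    }
}
```

Because user code only ever receives the `ReadOnlyContext` view, there is no way for a processor to leave the context in a state that surprises a downstream processor.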

Reading between the lines of your last reply, it sounds like the
disconnect may just have been a mutual misunderstanding about whether or
not Processors currently have access to set the record context. Since
they do not, if we wanted to add the record context to StateStoreContext
in a well-defined way, we'd also have to add the ability for Processors
to manipulate it. But then, we're just creating a side-channel for
Processors to pass some information in arguments to "put()" and other
information implicitly through the context. It seems better just to go
for a single channel for now.

It sounds like you're basically in favor of the conservative approach,
and you just wanted to understand the blockers that I implied. Does my
clarification make sense?

Thanks,
-John

On Thu, 2020-09-10 at 10:54 -0700, Sophie Blee-Goldman wrote:
I was just thinking that the processor would null out the record context
after it finished processing the record, so I'm not sure I follow why
this would not be possible? AFAIK we never call a punctuator in the
middle of processing a record through the topology, and even if we did,
we still know when it is about to be called and could set it to null
beforehand.

I'm not trying to advocate for it here, I'm in agreement that anything
you want to access within the store can and should be accessed within
the calling Processor/Punctuator before reaching the store. The "we can
always add it later if necessary" argument is also pretty convincing.
Just trying to understand why this wouldn't be possible.

FWIW, the question of "what is the current record in the context of a
Punctuator" exists independently of whether we want to add this to the
StateStoreContext or not. The full ProcessorContext, including the
current record context, is already available within a Punctuator, so
removing the current record context from the StateStoreContext does not
solve the problem. Users can -- and have (see KAFKA-9584
<https://issues.apache.org/jira/browse/KAFKA-9584>) -- hit such subtle
bugs without ever invoking a StateStore from their punctuator.

Again, I think I do agree that we should leave the current record
context off of the StateStoreContext, but I don't think the Punctuator
argument against it is very convincing.
It sounds to me like we need to disallow access to the current record
context from within the Punctuator, independent of anything to do with
state stores.

On Thu, Sep 10, 2020 at 7:12 AM John Roesler <vvcep...@apache.org> wrote:
Thanks for the thoughts, Sophie.

I agree that the extra information could be useful. My only concern is
that it doesn’t seem like we can actually supply that extra information
correctly. So, then we have a situation where the system offers useful
API calls that are only correct in a narrow range of use cases. Outside
of those use cases, you get incorrect behavior.

If it were possible to null out the context before you put a document to
which the context doesn’t apply, then the concern would be mitigated.
But it would still be pretty weird from the perspective of the store
that sometimes the context is populated and other times, it’s null.

But that seems moot, since it doesn’t seem possible to null out the
context. Only the Processor could know whether it’s about to put a
document different from the context or not. And it would be
inappropriate to offer a public ProcessorContext API to manage the
record context.

Ultimately, it still seems like if you want to store headers, you can
store them explicitly, right? That doesn’t seem onerous to me, and it
kind of seems better than relying on undefined or asymmetrical behavior
in the store itself.

Anyway, I’m not saying that we couldn’t solve these problems. Just that
it seems like we can be a little conservative and avoid them for now.
If it turns out we really need to solve them, we can always do it later.

Thanks,
John

On Wed, Sep 9, 2020, at 22:46, Sophie Blee-Goldman wrote:
> If you were to call "put" from a punctuator, or do a
> `range()` query and then update one of those records with
> `put()`, you'd have a very subtle bug on your hands.

Can you elaborate on this a bit? I agree that the punctuator case is an
obvious exception to the assumption that store invocations always have a
corresponding "current record", but I don't understand the second
example. Are you envisioning a scenario where the #process method
performs a range query and then updates records? Or were you just giving
another example of the punctuator case?

I only bring it up because I agree that the current record information
could still be useful within the context of the store.
As a non-user my input on this definitely has limited value, but it just
isn't striking me as obvious that we should remove access to the current
record context from the state stores. If there is no current record, as
in the punctuator case, we should just set the record context to null
(or Optional.empty, etc).

That said, the put() always has to come from somewhere, and that
somewhere is always going to be either a Processor or a Punctuator, both
of which will still have access to the full context. So additional info
such as the timestamp can and should probably be supplied to the store
before calling put(), rather than looked up by the store. But I can see
some other things being useful, for example the current record's
headers.
Maybe if/when we add better (or any) support for headers in state stores
this will be less true.

Of course as John has made clear, it's pretty hard to judge without
examples and more insight as to what actually goes on within a custom
state store.

On Wed, Sep 9, 2020 at 8:07 PM John Roesler <vvcep...@apache.org> wrote:
Hi Paul,

It's good to hear from you!

I'm glad you're in favor of the direction. Especially when it comes to
public API and usability concerns, I tend to think that "the folks who
matter" are actually the folks who have to use the APIs to accomplish
real tasks. It can be hard for me to be sure I'm thinking clearly from
that perspective.

Funny story, I also started down this road a couple of times already and
backed them out before the KIP because I was afraid of the scope of the
proposal. Unfortunately, needing to make a new ProcessorContext kind of
forced my hand.

I see you've called me out about the ChangeLogging stores :) In fact, I
think these are the main/only reason that stores might really need to
invoke "forward()". My secret plan was to cheat and either accomplish
change-logging by a different mechanism than implementing the store
interface, or by just breaking encapsulation to sneak the "real"
ProcessorContext into the ChangeLogging stores. But those are all
implementation details. I think the key question is whether anyone else
has a store implementation that needs to call "forward()". It's not what
you mentioned, but since you spoke up, I'll just ask: if you have a use
case for calling "forward()" in a store, please share it.

Regarding the other record-specific context methods, I think you have a
good point, but I also can't quite wrap my head around how we can
actually guarantee it to work in general. For example, the case you
cited, where the implementation of `KeyValueStore#put(key, value)` uses
the context to augment the record with timestamp information. This
relies on the assumption that you would only call "put()" from inside a
`Processor#process(key, value)` call in which the record being processed
is the same record that you're trying to put into the store.

If you were to call "put" from a punctuator, or do a `range()` query and
then update one of those records with `put()`, you'd have a very subtle
bug on your hands.
Right now, the Streams component that actually calls the Processor takes
care to set the right record context before invoking the method, and in
the case of caching, etc., it also takes care to swap out the old
context and keep it somewhere safe. But when it comes to public API
Processors calling methods on StateStores, there's no opportunity for
any component to make sure the context is always correct.

In the face of that situation, it seemed better to just move in the
direction of a "normal" data store. I.e., when you use a HashMap or
RocksDB or other "state stores", you don't expect them to automatically
know extra stuff about the record you're storing. If you need them to
know something, you just put it in the value.

All of that said, I'm just reasoning from first principles here. To
really know if this is a mistake or not, I need to be in your place. So
please push back if you think what I said is nonsense.
My personal plan was to keep an eye out during the period where the old
API was still present, but deprecated, to see if people were struggling
to use the new API. If so, then we'd have a chance to address it before
dropping the old API. But it's even better if you can help think it
through now.

It did also cross my mind to _not_ add the StateStoreContext, but just
to continue to punt on the question by just dropping in the new
ProcessorContext to the new init method. If StateStoreContext seems too
bold, we can go that direction. But if we actually add some methods to
StateStoreContext, I'd like to be able to ensure they would be well
defined. I think the current situation was more of an oversight than a
choice.

Thanks again for your reply,
-John

On Wed, 2020-09-09 at 21:23 -0500, Paul Whalen wrote:
John,

It's exciting to see this KIP head in this direction! In the last year
or so I've tried to sketch out some usability improvements for custom
state stores, and I also ended up splitting out the StateStoreContext
from the ProcessorContext in an attempt to facilitate what I was doing.
I sort of abandoned it when I realized how large the ideal change might
have to be, but it's great to see that there is other interest in moving
in this direction (from the folks that matter :) ).
Having taken a stab at it myself, I have a comment/question on this
bullet about StateStoreContext:

> It does *not* include anything processor- or record-specific, like
> `forward()` or any information about the "current" record, which is
> only well-defined in the context of the Processor. Processors process
> one record at a time, but state stores may be used to store and fetch
> many records, so there is no "current record".

I totally agree that record-specific or processor-specific context in a
state store is often not well-defined and it would be good to separate
that out, but sometimes it (at least record-specific context) is
actually useful, for example, passing the record's timestamp through to
the underlying storage (or changelog topic):
https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStore.java#L121

You could have the writer client of the state store pass this through,
but it would be nice to be able to write state stores where the client
did not have this responsibility.
I'm not >> sure >> > > if the >> > > > > > solution is >> > > > > > > > > > > > > to add >> > > > > > > > > > > > > > > > > some >> > > > > > > > > > > > > > > > > > > > things back to StateStoreContext, or >> > > make yet >> > > > > > another >> > > > > > > > > > > > context >> > > > > > > > > > > > > > > that >> > > > > > > > > > > > > > > > > > > > represents record-specific context >> while >> > > > > > inside a state >> > > > > > > > > > > > > store. >> > > > > > > > > > > > > > > > > > > > Best, >> > > > > > > > > > > > > > > > > > > > Paul >> > > > > > > > > > > > > > > > > > > > >> > > > > > > > > > > > > > > > > > > > On Wed, Sep 9, 2020 at 5:43 PM John >> > > Roesler < >> > > > > > > > > > > > > j...@vvcephei.org> >> > > > > > > > > > > > > > > > > wrote: >> > > > > > > > > > > > > > > > > > > > > Hello all, >> > > > > > > > > > > > > > > > > > > > > >> > > > > > > > > > > > > > > > > > > > > I've been slowly pushing KIP-478 >> > > forward >> > > > > > over the last >> > > > > > > > > > > > > year, >> > > > > > > > > > > > > > > > > > > > > and I'm happy to say that we're >> making >> > > good >> > > > > > progress now. >> > > > > > > > > > > > > > > > > > > > > However, several issues with the >> > > original >> > > > > > design have >> > > > > > > > > > > > come >> > > > > > > > > > > > > > > > > > > > > to light. >> > > > > > > > > > > > > > > > > > > > > >> > > > > > > > > > > > > > > > > > > > > The major changes: >> > > > > > > > > > > > > > > > > > > > > >> > > > > > > > > > > > > > > > > > > > > We discovered that the original >> plan >> > > of just >> > > > > > adding >> > > > > > > > > > > > generic >> > > > > > > > > > > > > > > > > > > > > parameters to ProcessorContext >> was too >> > > > > > disruptive, so we >> > > > > > > > > > > > > are >> > > > > > > > > > > > > > > > > > > > > now adding a new >> api.ProcessorContext. 

That choice forces us to add a new StateStore.init method for the new
context, but ProcessorContext really isn't ideal for state stores to
begin with, so I'm proposing a new StateStoreContext for this purpose.
In a nutshell, there are quite a few methods in ProcessorContext that
actually should never be called from inside a StateStore.

Also, since there is a new ProcessorContext interface, we need a new
MockProcessorContext implementation in the test-utils module.

The changeset for the KIP document is here:
https://cwiki.apache.org/confluence/pages/diffpagesbyversion.action?pageId=118172121&selectedPageVersions=14&selectedPageVersions=10

And the KIP itself is here:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-478+-+Strongly+typed+Processor+API

If you have any concerns, please let me know!

Thanks,
-John