Thanks John. SGTM.
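
For anyone skimming the thread, here is a rough sketch of the shape described in the quoted message below, where the Optional<RecordMetadata> becomes a method on the context instead of an argument to process(). It is only an illustration under stated assumptions: Record, RecordMetadata and ProcessorContext here are toy stand-ins written for this sketch, not the real Kafka Streams classes, and RecordMetadataSketch and describe() are hypothetical names.

```java
import java.util.Optional;

public class RecordMetadataSketch {

    /** Toy stand-in for the "metadata" in the thread: topic, partition, offset. */
    public static final class RecordMetadata {
        public final String topic;
        public final int partition;
        public final long offset;

        public RecordMetadata(String topic, int partition, long offset) {
            this.topic = topic;
            this.partition = partition;
            this.offset = offset;
        }
    }

    /** Toy stand-in for the "data": key, value, timestamp (headers omitted for brevity). */
    public static final class Record<K, V> {
        public final K key;
        public final V value;
        public final long timestamp;

        public Record(K key, V value, long timestamp) {
            this.key = key;
            this.value = value;
            this.timestamp = timestamp;
        }

        /** Shallow copy with a new value; the key object is shared, not cloned. */
        public <NewV> Record<K, NewV> withValue(NewV newValue) {
            return new Record<>(key, newValue, timestamp);
        }
    }

    /** Hypothetical context exposing only the one method relevant to this sketch. */
    public interface ProcessorContext {
        Optional<RecordMetadata> recordMetadata();
    }

    /** A processor body that touches the metadata only when it actually needs it. */
    public static String describe(Record<String, String> record, ProcessorContext context) {
        return context.recordMetadata()
                .map(m -> record.key + " from " + m.topic + "-" + m.partition + "@" + m.offset)
                .orElse(record.key + " with no source record (e.g. forwarded from a punctuator)");
    }

    public static void main(String[] args) {
        Record<String, String> record = new Record<>("k1", "v1", 1000L);
        // Context while processing a real input record: metadata is present.
        ProcessorContext fromInput = () -> Optional.of(new RecordMetadata("orders", 0, 42L));
        // Context when an ancestor forwarded from a punctuator: metadata is absent.
        ProcessorContext fromPunctuator = () -> Optional.empty();

        System.out.println(describe(record, fromInput));
        System.out.println(describe(record.withValue("v2"), fromPunctuator));
    }
}
```

The point of the sketch is only that a processor which never calls recordMetadata() does not have to branch on the Optional at all.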
On 10/1/20 2:50 PM, John Roesler wrote:
> Hello again, all,
>
> I'm sorry to make another tweak to this KIP, but during the
> implementation of the design we've just agreed on, I
> realized that Processors would almost never need to
> reference the RecordMetadata. Therefore, I'm proposing to
> streamline the API by moving the Optional<RecordMetadata> to
> the new ProcessorContext as a method, rather than making it
> a method argument to Processor#process.
>
> The change is visible here:
> https://cwiki.apache.org/confluence/pages/diffpagesbyversion.action?pageId=118172121&selectedPageVersions=16&selectedPageVersions=15
>
> All of the same semantics and considerations we discussed
> still apply, it's just that Processor implementers won't
> have to think about it unless they actually _need_ the
> topic/partition/offset information from the RecordMetadata.
>
> Also, the PR for this part of the KIP is now available here:
> https://github.com/apache/kafka/pull/9361
>
> I know it's a bit on the heavy side; I've annotated the PR
> to try and ease the reviewer's job. I'd greatly appreciate
> it if anyone can take the time to review.
>
> Thanks,
> -John
>
> On Wed, 2020-09-30 at 10:16 -0500, John Roesler wrote:
>> Thanks, Matthias!
>>
>> I can certainly document it. I didn't bother because the old
>> Processor, Supplier, and Context will themselves be
>> deprecated, so any method that handles them won't be able to
>> avoid the deprecation warning. Nevertheless, it doesn't hurt
>> just to explicitly deprecate those methods.
>>
>> Thanks,
>> -John
>>
>> On Wed, 2020-09-30 at 08:10 -0700, Matthias J. Sax wrote:
>>> Thanks John. I like the proposal.
>>>
>>> Btw: I was just going over the KIP and realized that we add new methods
>>> to `StreamsBuilder`, `Topology`, and `KStream` that take the new
>>> `ProcessorSupplier` class -- should we also deprecate the corresponding
>>> existing ones that take the old `ProcessorSupplier`?
>>>
>>>
>>> -Matthias
>>>
>>>
>>> On 9/30/20 7:46 AM, John Roesler wrote:
>>>> Thanks Paul and Sophie,
>>>>
>>>> Your feedback certainly underscores the need to be explicit
>>>> in the javadoc about why that parameter is Optional. Getting
>>>> this kind of feedback before the release is exactly the kind
>>>> of outcome we hope to get from the KIP process!
>>>>
>>>> Thanks,
>>>> -John
>>>>
>>>> On Tue, 2020-09-29 at 22:32 -0500, Paul Whalen wrote:
>>>>> John, I totally agree that adding a method to Processor is cumbersome and
>>>>> not a good path. I was imagining maybe a separate interface that could be
>>>>> used in the appropriate context, but I don't think that makes too much
>>>>> sense - it's just too far away from what Kafka Streams is. I was
>>>>> originally more interested in the "why" Optional than the "how" (I think
>>>>> my original reply overplayed the "optional as an argument" concern). But
>>>>> you've convinced me that there is a perfectly legitimate "why". We should
>>>>> make sure that it's clear why it's Optional, but I suppose that goes
>>>>> without saying. It's a nice opportunity to make the API reflect more what
>>>>> is actually going on under the hood.
>>>>>
>>>>> Thanks!
>>>>> Paul
>>>>>
>>>>> On Tue, Sep 29, 2020 at 10:05 PM Sophie Blee-Goldman <sop...@confluent.io>
>>>>> wrote:
>>>>>
>>>>>> FWIW, while I'm really not a fan of Optional in general, I agree that its
>>>>>> usage here seems appropriate. Even for those rare software developers who
>>>>>> carefully read all the docs several times over, I think it wouldn't be too
>>>>>> hard to miss a note about the RecordMetadata possibly being null.
>>>>>>
>>>>>> Especially because it's not that obvious why at first glance, and takes a
>>>>>> bit of thinking to realize that records originating from a Punctuator
>>>>>> wouldn't have a "current record". This is something that has definitely
>>>>>> confused users today.
>>>>>>
>>>>>> It's on us to improve the education here -- and an Optional<RecordMetadata>
>>>>>> would naturally raise awareness of this subtlety.
>>>>>>
>>>>>> On Tue, Sep 29, 2020 at 7:40 PM Sophie Blee-Goldman <sop...@confluent.io>
>>>>>> wrote:
>>>>>>
>>>>>>> Does my reply address your concerns?
>>>>>>>
>>>>>>>
>>>>>>> Yes; also, I definitely misread part of the proposal earlier and thought
>>>>>>> you had put the timestamp field in RecordMetadata. Sorry for not giving
>>>>>>> things a closer look before responding! I'm not sure my original message
>>>>>>> made much sense given the misunderstanding, but thanks for responding
>>>>>>> anyway :P
>>>>>>>
>>>>>>> Having given the proposal a second pass, I agree, it's very elegant. +1
>>>>>>>
>>>>>>> On Tue, Sep 29, 2020 at 6:50 PM John Roesler <vvcep...@apache.org>
>>>>>>> wrote:
>>>>>>>> Thanks for the reply, Sophie,
>>>>>>>>
>>>>>>>> I think I may have summarized too much in my prior reply.
>>>>>>>>
>>>>>>>> In the currently proposed KIP, any caller of forward() must
>>>>>>>> supply a Record, which consists of:
>>>>>>>> * key
>>>>>>>> * value
>>>>>>>> * timestamp
>>>>>>>> * headers (with a convenience constructor that sets empty
>>>>>>>> headers)
>>>>>>>>
>>>>>>>> These aren't what I was referring to as potentially being
>>>>>>>> undefined downstream, since thanks to the introduction of
>>>>>>>> Record, they are, as you're advocating, required to be
>>>>>>>> defined everywhere, even when forwarding from a punctuator.
>>>>>>>>
>>>>>>>> So to be clear, the intent of this change is actually to
>>>>>>>> _enforce_ that timestamp would never be undefined (which it
>>>>>>>> currently can be).
>>>>>>>> Also, since punctuators _are_ going to
>>>>>>>> have to "make up" a timestamp going forward, we should note
>>>>>>>> that the "punctuate" method currently passes in a good
>>>>>>>> timestamp that they can use: for system-time punctuations,
>>>>>>>> they receive the current system time, and for stream-time
>>>>>>>> punctuations, they get the current stream time.
>>>>>>>>
>>>>>>>> The potentially undefined RecordMetadata only contains these
>>>>>>>> fields:
>>>>>>>> * topic
>>>>>>>> * partition
>>>>>>>> * offset
>>>>>>>>
>>>>>>>> These fields aren't required (or even used) in a Sink, and
>>>>>>>> it doesn't seem like they would be important to many
>>>>>>>> applications. Furthermore, it doesn't _seem_ like you'd even
>>>>>>>> want to set these fields. They seem purely informational and
>>>>>>>> only useful in the context when you are actually processing
>>>>>>>> a real input record. It doesn't sound like you were asking
>>>>>>>> for it, but just to put it on the record, I think if we were
>>>>>>>> to require values for the metadata from punctuators, people
>>>>>>>> would mostly just make up their own dummy values, to no
>>>>>>>> one's benefit.
>>>>>>>>
>>>>>>>> I should also note that with the current
>>>>>>>> Record/RecordMetadata split, we will have the freedom to
>>>>>>>> move fields into the Record class (or even add new fields)
>>>>>>>> if we want them to become "data" as opposed to "metadata" in
>>>>>>>> the future.
>>>>>>>>
>>>>>>>> Thanks for your reply; I was similarly floored when I
>>>>>>>> realized the true nature of the current situation. Does my
>>>>>>>> reply address your concerns?
>>>>>>>>
>>>>>>>> Thanks,
>>>>>>>> -John
>>>>>>>>
>>>>>>>> On Tue, 2020-09-29 at 18:34 -0700, Sophie Blee-Goldman
>>>>>>>> wrote:
>>>>>>>>>> However, the record metadata is only defined when the parent forwards
>>>>>>>>>> while processing a real record, not when it calls forward from the
>>>>>>>>>> punctuator
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> Can we take a step back for a second...why wouldn't you be required to
>>>>>>>>> set the RecordContext yourself when calling forward from a Punctuator?
>>>>>>>>> I think I agree with Paul here, it seems kind of absurd not to enforce
>>>>>>>>> that the RecordContext be present inside the process() method.
>>>>>>>>>
>>>>>>>>> The original problem with Punctuators, as I understood it, was that all
>>>>>>>>> of the RecordContext fields were exposed automatically to both the
>>>>>>>>> Processor and any Punctuator, due to being direct methods on the
>>>>>>>>> ProcessorContext. We can't control which ProcessorContext methods
>>>>>>>>> someone will call from within a Punctuator vs. from a Processor. The
>>>>>>>>> best we could do was set these "nonsense" fields to null when inside a
>>>>>>>>> Punctuator, or set them to some dummy values as you pointed out.
>>>>>>>>>
>>>>>>>>> But then you proposed the solution of a separate RecordContext which is
>>>>>>>>> not attached to the ProcessorContext at all. This seemed to solve the
>>>>>>>>> above problem very neatly: we only pass in the RecordContext to the
>>>>>>>>> process() method, so we don't have to worry about people trying to
>>>>>>>>> access these fields from within a Punctuator. The fields aren't
>>>>>>>>> accessible unless they're defined.
>>>>>>>>>
>>>>>>>>> So what happens when someone wants to forward something from within a
>>>>>>>>> Punctuator?
>>>>>>>>> I don't think it's reasonable to let the timestamp field be undefined,
>>>>>>>>> ever. What if the Punctuator forwards directly to a sink, or directly
>>>>>>>>> to some windowing logic? Are we supposed to add handling for the
>>>>>>>>> RecordContext == null case to every processor? Or are we just going to
>>>>>>>>> assume the implicit restriction that users will only forward records
>>>>>>>>> from a Punctuator to downstream processors that know how to handle
>>>>>>>>> and/or set the RecordContext if it's undefined? That seems to throw
>>>>>>>>> away a lot of the awesome safety added in this KIP.
>>>>>>>>>
>>>>>>>>> Apologies for the rant. But I feel pretty strongly that allowing users
>>>>>>>>> to forward records from a Punctuator without a defined RecordContext
>>>>>>>>> would be asking for trouble. Imo, if you want to forward from a
>>>>>>>>> Punctuator, you need to store the info you need in order to set the
>>>>>>>>> timestamp, or make one up yourself.
>>>>>>>>>
>>>>>>>>> (the one alternative I can think of here is that maybe we could pass in
>>>>>>>>> the current partition time, so users can at least put in a reasonable
>>>>>>>>> estimate for the timestamp that won't cause it to get dropped and won't
>>>>>>>>> potentially lurch the stream time far into the future. This would be
>>>>>>>>> similar to what we do in the TimestampExtractor)
>>>>>>>>>
>>>>>>>>> On Tue, Sep 29, 2020 at 6:06 PM John Roesler <vvcep...@apache.org>
>>>>>>>>> wrote:
>>>>>>>>>> Oh, I guess one other thing I should have mentioned is that I’ve
>>>>>>>>>> recently discovered that in cases where the context is undefined, we
>>>>>>>>>> currently just fill in dummy values for the context.
>>>>>>>>>> So there’s a good chance that real applications in use are depending
>>>>>>>>>> on undefined context without even realizing it. What I’m hoping to do
>>>>>>>>>> is just make the situation explicit and get rid of the dummy values.
>>>>>>>>>>
>>>>>>>>>> Thanks,
>>>>>>>>>> John
>>>>>>>>>>
>>>>>>>>>> On Tue, Sep 29, 2020, at 20:01, John Roesler wrote:
>>>>>>>>>>> Thanks for the review, Paul!
>>>>>>>>>>>
>>>>>>>>>>> I had read some of that debate before. There seems to be some subtext
>>>>>>>>>>> there, because they advise against using Optional in cases like this,
>>>>>>>>>>> but there doesn’t seem to be a specific reason why it’s inappropriate.
>>>>>>>>>>> I got the impression they were just afraid that people would go
>>>>>>>>>>> overboard and make everything Optional.
>>>>>>>>>>>
>>>>>>>>>>> I could also make two methods, but it seemed like it might be an
>>>>>>>>>>> unfortunate way to handle the issue, since Processor is just about a
>>>>>>>>>>> Function as-is, but the two-method approach would require people to
>>>>>>>>>>> implement both methods.
>>>>>>>>>>>
>>>>>>>>>>> To your question, this is something that’s only recently become clear
>>>>>>>>>>> to me. Imagine you have a parent processor that calls forward both
>>>>>>>>>>> from process and a punctuator. The child will have process() invoked
>>>>>>>>>>> in both cases, and won’t be able to distinguish them. However, the
>>>>>>>>>>> record metadata is only defined when the parent forwards while
>>>>>>>>>>> processing a real record, not when it calls forward from the
>>>>>>>>>>> punctuator.
>>>>>>>>>>>
>>>>>>>>>>> This is why I wanted to make the metadata Optional, to advertise that
>>>>>>>>>>> the metadata might be undefined if any ancestor processor ever calls
>>>>>>>>>>> forward from a punctuator. We could remove the Optional and instead
>>>>>>>>>>> just document that the argument might be null.
>>>>>>>>>>>
>>>>>>>>>>> With that context in place, what’s your take?
>>>>>>>>>>>
>>>>>>>>>>> Thanks,
>>>>>>>>>>> John
>>>>>>>>>>>
>>>>>>>>>>> On Tue, Sep 29, 2020, at 19:09, Paul Whalen wrote:
>>>>>>>>>>>> Looks pretty good to me, though the Processor#process(Record,
>>>>>>>>>>>> Optional<RecordMetadata>) signature caught my eye. There's some
>>>>>>>>>>>> debate
>>>>>>>>>>>> (https://stackoverflow.com/questions/31922866/why-should-java-8s-optional-not-be-used-in-arguments)
>>>>>>>>>>>> about whether to use Optionals in arguments, and while that's a bit
>>>>>>>>>>>> of a religious debate in the abstract, it did make me wonder whether
>>>>>>>>>>>> it makes sense in this specific case. When is it actually not
>>>>>>>>>>>> present? I was under the impression that we should always have
>>>>>>>>>>>> access to it in process(), and that the concern about metadata being
>>>>>>>>>>>> undefined was about having access to record metadata in the
>>>>>>>>>>>> ProcessorContext held for use inside a Punctuator.
>>>>>>>>>>>>
>>>>>>>>>>>> If that's not the case and it is truly optional in process(), is
>>>>>>>>>>>> there an opportunity for an alternate interface for the cases when
>>>>>>>>>>>> we don't get it, rather than force the branching on implementers of
>>>>>>>>>>>> the interface?
>>>>>>>>>>>>
>>>>>>>>>>>> Apologies if I've missed something, I took a look at the PR and I
>>>>>>>>>>>> didn't see any spots where I thought it would be empty. Perhaps an
>>>>>>>>>>>> example of a Punctuator using (and not using) the new API would
>>>>>>>>>>>> clear things up.
>>>>>>>>>>>> Best,
>>>>>>>>>>>> Paul
>>>>>>>>>>>>
>>>>>>>>>>>> On Tue, Sep 29, 2020 at 4:10 PM John Roesler <vvcep...@apache.org>
>>>>>>>>>>>> wrote:
>>>>>>>>>>>>> Hello again, all,
>>>>>>>>>>>>>
>>>>>>>>>>>>> Thanks for the latest round of discussion. I've taken the
>>>>>>>>>>>>> recent feedback and come up with an updated KIP that seems
>>>>>>>>>>>>> actually quite a bit nicer than the prior proposal.
>>>>>>>>>>>>>
>>>>>>>>>>>>> The specific diff on the KIP is here:
>>>>>>>>>>>>> https://cwiki.apache.org/confluence/pages/diffpagesbyversion.action?pageId=118172121&selectedPageVersions=15&selectedPageVersions=14
>>>>>>>>>>>>>
>>>>>>>>>>>>> These changes are implemented in this POC PR:
>>>>>>>>>>>>> https://github.com/apache/kafka/pull/9346
>>>>>>>>>>>>>
>>>>>>>>>>>>> The basic idea is that, building on the recent conversation,
>>>>>>>>>>>>> we would transition away from the current API where we get
>>>>>>>>>>>>> only key/value in the process() method and other "data"
>>>>>>>>>>>>> comes in the ProcessorContext along with the "metadata".
>>>>>>>>>>>>>
>>>>>>>>>>>>> Instead, we formalize what is "data" and what is "metadata",
>>>>>>>>>>>>> and pass it all in to the process method:
>>>>>>>>>>>>> Processor#process(Record, Optional<RecordMetadata>)
>>>>>>>>>>>>>
>>>>>>>>>>>>> Also, you forward the whole data class instead of mutating
>>>>>>>>>>>>> the ProcessorContext fields and also calling forward:
>>>>>>>>>>>>> ProcessorContext#forward(Record)
>>>>>>>>>>>>>
>>>>>>>>>>>>> The Record class itself ships with methods like
>>>>>>>>>>>>> record#withValue(NewV newValue)
>>>>>>>>>>>>> that make a shallow copy of the input Record, enabling
>>>>>>>>>>>>> Processors to safely handle the record without polluting the
>>>>>>>>>>>>> context of their parents and siblings.
>>>>>>>>>>>>>
>>>>>>>>>>>>> This proposal has a number of key benefits:
>>>>>>>>>>>>> * As we've discovered in KAFKA-9584, it's unsafe to mutate
>>>>>>>>>>>>> the Headers via the ProcessorContext.
>>>>>>>>>>>>> This proposal offers a
>>>>>>>>>>>>> way to safely forward changes only to downstream processors.
>>>>>>>>>>>>> * The new API has symmetry (each processor's input is the
>>>>>>>>>>>>> output of its parent processor)
>>>>>>>>>>>>> * The API makes clear that the record metadata isn't always
>>>>>>>>>>>>> defined (for example, in a punctuation, there is no current
>>>>>>>>>>>>> topic/partition/offset)
>>>>>>>>>>>>> * The API enables punctuators to forward well-defined
>>>>>>>>>>>>> headers downstream, which is currently not possible.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Unless there are objections, I'll go ahead and re-finalize
>>>>>>>>>>>>> this KIP and update that PR to a mergeable state.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Thanks, all,
>>>>>>>>>>>>> -John
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Thu, 2020-09-24 at 09:41 -0700, Matthias J. Sax wrote:
>>>>>>>>>>>>>> Interesting proposal. However, I am not totally convinced, because
>>>>>>>>>>>>>> I see a fundamental difference between "data" and "metadata".
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Topic/partition/offset are "metadata" in the strong sense and they
>>>>>>>>>>>>>> are immutable.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On the other hand there is "primary" data like key and value, as
>>>>>>>>>>>>>> well as "secondary" data like timestamp and headers. The issue
>>>>>>>>>>>>>> seems to be that we treat "secondary data" more like metadata atm?
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Thus, promoting timestamp and headers into a first-class citizen
>>>>>>>>>>>>>> role makes sense to me (my original proposal about `RecordContext`
>>>>>>>>>>>>>> would still fall short in this regard). However, putting both (data
>>>>>>>>>>>>>> and metadata) into a `Record` abstraction might go too far?
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> I am also a little bit concerned about `Record.copy()` because it
>>>>>>>>>>>>>> might be a trap: Users might assume it does a full deep copy of the
>>>>>>>>>>>>>> record, however, it would not. It would only create a new `Record`
>>>>>>>>>>>>>> object as wrapper that points to the same key/value/header objects
>>>>>>>>>>>>>> as the input record.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> With the current `context.forward(key, value)` we don't have this
>>>>>>>>>>>>>> "deep copy" issue -- it's pretty clear what is happening.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Instead of `To.all().withTimestamp()` we could also add
>>>>>>>>>>>>>> `context.forward(key, value, timestamp)` etc (just wondering about
>>>>>>>>>>>>>> the explosion in overloads)?
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Also, `Record.withValue` etc sounds odd? Should a record not be
>>>>>>>>>>>>>> immutable? So, we could have something like
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> `RecordFactory.withKeyValue(...).withTimestamp(...).withHeaders(...).build()`.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> But it looks rather verbose?
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> The other question is of course, to what extent do we want to keep
>>>>>>>>>>>>>> the distinction between "primary" and "secondary" data? To me, it's
>>>>>>>>>>>>>> a question of ease of use?
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Just putting all this out to move the discussion forward. Don't
>>>>>>>>>>>>>> have a concrete proposal atm.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> -Matthias
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On 9/14/20 9:24 AM, John Roesler wrote:
>>>>>>>>>>>>>>> Thanks for this thought, Matthias!
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> To be honest, it's bugged me quite a bit that _all_ the
>>>>>>>>>>>>>>> record information hasn't been an argument to `process`.
>>>>>>>>>>>>>>> I suppose I was trying to be conservative in this proposal,
>>>>>>>>>>>>>>> but then again, if we're adding new Processor and
>>>>>>>>>>>>>>> ProcessorContext interfaces, then this is the time to make
>>>>>>>>>>>>>>> such a change.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> To be unambiguous, I think this is what we're talking about:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> ProcessorContext:
>>>>>>>>>>>>>>> * applicationId
>>>>>>>>>>>>>>> * taskId
>>>>>>>>>>>>>>> * appConfigs
>>>>>>>>>>>>>>> * appConfigsWithPrefix
>>>>>>>>>>>>>>> * keySerde
>>>>>>>>>>>>>>> * valueSerde
>>>>>>>>>>>>>>> * stateDir
>>>>>>>>>>>>>>> * metrics
>>>>>>>>>>>>>>> * schedule
>>>>>>>>>>>>>>> * commit
>>>>>>>>>>>>>>> * forward
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> StateStoreContext:
>>>>>>>>>>>>>>> * applicationId
>>>>>>>>>>>>>>> * taskId
>>>>>>>>>>>>>>> * appConfigs
>>>>>>>>>>>>>>> * appConfigsWithPrefix
>>>>>>>>>>>>>>> * keySerde
>>>>>>>>>>>>>>> * valueSerde
>>>>>>>>>>>>>>> * stateDir
>>>>>>>>>>>>>>> * metrics
>>>>>>>>>>>>>>> * register
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> RecordContext
>>>>>>>>>>>>>>> * topic
>>>>>>>>>>>>>>> * partition
>>>>>>>>>>>>>>> * offset
>>>>>>>>>>>>>>> * timestamp
>>>>>>>>>>>>>>> * headers
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Your proposal sounds good to me as-is. Just to cover the
>>>>>>>>>>>>>>> bases, though, I'm wondering if we should push the idea just
>>>>>>>>>>>>>>> a little farther.
>>>>>>>>>>>>>>> Instead of decomposing key,value,context,
>>>>>>>>>>>>>>> we could just keep them all in one object, like this:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Record:
>>>>>>>>>>>>>>> * key
>>>>>>>>>>>>>>> * value
>>>>>>>>>>>>>>> * topic
>>>>>>>>>>>>>>> * partition
>>>>>>>>>>>>>>> * offset
>>>>>>>>>>>>>>> * timestamp
>>>>>>>>>>>>>>> * headers
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Then, we could have:
>>>>>>>>>>>>>>> Processor#process(Record)
>>>>>>>>>>>>>>> ProcessorContext#forward(Record, To)
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Viewed from this perspective, a record has three properties
>>>>>>>>>>>>>>> that people may specify in their processors: key, value, and
>>>>>>>>>>>>>>> timestamp.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> We could deprecate `To#withTimestamp` and enable people to
>>>>>>>>>>>>>>> specify the timestamp along with the key and value when they
>>>>>>>>>>>>>>> forward a record.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> E.g.,
>>>>>>>>>>>>>>> RecordBuilder toForward = RecordBuilder.copy(record)
>>>>>>>>>>>>>>> toForward.withKey(newKey)
>>>>>>>>>>>>>>> toForward.withValue(newValue)
>>>>>>>>>>>>>>> toForward.withTimestamp(newTimestamp)
>>>>>>>>>>>>>>> Record newRecord = toForward.build()
>>>>>>>>>>>>>>> context.forward(newRecord, To.child("child1"))
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Or, the more compact common case:
>>>>>>>>>>>>>>> current:
>>>>>>>>>>>>>>> context.forward(key, "newValue")
>>>>>>>>>>>>>>> proposed:
>>>>>>>>>>>>>>> context.forward(copy(record).withValue("newValue").build())
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> It's slightly more verbose, but also more extensible. This
>>>>>>>>>>>>>>> would give us a clean path to add header support in PAPI as
>>>>>>>>>>>>>>> well, simply by adding `withHeaders` in RecordBuilder.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> It's also more symmetrical, since the recipient of `forward`
>>>>>>>>>>>>>>> would just get the sent `Record`.
>>>>>>>>>>>>>>> Whereas today, the sender
>>>>>>>>>>>>>>> puts the timestamp in `To`, but the recipient gets it in its
>>>>>>>>>>>>>>> own `ProcessorContext`.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> WDYT?
>>>>>>>>>>>>>>> -John
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On Fri, 2020-09-11 at 12:30 -0700, Matthias J. Sax wrote:
>>>>>>>>>>>>>>>> I think separating the different contexts makes sense.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> In fact, we could even go one step further and remove the record
>>>>>>>>>>>>>>>> context from the processor context completely and add a third
>>>>>>>>>>>>>>>> parameter to `process(key, value, recordContext)`. This would
>>>>>>>>>>>>>>>> make it clear that the context is for the input record only and
>>>>>>>>>>>>>>>> it's not possible to pass it to a `punctuate` callback.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> For the stores and changelogging: I think there are two cases.
>>>>>>>>>>>>>>>> (1) You use a plain key-value store. For this case, it seems you
>>>>>>>>>>>>>>>> do not care about the timestamp and thus do not care what
>>>>>>>>>>>>>>>> timestamp is set in the changelog records. (We can set anything
>>>>>>>>>>>>>>>> we want, as it's not relevant at all -- the timestamp is ignored
>>>>>>>>>>>>>>>> on read anyway.) (2) The other case is that one does care about
>>>>>>>>>>>>>>>> timestamps, and for this case should use
>>>>>>>>>>>>>>>> TimestampedKeyValueStore. The passed timestamp will be set on
>>>>>>>>>>>>>>>> the changelog records for this case.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Thus, for both cases, accessing the record context does not seem
>>>>>>>>>>>>>>>> to be a requirement.
>>>>>>>>>>>>>>>> And providing access to the processor context
>>>>>>>>>>>>>>>> to, e.g., `forward()` or similar seems safe.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> -Matthias
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> On 9/10/20 7:25 PM, John Roesler wrote:
>>>>>>>>>>>>>>>>> Thanks for the reply, Paul!
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> I certainly intend to make sure that the changelogging layer
>>>>>>>>>>>>>>>>> continues to work the way it does now, by hook or by crook.
>>>>>>>>>>>>>>>>> I think the easiest path for me is to just "cheat" and get
>>>>>>>>>>>>>>>>> the real ProcessorContext into the ChangeLoggingStore
>>>>>>>>>>>>>>>>> implementation somehow. I'll tag you on the PR when I create
>>>>>>>>>>>>>>>>> it, so you have an opportunity to express a preference about
>>>>>>>>>>>>>>>>> the implementation choice, and maybe even compile/test
>>>>>>>>>>>>>>>>> against it to make sure your stuff still works.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Regarding this:
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> we have an interest in making a state store with a richer
>>>>>>>>>>>>>>>>>> way of querying its data (like perhaps getting all values
>>>>>>>>>>>>>>>>>> associated with a secondary key), while still ultimately
>>>>>>>>>>>>>>>>>> writing to the changelog topic for later restoration.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> This is very intriguing to me. On the side, I've been
>>>>>>>>>>>>>>>>> preparing a couple of ideas related to this topic. I don't
>>>>>>>>>>>>>>>>> think I have a coherent enough thought to even express it in
>>>>>>>>>>>>>>>>> a Jira right now, but when I do, I'll tag you on it also to
>>>>>>>>>>>>>>>>> see what you think.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Whenever you're ready to share the usability improvement
>>>>>>>>>>>>>>>>> ideas, I'm very interested to see what you've come up with.
>>>>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>>>>> -John
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> On Thu, 2020-09-10 at 21:02 -0500, Paul Whalen wrote:
>>>>>>>>>>>>>>>>>>> when you use a HashMap or RocksDB or other "state stores",
>>>>>>>>>>>>>>>>>>> you don't expect them to automatically know extra stuff
>>>>>>>>>>>>>>>>>>> about the record you're storing.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> So, I don't think there is any reason we *can't* retain the
>>>>>>>>>>>>>>>>>>> record context in the StateStoreContext, and if any users
>>>>>>>>>>>>>>>>>>> came along with a clear use case I'd find that convincing.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> I agree with the principle of being conservative with the
>>>>>>>>>>>>>>>>>> StateStoreContext API. Regarding user expectations or a clear
>>>>>>>>>>>>>>>>>> use case, the only counterpoint I would offer is that we sort
>>>>>>>>>>>>>>>>>> of have that use case already, which is the example I gave of
>>>>>>>>>>>>>>>>>> the change logging store using the timestamp. I am curious if
>>>>>>>>>>>>>>>>>> this functionality will be retained when using built in state
>>>>>>>>>>>>>>>>>> stores, or will a low-level processor get a KeyValueStore
>>>>>>>>>>>>>>>>>> that no longer writes to the changelog topic with the record's
>>>>>>>>>>>>>>>>>> timestamp. While I personally don't care much about that
>>>>>>>>>>>>>>>>>> functionality specifically, I have a general desire for custom
>>>>>>>>>>>>>>>>>> state stores to easily do the things that built in state
>>>>>>>>>>>>>>>>>> stores do.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> It genuinely did not occur to me that users might be looking
>>>>>>>>>>>>>>>>>>> up and/or updating records of other keys from within a
>>>>>>>>>>>>>>>>>>> Processor.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> I'm glad you said this Sophie, because it gives me an
>>>>>>>>>>>>>>>>>> opportunity to say that this is actually a *huge* use case for
>>>>>>>>>>>>>>>>>> my team. The state store usability improvements I was
>>>>>>>>>>>>>>>>>> referring to in my previous message were about enabling the
>>>>>>>>>>>>>>>>>> user to write custom stores while still easily hooking into
>>>>>>>>>>>>>>>>>> the ability to write to a changelog topic. I think that is
>>>>>>>>>>>>>>>>>> technically possible now, but I don't think it's trivial.
>>>>>>>>>>>>>>>>>> Specifically, we have an interest in making a state store with
>>>>>>>>>>>>>>>>>> a richer way of querying its data (like perhaps getting all
>>>>>>>>>>>>>>>>>> values associated with a secondary key), while still
>>>>>>>>>>>>>>>>>> ultimately writing to the changelog topic for later
>>>>>>>>>>>>>>>>>> restoration.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> We recognize that this use case throws away some of what Kafka
>>>>>>>>>>>>>>>>>> Streams (especially the DSL) is good at - easy
>>>>>>>>>>>>>>>>>> parallelizability by partitioning all processing by key - and
>>>>>>>>>>>>>>>>>> that our business logic would completely fall apart if we were
>>>>>>>>>>>>>>>>>> consuming from multi-partition topics with multiple consumers.
>>>>>>>>>>>>>>>>>> But we have found that using the low level processor API is
>>>>>>>>>>>>>>>>>> good for the very simple stream processing primitives it
>>>>>>>>>>>>>>>>>> provides: handling the plumbing of consuming from multiple
>>>>>>>>>>>>>>>>>> kafka topics and potentially updating persistent local state
>>>>>>>>>>>>>>>>>> in a reliable way. That in itself has proven to be a
>>>>>>>>>>>>>>>>>> worthwhile programming model.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Since I got off track a bit, let me summarize: I don't
>>>>>>>>>>>>>>>>>> particularly care about the record context being available to
>>>>>>>>>>>>>>>>>> state store implementations, and I think this KIP is headed in
>>>>>>>>>>>>>>>>>> the right direction in that regard. But more generally, I
>>>>>>>>>>>>>>>>>> wanted to express the importance of maintaining a powerful and
>>>>>>>>>>>>>>>>>> flexible StateStore interface.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Thanks!
>>>>>>>>>>>>>>>>>> Paul
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> On Thu, Sep 10, 2020 at 6:11 PM Sophie Blee-Goldman
>>>>>>>>>>>>>>>>>> <sop...@confluent.io> wrote:
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Aha, I did misinterpret the example in your previous response
>>>>>>>>>>>>>>>>>>> regarding the range query after all. I thought you just meant
>>>>>>>>>>>>>>>>>>> a time-range query inside a punctuator. It genuinely did not
>>>>>>>>>>>>>>>>>>> occur to me that users might be looking up and/or updating
>>>>>>>>>>>>>>>>>>> records of other keys from within a Processor.
Sorry for being closed-minded.
>>>>>>>>>>>>>>>>>>> I won't drag out this discussion any further by asking whether that might be a valid use case or just a lurking bug in itself :)
>>>>>>>>>>>>>>>>>>> Thanks for humoring me. The current proposal for KIP-478 sounds good to me.
>>>>>>>>>>>>>>>>>>> On Thu, Sep 10, 2020 at 3:43 PM John Roesler <vvcep...@apache.org> wrote:
>>>>>>>>>>>>>>>>>>>> Ah, thanks Sophie,
>>>>>>>>>>>>>>>>>>>> I'm sorry for misinterpreting your response. Yes, we absolutely can and should clear the context before punctuating.
>>>>>>>>>>>>>>>>>>>> My secondary concern is maybe more far-fetched. I was thinking that inside process(key,value), a Processor might do a get/put of a _different_ key. Consider, for example, the way that Suppress processors work. When they get a record, they add it to the store and then do a range scan and possibly forward a _different_ record. Of course, this is an operation that is deeply coupled to the internals, and the Suppress processor accordingly actually does get access to the internal context so that it can set the context before forwarding.
>>>>>>>>>>>>>>>>>>>> Still, it seems like I've had a handful of conversations with people over the years in which they tell me they are using state stores in a way that transcends the "get and put the currently processing record" access pattern. I doubt that those folks would even have considered the possibility that the currently processing record's _context_ could pollute their state store operations, as I myself never gave it a second thought until the current conversation began. In cases like that, we have actually set a trap for these people, and it seems better to dismantle the trap.
>>>>>>>>>>>>>>>>>>>> As you noted, really the only people who would be negatively impacted are people who implement their own state stores. These folks will get the deprecation warning and try to adapt their stores to the new interface. If they needed access to the record context, they would find it's now missing. They'd ask us about it, and we'd have the ability to explain the lurking bug that they have had in their stores all along, as well as the new recommended pattern (just pass everything you need in the value). If that's unsatisfying, _then_ we should consider amending the API.
>>>>>>>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>>>>>>>> -John
>>>>>>>>>>>>>>>>>>>> On Thu, 2020-09-10 at 15:21 -0700, Sophie Blee-Goldman wrote:
>>>>>>>>>>>>>>>>>>>>>> Regarding your first sentence, "...the processor would null out the record context...", this is not possible, since the processor doesn't have write access to the context. We could add it,
>>>>>>>>>>>>>>>>>>>>> Sorry, this was poorly phrased, I definitely did not mean to imply that we should make the context modifiable by the Processors themselves. I meant this should be handled by the internal processing framework that deals with passing records from one Processor to the next, setting the record context when a new record is picked up, invoking the punctuators, etc. I believe this all currently happens in the StreamTask?
It already can and does overwrite the record context as new records are processed, and is also responsible for calling the punctuators, so it doesn't seem like a huge leap to just say "null out the current record before punctuating"
>>>>>>>>>>>>>>>>>>>>> To clarify, I was never advocating or even considering to give the Processors write access to the record context. Sorry if my last message (or all of them) was misleading. I just wanted to point out that the punctuator concern is orthogonal to the question of whether we should include the record context in the StateStoreContext. It's definitely a real problem, but it's a problem that exists at the Processor level and not just the StateStore.
>>>>>>>>>>>>>>>>>>>>> So, I don't think there is any reason we *can't* retain the record context in the StateStoreContext, and if any users came along with a clear use case I'd find that convincing. In the absence of any examples, the conservative approach sounds good to me.
>>>>>>>>>>>>>>>>>>>>> If it turns out that someone did need the record context in their custom state store, I'm sure they'll submit a politely worded bug report alerting us that we broke their application.
>>>>>>>>>>>>>>>>>>>>> On Thu, Sep 10, 2020 at 3:05 PM John Roesler <vvcep...@apache.org> wrote:
>>>>>>>>>>>>>>>>>>>>>> Thanks, Sophie,
>>>>>>>>>>>>>>>>>>>>>> Yes, now that you point it out, I can see that the record context itself should be nulled out by Streams before invoking punctuators. From that perspective, we don't need to think about the second-order problem of what's in the context for the state store when called from a punctuator.
>>>>>>>>>>>>>>>>>>>>>> Regarding your first sentence, "...the processor would null out the record context...", this is not possible, since the processor doesn't have write access to the context. We could add it, but then all kinds of strange effects would ensue when downstream processors execute but the context is empty, etc. Better to just let the framework manage the record context and keep it read-only for Processors.
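The arrangement described in the paragraph above (the framework owns the record context, clears it before punctuating, and user code only reads it) can be sketched roughly as follows. This is an illustrative toy, not Kafka Streams code: `ReadOnlyContext`, `FrameworkContext`, and `ContextLifecycleSketch` are hypothetical names invented here, and the offset stands in for whatever record metadata the real context would carry.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

// Hypothetical read-only view that user code (a processor or punctuator) would see.
interface ReadOnlyContext {
    Optional<Long> currentOffset();
}

// Hypothetical framework-side holder. Only the framework calls the mutators.
final class FrameworkContext implements ReadOnlyContext {
    private Long offset; // null means "there is no current record"

    void setCurrentRecord(long offset) {
        this.offset = offset;
    }

    void clearCurrentRecord() {
        this.offset = null;
    }

    @Override
    public Optional<Long> currentOffset() {
        return Optional.ofNullable(offset);
    }
}

class ContextLifecycleSketch {
    // Simulates one process() call followed by one punctuation and records
    // whether user code could see a current record in each case.
    static List<String> run() {
        FrameworkContext ctx = new FrameworkContext();
        ReadOnlyContext view = ctx; // user code only ever gets this view
        List<String> seen = new ArrayList<>();

        // The framework sets the context before handing a record to the processor.
        ctx.setCurrentRecord(42L);
        seen.add("process:" + view.currentOffset().isPresent());

        // The framework clears the context before invoking a punctuator.
        ctx.clearCurrentRecord();
        seen.add("punctuate:" + view.currentOffset().isPresent());
        return seen;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

Because the view exposes no mutators, a processor cannot leave downstream code with an unexpectedly empty context, which is the "strange effects" concern raised above.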
>>>>>>>>>>>>>>>>>>>>>> Reading between the lines of your last reply, it sounds like the disconnect may just have been a mutual misunderstanding about whether or not Processors currently have access to set the record context. Since they do not, if we wanted to add the record context to StateStoreContext in a well-defined way, we'd also have to add the ability for Processors to manipulate it. But then, we're just creating a side-channel for Processors to pass some information in arguments to "put()" and other information implicitly through the context. It seems better just to go for a single channel for now.
>>>>>>>>>>>>>>>>>>>>>> It sounds like you're basically in favor of the conservative approach, and you just wanted to understand the blockers that I implied. Does my clarification make sense?
>>>>>>>>>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>>>>>>>>>> -John
>>>>>>>>>>>>>>>>>>>>>> On Thu, 2020-09-10 at 10:54 -0700, Sophie Blee-Goldman wrote:
>>>>>>>>>>>>>>>>>>>>>>> I was just thinking that the processor would null out the record context after it finished processing the record, so I'm not sure I follow why this would not be possible?
AFAIK we never call a punctuator in the middle of processing a record through the topology, and even if we did, we still know when it is about to be called and could set it to null beforehand.
>>>>>>>>>>>>>>>>>>>>>>> I'm not trying to advocate for it here, I'm in agreement that anything you want to access within the store can and should be accessed within the calling Processor/Punctuator before reaching the store. The "we can always add it later if necessary" argument is also pretty convincing. Just trying to understand why this wouldn't be possible.
>>>>>>>>>>>>>>>>>>>>>>> FWIW, the question of "what is the current record in the context of a Punctuator" exists independently of whether we want to add this to the StateStoreContext or not. The full ProcessorContext, including the current record context, is already available within a Punctuator, so removing the current record context from the StateStoreContext does not solve the problem.
Users can -- and have (see KAFKA-9584 <https://issues.apache.org/jira/browse/KAFKA-9584>) -- hit such subtle bugs without ever invoking a StateStore from their punctuator.
>>>>>>>>>>>>>>>>>>>>>>> Again, I think I do agree that we should leave the current record context off of the StateStoreContext, but I don't think the Punctuator argument against it is very convincing. It sounds to me like we need to disallow access to the current record context from within the Punctuator, independent of anything to do with state stores
>>>>>>>>>>>>>>>>>>>>>>> On Thu, Sep 10, 2020 at 7:12 AM John Roesler <vvcep...@apache.org> wrote:
>>>>>>>>>>>>>>>>>>>>>>>> Thanks for the thoughts, Sophie.
>>>>>>>>>>>>>>>>>>>>>>>> I agree that the extra information could be useful. My only concern is that it doesn’t seem like we can actually supply that extra information correctly.
So, then we have a situation where the system offers useful API calls that are only correct in a narrow range of use cases. Outside of those use cases, you get incorrect behavior.
>>>>>>>>>>>>>>>>>>>>>>>> If it were possible to null out the context before you put a document to which the context doesn’t apply, then the concern would be mitigated. But it would still be pretty weird from the perspective of the store that sometimes the context is populated and other times, it’s null.
>>>>>>>>>>>>>>>>>>>>>>>> But that seems moot, since it doesn’t seem possible to null out the context. Only the Processor could know whether it’s about to put a document different from the context or not. And it would be inappropriate to offer a public ProcessorContext api to manage the record context.
>>>>>>>>>>>>>>>>>>>>>>>> Ultimately, it still seems like if you want to store headers, you can store them explicitly, right?
That doesn’t seem onerous to me, and it kind of seems better than relying on undefined or asymmetrical behavior in the store itself.
>>>>>>>>>>>>>>>>>>>>>>>> Anyway, I’m not saying that we couldn’t solve these problems. Just that it seems that we can be conservative and avoid them for now. If it turns out we really need to solve them, we can always do it later.
>>>>>>>>>>>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>>>>>>>>>>>> John
>>>>>>>>>>>>>>>>>>>>>>>> On Wed, Sep 9, 2020, at 22:46, Sophie Blee-Goldman wrote:
>>>>>>>>>>>>>>>>>>>>>>>>>> If you were to call "put" from a punctuator, or do a `range()` query and then update one of those records with `put()`, you'd have a very subtle bug on your hands.
>>>>>>>>>>>>>>>>>>>>>>>>> Can you elaborate on this a bit? I agree that the punctuator case is an obvious exception to the assumption that store invocations always have a corresponding "current record", but I don't understand the second example.
Are you envisioning a scenario where the #process method performs a range query and then updates records? Or were you just giving another example of the punctuator case?
>>>>>>>>>>>>>>>>>>>>>>>>> I only bring it up because I agree that the current record information could still be useful within the context of the store. As a non-user my input on this definitely has limited value, but it just isn't striking me as obvious that we should remove access to the current record context from the state stores. If there is no current record, as in the punctuator case, we should just set the record context to null (or Optional.empty, etc).
>>>>>>>>>>>>>>>>>>>>>>>>> That said, the put() always has to come from somewhere, and that somewhere is always going to be either a Processor or a Punctuator, both of which will still have access to the full context.
So additional info such as the timestamp can and should probably be supplied to the store before calling put(), rather than looked up by the store. But I can see some other things being useful, for example the current record's headers. Maybe if/when we add better (or any) support for headers in state stores this will be less true.
>>>>>>>>>>>>>>>>>>>>>>>>> Of course as John has made clear, it's pretty hard to judge without examples and more insight as to what actually goes on within a custom state store
>>>>>>>>>>>>>>>>>>>>>>>>> On Wed, Sep 9, 2020 at 8:07 PM John Roesler <vvcep...@apache.org> wrote:
>>>>>>>>>>>>>>>>>>>>>>>>>> Hi Paul,
>>>>>>>>>>>>>>>>>>>>>>>>>> It's good to hear from you!
>>>>>>>>>>>>>>>>>>>>>>>>>> I'm glad you're in favor of the direction. Especially when it comes to public API and usability concerns, I tend to think that "the folks who matter" are actually the folks who have to use the APIs to accomplish real tasks.
It can be hard for me to be sure I'm thinking clearly from that perspective.
>>>>>>>>>>>>>>>>>>>>>>>>>> Funny story, I also started down this road a couple of times already and backed them out before the KIP because I was afraid of the scope of the proposal. Unfortunately, needing to make a new ProcessorContext kind of forced my hand.
>>>>>>>>>>>>>>>>>>>>>>>>>> I see you've called me out about the ChangeLogging stores :) In fact, I think these are the main/only reason that stores might really need to invoke "forward()". My secret plan was to cheat and either accomplish change-logging by a different mechanism than implementing the store interface, or by just breaking encapsulation to sneak the "real" ProcessorContext into the ChangeLogging stores. But those are all implementation details. I think the key question is whether anyone else has a store implementation that needs to call "forward()". It's not what you mentioned, but since you spoke up, I'll just ask: if you have a use case for calling "forward()" in a store, please share it.
>>>>>>>>>>>>>>>>>>>>>>>>>> Regarding the other record-specific context methods, I think you have a good point, but I also can't quite wrap my head around how we can actually guarantee it to work in general. For example, the case you cited, where the implementation of `KeyValueStore#put(key, value)` uses the context to augment the record with timestamp information. This relies on the assumption that you would only call "put()" from inside a `Processor#process(key, value)` call in which the record being processed is the same record that you're trying to put into the store.
>>>>>>>>>>>>>>>>>>>>>>>>>> If you were to call "put" from a punctuator, or do a `range()` query and then update one of those records with `put()`, you'd have a very subtle bug on your hands. Right now, the Streams component that actually calls the Processor takes care to set the right record context before invoking the method, and in the case of caching, etc., it also takes care to swap out the old context and keep it somewhere safe.
>>>>>>>>>>>>>>>>>>>>>>>>>> But when it comes to public API Processors calling methods on StateStores, there's no opportunity for any component to make sure the context is always correct.
>>>>>>>>>>>>>>>>>>>>>>>>>> In the face of that situation, it seemed better to just move in the direction of a "normal" data store. I.e., when you use a HashMap or RocksDB or other "state stores", you don't expect them to automatically know extra stuff about the record you're storing. If you need them to know something, you just put it in the value.
>>>>>>>>>>>>>>>>>>>>>>>>>> All of that said, I'm just reasoning from first principles here. To really know if this is a mistake or not, I need to be in your place. So please push back if you think what I said is nonsense. My personal plan was to keep an eye out during the period where the old API was still present, but deprecated, to see if people were struggling to use the new API. If so, then we'd have a chance to address it before dropping the old API. But it's even better if you can help think it through now.
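As a rough illustration of the "just put it in the value" idea from the message above, here is a minimal sketch that uses a plain HashMap as a stand-in for a state store. It does not use any Kafka Streams classes: `StampedValue`, `PlainStore`, and `ValueCarriesContextSketch` are hypothetical names made up for this example, and the keys and timestamps are arbitrary.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical value wrapper: the caller decides which record details travel with the value.
final class StampedValue {
    final String value;
    final long timestampMs;

    StampedValue(String value, long timestampMs) {
        this.value = value;
        this.timestampMs = timestampMs;
    }
}

// Stand-in for a custom state store: a plain map with no notion of a "current record".
final class PlainStore {
    private final Map<String, StampedValue> data = new HashMap<>();

    void put(String key, StampedValue value) {
        data.put(key, value);
    }

    StampedValue get(String key) {
        return data.get(key);
    }
}

class ValueCarriesContextSketch {
    // Simulates a processor handling record k1 (timestamp 100) while also
    // updating a different key, k2, that was stored earlier at timestamp 50.
    static PlainStore run() {
        PlainStore store = new PlainStore();
        store.put("k2", new StampedValue("old", 50L));

        // Store the record currently being processed, with its own timestamp.
        store.put("k1", new StampedValue("v1", 100L));

        // Update the other key but keep the timestamp that belongs to it,
        // rather than having the store implicitly pick up k1's timestamp.
        StampedValue other = store.get("k2");
        store.put("k2", new StampedValue("updated", other.timestampMs));
        return store;
    }

    public static void main(String[] args) {
        PlainStore store = run();
        System.out.println("k1 timestamp: " + store.get("k1").timestampMs);
        System.out.println("k2 timestamp: " + store.get("k2").timestampMs);
    }
}
```

The point of the sketch is only that the store never consults ambient state, so a get/put of a different key cannot be polluted by the record that happens to be in flight.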
>>>>>>>>>>>>>>>>>>>>>>>>>> It did also cross my mind to _not_ add the StateStoreContext, but just to continue to punt on the question by just dropping in the new ProcessorContext to the new init method. If StateStoreContext seems too bold, we can go that direction. But if we actually add some methods to StateStoreContext, I'd like to be able to ensure they would be well defined. I think the current situation was more of an oversight than a choice.
>>>>>>>>>>>>>>>>>>>>>>>>>> Thanks again for your reply,
>>>>>>>>>>>>>>>>>>>>>>>>>> -John
>>>>>>>>>>>>>>>>>>>>>>>>>> On Wed, 2020-09-09 at 21:23 -0500, Paul Whalen wrote:
>>>>>>>>>>>>>>>>>>>>>>>>>>> John,
>>>>>>>>>>>>>>>>>>>>>>>>>>> It's exciting to see this KIP head in this direction! In the last year or so I've tried to sketch out some usability improvements for custom state stores, and I also ended up splitting out the StateStoreContext from the ProcessorContext in an attempt to facilitate what I was doing.
I sort of abandoned it when I realized how large the ideal change might have to be, but it's great to see that there is other interest in moving in this direction (from the folks that matter :) ).
>>>>>>>>>>>>>>>>>>>>>>>>>>> Having taken a stab at it myself, I have a comment/question on this bullet about StateStoreContext:
>>>>>>>>>>>>>>>>>>>>>>>>>>>> It does *not* include anything processor- or record-specific, like `forward()` or any information about the "current" record, which is only well-defined in the context of the Processor. Processors process one record at a time, but state stores may be used to store and fetch many records, so there is no "current record".
>>>>>>>>>>>>>>>>>>>>>>>>>>> I totally agree that record-specific or processor-specific context in a state store is often not well-defined and it would be good to separate that out, but sometimes it (at least record-specific context) is actually useful, for example, passing the record's timestamp through to the underlying storage (or changelog topic):
>>>>>>>>>>>>>>>>>>>>>>>>>>> https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStore.java#L121
>>>>>>>>>>>>>>>>>>>>>>>>>>> You could have the writer client of the state store pass this through, but it would be nice to be able to write state stores where the client did not have this responsibility. I'm not sure if the solution is to add some things back to StateStoreContext, or make yet another context that represents record-specific context while inside a state store.
>
> Best,
> Paul
>
> On Wed, Sep 9, 2020 at 5:43 PM John Roesler <j...@vvcephei.org> wrote:
>
>> Hello all,
>>
>> I've been slowly pushing KIP-478 forward over the last year,
>> and I'm happy to say that we're making good progress now.
>> However, several issues with the original design have come
>> to light.
>>
>> The major changes:
>>
>> We discovered that the original plan of just adding generic
>> parameters to ProcessorContext was too disruptive, so we are
>> now adding a new api.ProcessorContext.
>>
>> That choice forces us to add a new StateStore.init method
>> for the new context, but ProcessorContext really isn't ideal
>> for state stores to begin with, so I'm proposing a new
>> StateStoreContext for this purpose. In a nutshell, there are
>> quite a few methods in ProcessorContext that actually should
>> never be called from inside a StateStore.
>>
>> Also, since there is a new ProcessorContext interface, we
>> need a new MockProcessorContext implementation in the
>> test-utils module.
>>
>> The changeset for the KIP document is here:
>> https://cwiki.apache.org/confluence/pages/diffpagesbyversion.action?pageId=118172121&selectedPageVersions=14&selectedPageVersions=10
>>
>> And the KIP itself is here:
>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-478+-+Strongly+typed+Processor+API
>>
>> If you have any concerns, please let me know!
>>
>> Thanks,
>> -John
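
For readers skimming the thread, the shape of the split described above can be sketched roughly as follows. This is a simplified illustration under stated assumptions, not the interfaces from the KIP: all of the Sketch* names, WordLengthProcessor, and the method sets are hypothetical, and the real definitions live in the KIP document and the PR.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

// Simplified, hypothetical stand-ins; the real interfaces are defined in KIP-478.

/** Topic/partition/offset of the record being processed, if there is one. */
final class SketchRecordMetadata {
    final String topic;
    final int partition;
    final long offset;

    SketchRecordMetadata(String topic, int partition, long offset) {
        this.topic = topic;
        this.partition = partition;
        this.offset = offset;
    }
}

/** What a state store sees: nothing processor- or record-specific. */
interface SketchStateStoreContext {
    String applicationId();
}

/** What a processor sees: typed forwarding plus optional record metadata. */
interface SketchProcessorContext<KOut, VOut> {
    String applicationId();
    void forward(KOut key, VOut value);
    Optional<SketchRecordMetadata> recordMetadata();
}

/** A store is initialized with the narrower context only. */
interface SketchStateStore {
    void init(SketchStateStoreContext context);
}

/** Test double in the spirit of a mock context: it captures forwards. */
final class SketchMockProcessorContext<KOut, VOut>
        implements SketchProcessorContext<KOut, VOut> {
    final List<String> forwarded = new ArrayList<>();
    private Optional<SketchRecordMetadata> metadata = Optional.empty();

    void setRecordMetadata(SketchRecordMetadata recordMetadata) {
        this.metadata = Optional.of(recordMetadata);
    }

    @Override public String applicationId() { return "sketch-app"; }
    @Override public void forward(KOut key, VOut value) { forwarded.add(key + "=" + value); }
    @Override public Optional<SketchRecordMetadata> recordMetadata() { return metadata; }
}

/** Example processor whose output types are checked at compile time. */
final class WordLengthProcessor {
    private final SketchProcessorContext<String, Integer> context;

    WordLengthProcessor(SketchProcessorContext<String, Integer> context) {
        this.context = context;
    }

    void process(String key, String value) {
        context.forward(key, value.length());
    }
}
```

Because SketchStateStoreContext has no forward() or recordMetadata(), a store written against it cannot call them by accident; that is the property the proposal is after, shown here only in miniature.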