Thanks, Nick,

It sounds like we're on the same page. I didn't think (A) would be fundamentally "hard", just that it might be a pain in practice. Reading your response, it sounds like a prototype of (A), if you're up for it, would be the tie-breaker between the two approaches.

To be honest, I've been burned enough times that I tend to prototype my KIPs more often than not anyway.

Thanks,
-John

On Mon, 2022-02-07 at 11:42 +0000, Nick Telford wrote:
> Hi everyone,
>
> Guozhang, the scope of my KIP is specifically about deploying structural
> changes to existing applications, i.e. "upgrades". Sharing state between
> different applications was not in the scope of my original proposal.
>
> John's email has it exactly right, and I think this points to my KIP not
> explaining the problem correctly. Any suggestions on how I could better
> clarify the intent of my proposal in the KIP?
>
> John, regarding your comments:
>
> A) being difficult to clean up state after migrations. Unless I've missed
> something, this shouldn't be a problem. Tasks are already internally aware
> of which stores they own from the Topology structure, irrespective of where
> on-disk the StateStore data is, they should be able to find it. I think the
> only real issue with this approach is that it will require changing, most
> likely, quite a bit of code. We'll need to separate the concept of "state
> directory" from "task directory", (which will still be needed to store Task
> meta-data, like .lock files). At the very least, I think significant
> changes may need to be made to StateDirectory and StateManager, but I
> haven't investigated in detail. Perhaps it would make sense to first
> explore this approach with a prototype to see how invasive it would become?
>
> B) My intent was always that this process would occur between
> KafkaStreams.start() and threads actually starting, so that the migration
> would occur safely. I'm not sure what kind of unexpected structural changes
> could be detected by such a process; it might just be useful for general
> validation.
> The main reasons I prefer (A) are that: 1) (B) requires additional state
> meta-data, which increases system complexity, whereas (A) does not; and
> 2) I believe that (A) actually addresses a semantic bug: specifically
> that StateStores are tightly coupled to Tasks, which is unnecessary.
> Reducing this coupling would add no complexity, and potentially simplify
> other processes in the future.
>
> Regards,
>
> Nick
>
> On Sat, 5 Feb 2022 at 17:19, John Roesler <vvcep...@apache.org> wrote:
>
> > Hello all,
> >
> > Thanks for the KIP, Nick!
> >
> > Based on this conversation, I think I might have misread the
> > KIP, but it looks like Nick is just proposing a small fix to
> > the existing compatibility mechanism.
> >
> > Although we tell people to avoid changing topologies on the
> > fly in general, we also tell them that, if they name all the
> > persistent resources (stores and repartition nodes), then
> > they can change the topologies without breaking anything
> > (provided the change itself is logically sound).
> >
> > It seems like this KIP is just pointing out a flaw in that
> > mechanism, that the (named) stores are kept inside the task
> > directories, so if some change renumbers the tasks, Streams
> > won't be able to find the local store files anymore. IIUC,
> > the changelog topic will still be fine, so Streams would
> > just allocate a new state directory under the new task name
> > and restore the changelog into it.
> >
> > So, I think all this KIP is after is a way to preserve the
> > local state files of a named store in the face of task
> > renumbering. That's not to say that there's not some overlap
> > with the NamedTopologies work, or that there's no value in
> > being able to automatically reuse unnamed stores. But it
> > probably makes sense to let Nick fix this one specific
> > problem instead of coupling it to other large-scale
> > engineering projects.
> >
> > Regarding the KIP itself:
> >
> > (A) is quite clean, but it does make it more challenging to
> > clean up state when tasks migrate to other nodes. If that's
> > the only problem, then I agree this is probably the best
> > solution.
> >
> > (B) also makes a lot of sense to me, and I actually don't
> > think it's a hack. It might also be useful for detecting
> > when a topology has changed unexpectedly, for example. On
> > the other hand, to safely move a state directory from one
> > task directory to the other, we have to be sure no other
> > thread is using either directory. To do that, we could
> > either perform the operation in `KafkaStreams.start()`
> > before any threads are started (we already know the topology
> > at this point), or we can try to grab the directory locks on
> > both tasks (but that sounds like a recipe for deadlock).
> >
> > In a nutshell, I'm supportive of this KIP, and I'd suggest we
> > do a little more discovery on the implications of dropping
> > the task level of the directory hierarchy before committing
> > to A. And/or be a little more specific about how we can
> > safely move state directories around before committing to B.
> >
> > Thanks again!
> > -John
> >
> > On Fri, 2022-02-04 at 11:09 -0800, Guozhang Wang wrote:
> > > Hi folks,
> > >
> > > I think the NamedTopology work would help with the convenience of
> > > the solution for this KIP, but I feel it is not by itself the
> > > solution here. If I'm not mistaken, the scope of this KIP is to
> > > tackle the following: *assuming the developer already knows* that a
> > > new topology, or part of it (e.g. a state store), does not change,
> > > how can that part of the topology be reused effectively? Today it
> > > is very hard to reuse part (say a state store, an internal topic)
> > > of a previous topology's persistent state because:
> > >
> > > 1) the names of those persistent states are prefixed by the
> > > application id.
> > > 2) the names of those persistent states are suffixed by the index,
> > > which reflects the structure of the topology.
> > > 3) the dir path of the persistent states is "prefixed" by the task
> > > id, which is hence dependent on the sub-topology id.
> > >
> > > My quick thoughts are that 1) is easy to work around as long as
> > > users reuse the same appId; 3) can be tackled with the help of the
> > > named topology, but each named topology can still be composed of
> > > multiple sub-topologies, so extra work is still needed to align the
> > > sub-topology ids; but we still need something to tackle 2) here,
> > > which I was pondering between those options and at the moment
> > > leaning towards option 2).
> > >
> > > Does that make sense to you?
> > >
> > > On Fri, Feb 4, 2022 at 4:38 AM Nick Telford <nick.telf...@gmail.com> wrote:
> > >
> > > > Hi Guozhang, Sophie,
> > > >
> > > > Thanks to you both for taking the time to review my proposal.
> > > >
> > > > I did actually see the NamedTopology classes, and noted that they
> > > > were internal. I didn't realise they are part of an intended
> > > > solution to this problem; that's very interesting. I'm going to
> > > > try to find some time to take a look at your experimental work so
> > > > I can understand it a bit better.
> > > >
> > > > From your description, it sounds like the NamedTopology approach
> > > > should enable users to solve this problem at the level that they
> > > > wish to. My concern is that users will need to be explicit about
> > > > how their Topology is structured, and will need to know in
> > > > advance how their Topologies might evolve in the future in order
> > > > to correctly break them up by name.
> > > > For example, if a user mistakenly assumes one particular
> > > > structure for their application, but later makes changes that
> > > > implicitly cause an existing NamedTopology to have its internal
> > > > Subtopologies re-ordered, the user will need to clear all the
> > > > local state for that NamedTopology, at least.
> > > >
> > > > Unless I'm mistaken, StateStores are defined exclusively by the
> > > > data in their changelogs. Even if you make changes to a Topology
> > > > that require clearing locally materialized state, the changelogs
> > > > aren't reset[1], so the newly rebuilt state is materialized from
> > > > the pre-existing values. Even if changes are made to the
> > > > Subtopology that writes to the StateStore, the existing data in
> > > > the changelog hasn't changed. The contents of the StateStore
> > > > evolve. This is exactly the same as a traditional database
> > > > table, where a client may evolve its behaviour to subtly change
> > > > the semantics of the data written to the table, without deleting
> > > > the existing data.
> > > >
> > > > If a user makes a change that means a different Subtopology reads
> > > > from the StateStore, the semantics of, and the data in, the store
> > > > itself haven't actually changed. The only reason we need to reset
> > > > this local state at all is due to the conflict on-disk caused by
> > > > the change in Subtopology ordinal. If local StateStore data was
> > > > decoupled from Tasks, this conflict would disappear, and the
> > > > application would work as expected.
> > > >
> > > > A Subtopology is defined by all connected topics, including
> > > > changelogs, repartition topics, source topics and sink topics,
> > > > whereas a StateStore is defined exclusively by its changelog. So
> > > > why do we tightly couple StateStore to Subtopology?
> > > > This is my central argument for option A that I outlined in the
> > > > KIP, and I would like to discuss it further, even if only to
> > > > educate myself on why it's not possible :-D
> > > >
> > > > I still think the NamedTopology work is valuable, but more as a
> > > > means to better organize large applications.
> > > >
> > > > Regards,
> > > > Nick
> > > >
> > > > 1: The only exception to this I can think of is when a user
> > > > decides to change the format (Serdes) or semantics of the data in
> > > > the store, in which case they would need to do a full reset by
> > > > also clearing the changelog topic for that store. Realistically,
> > > > users that wish to do this would be better off just creating a
> > > > new store and deleting the old one, so I don't think it's a case
> > > > worth optimizing for.
> > > >
> > > > On Fri, 4 Feb 2022 at 08:22, Sophie Blee-Goldman
> > > > <sop...@confluent.io.invalid> wrote:
> > > >
> > > > > Hey Nick,
> > > > >
> > > > > thanks for the KIP, this is definitely a much-needed feature.
> > > > > I've actually been working on a somewhat similar feature for a
> > > > > while now and have a good chunk of the implementation completed
> > > > > -- but so far it's only exposed via internal APIs and hasn't
> > > > > been brought to a KIP yet, as it's a fairly large and complex
> > > > > project and I wanted to get all the details hashed out before
> > > > > settling on a public API.
> > > > >
> > > > > For some sense of how complicated it's been, you can check out
> > > > > the JIRA ticket we've been filing PRs under -- there are
> > > > > already 25 PRs to the feature. See KAFKA-12648
> > > > > <https://issues.apache.org/jira/browse/KAFKA-12648>.
> > > > > You can check out the new KafkaStreamsNamedTopologyWrapper
> > > > > class to see what the current API looks like -- I recommend
> > > > > taking a look to see if this might cover some or all of the
> > > > > things you wanted this KIP to do.
> > > > >
> > > > > For a high-level sketch, my work introduces the concept of a
> > > > > "NamedTopology" (which will be renamed to "ModularTopology" in
> > > > > the future, but is still referred to as "named" in the codebase
> > > > > so I'll keep using it for now). Each KafkaStreams app can
> > > > > execute multiple named topologies, which are just regular
> > > > > topologies that are given a unique name. The essential feature
> > > > > of a named topology is that it can be dynamically added or
> > > > > removed without even stopping the application, much less
> > > > > resetting it. Technically a NamedTopology can be composed of
> > > > > one or more subtopologies, but if you want to be able to update
> > > > > the application at a subtopology level you can just name each
> > > > > subtopology.
> > > > >
> > > > > So I believe the feature you want is actually already
> > > > > implemented, for the most part -- it's currently missing a few
> > > > > things that I just didn't bother to implement yet since I've
> > > > > been focused on getting a working, minimal POC that I could use
> > > > > for testing. (For example, it doesn't yet support global state
> > > > > stores.) But beyond that, the only remaining work to make this
> > > > > feature available is to settle on the APIs, get a KIP passed,
> > > > > and implement said APIs.
> > > > >
> > > > > Would you be interested in helping out with the NamedTopology
> > > > > work so we can turn it into a full-fledged public feature?
> > > > > I'm happy to let you take the lead on the KIP, maybe by
> > > > > adapting this one if you think it makes sense to do so. The
> > > > > NamedTopology feature is somewhat larger in scope than strictly
> > > > > necessary for your purposes, however, so you could take on just
> > > > > a part of it and leave anything beyond that for me to do as
> > > > > followup.
> > > > >
> > > > > By the way: one advantage of the NamedTopology feature is that
> > > > > we don't have to worry about any compatibility issues or
> > > > > upgrade/migration path -- it's opt-in by definition. (Of course
> > > > > we would recommend using it to all users, like we do with named
> > > > > operators.)
> > > > >
> > > > > Let me know what you think and how you want to proceed from
> > > > > here -- I wouldn't want you to spend time re-implementing more
> > > > > or less the same thing, but I most likely wasn't going to find
> > > > > time to put out a KIP for the NamedTopology feature in the near
> > > > > future. If you would be able to help drive this to completion,
> > > > > we'd each have significantly less work to do to achieve our
> > > > > goals :)
> > > > >
> > > > > Cheers,
> > > > > Sophie
> > > > >
> > > > > On Thu, Feb 3, 2022 at 6:12 PM Guozhang Wang <wangg...@gmail.com> wrote:
> > > > >
> > > > > > Hello Nick,
> > > > > >
> > > > > > Thanks for bringing this up and for the proposed options. I
> > > > > > read through your writeup and here are some of my thoughts:
> > > > > >
> > > > > > 1) When changing the topology of Kafka Streams, the developer
> > > > > > needs to first decide if the whole topology's persisted state
> > > > > > (including both the state store as well as its changelogs,
> > > > > > and the repartition topics, and the source/sink external
> > > > > > topics) or part of the persisted state can be reused.
> > > > > > This involves two types of changes:
> > > > > >
> > > > > > a) structural change of the topology, such as a new processor
> > > > > > node being added/removed, a new intermediate topic being
> > > > > > added/removed, etc.
> > > > > > b) semantic change of a processor, such as a numerical filter
> > > > > > node changing its filter threshold, etc.
> > > > > >
> > > > > > Today both of them are more or less determined by developers
> > > > > > manually. However, though automatically detecting changes of
> > > > > > type b) is hard if not impossible, automatically detecting
> > > > > > changes of type a) is doable, since it depends on just the
> > > > > > following information:
> > > > > > * number of sub-topologies, and their order (i.e. sequence of ids)
> > > > > > * used state stores and changelog topics within the sub-topology
> > > > > > * used repartition topics
> > > > > > * etc
> > > > > >
> > > > > > So let's assume in the long run we can indeed automatically
> > > > > > determine if a topology or part of it (a sub-topology) is
> > > > > > structurally the same; what we can do is to "translate" the
> > > > > > old persisted state names to the new, isomorphic topology's
> > > > > > names. Following this thought I'm leaning towards the
> > > > > > direction of option B in your proposal. But since in this KIP
> > > > > > automatically determining structural changes is out of scope,
> > > > > > I feel we can consider adding some sort of a "migration tool"
> > > > > > from an old topology to a new topology by renaming all the
> > > > > > persisted states (store dirs and names, topic names).
> > > > > >
> > > > > > Guozhang
> > > > > >
> > > > > > On Tue, Jan 25, 2022 at 9:10 AM Nick Telford <nick.telf...@gmail.com> wrote:
> > > > > >
> > > > > > > Hi everyone,
> > > > > > >
> > > > > > > I'd like to start a discussion on Kafka Streams KIP-816 (
> > > > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-816%3A+Topology+changes+without+local+state+reset
> > > > > > > )
> > > > > > >
> > > > > > > This KIP outlines 3 possible solutions to the problem, and
> > > > > > > I plan to whittle this down to a definitive solution based
> > > > > > > on this discussion.
> > > > > > >
> > > > > > > Of the 3 proposed solutions:
> > > > > > > * 'A' is probably the "correct" solution, but is also quite
> > > > > > > a significant change.
> > > > > > > * 'B' is the least invasive, but most "hacky" solution.
> > > > > > > * 'C' requires a change to the wire protocol and will
> > > > > > > likely have unintended consequences. C is also the least
> > > > > > > complete solution, and will need significant additional
> > > > > > > work to make it work.
> > > > > > >
> > > > > > > Please let me know if the Motivation and Background
> > > > > > > sections need more clarity.
> > > > > > >
> > > > > > > Regards,
> > > > > > >
> > > > > > > Nick Telford
> > > > > > >
> > > > > >
> > > > > > --
> > > > > > -- Guozhang
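
For anyone skimming the thread, the layout problem can be sketched roughly as follows. This is only an illustration in Python (Kafka Streams itself is Java), not code from the KIP. It assumes a task-scoped store path of the form <state.dir>/<application.id>/<subtopology>_<partition>/rocksdb/<store>, which matches the thread's description of named stores living inside task directories. The store-scoped layout shown for option (A) and the plan_moves helper for option (B) are hypothetical, as are all function names and the /tmp/kafka-streams root.

```python
from pathlib import PurePosixPath

# Hypothetical value for the state.dir config; only used to build example paths.
STATE_DIR = PurePosixPath("/tmp/kafka-streams")


def task_scoped_path(app_id, subtopology, partition, store):
    """Store path nested inside the task directory.

    The task id is "<subtopology>_<partition>", so the path changes whenever
    the owning subtopology is renumbered, even if the store name is stable.
    """
    task_id = f"{subtopology}_{partition}"
    return STATE_DIR / app_id / task_id / "rocksdb" / store


def store_scoped_path(app_id, partition, store):
    """Hypothetical option (A) layout: keyed by store name and partition only.

    The subtopology ordinal does not appear, so renumbering cannot move it.
    """
    return STATE_DIR / app_id / "stores" / store / str(partition)


def plan_moves(app_id, partition, old_ordinals, new_ordinals):
    """Hypothetical option (B) helper: list the directory renames needed.

    old_ordinals / new_ordinals map a named store to the subtopology ordinal
    that owns it before / after the topology change. Only stores present in
    both topologies whose ordinal changed need to be moved.
    """
    moves = []
    for store, old in sorted(old_ordinals.items()):
        new = new_ordinals.get(store)
        if new is not None and new != old:
            moves.append((
                task_scoped_path(app_id, old, partition, store),
                task_scoped_path(app_id, new, partition, store),
            ))
    return moves


if __name__ == "__main__":
    # A change that pushes the "counts" store from subtopology 1 to 2.
    for src, dst in plan_moves("app", 0, {"counts": 1}, {"counts": 2}):
        print(f"move {src} -> {dst}")
    print("option (A) path:", store_scoped_path("app", 0, "counts"))
```

The sketch only computes paths. As discussed above, any real move would have to happen before stream threads start (or under the task directory locks), and option (A) would still need a separate task directory for metadata such as .lock files.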