Hi everyone, I'm going to resurrect this KIP, because I would like the community to benefit from our solution.
In the end, we internally solved this problem using Option B: automatically moving state directories to the correct location whenever they're no longer aligned with the Topology. We implemented this for ourselves externally to Kafka Streams, by using Topology#describe() to analyse the Topology, and then moving state directories before calling KafkaStreams#start().

I've updated/re-written the KIP to focus on this solution, albeit properly integrated into Kafka Streams.

Let me know what you think,
Nick

On Tue, 15 Feb 2022 at 16:23, Nick Telford <nick.telf...@gmail.com> wrote:

> In the KIP, for Option A I suggested a new path of:
>
> /state/dir/stores/<store name>/<partition number>
>
> I made the mistake of thinking that the rocksdb/ segment goes *after* the store name in the current scheme, e.g.
>
> /state/dir/<task id>/<store name>[/rocksdb]
>
> This is a mistake. I'd always intended for a combination of the store name and partition number to be encoded in the new path (instead of the store name and task ID, that we have now). The exact encoding doesn't really bother me too much, so if you have any conventions you think we should follow here (hyphenated vs. underscored vs. directory separator, etc.) please let me know.
>
> I should be able to find some time hopefully next week to start working on this, which should shed some more light on issues that might arise.
>
> In the meantime I'll correct the KIP to include the rocksdb segment.
>
> Thanks everyone for your input so far!
>
> Nick
>
> On Mon, 14 Feb 2022 at 22:02, Guozhang Wang <wangg...@gmail.com> wrote:
>
>> Thanks for the clarification John!
>>
>> Nick, sorry that I was not super clear in my latest email. I meant exactly what John said.
>>
>> Just to clarify, I do think that this KIP is relatively orthogonal to the named topology work; as long as we still keep the topo name encoded it should be fine since two named topologies can indeed have the same store name, but that would not need to be considered as part of this KIP.
>>
>>
>> Guozhang
>>
>> On Mon, Feb 14, 2022 at 9:02 AM John Roesler <vvcep...@apache.org> wrote:
>>
>> > Hi Nick,
>> >
>> > When Guozhang and I were chatting, we realized that it's not completely sufficient just to move the state store directories, because their names are not unique. In particular, more than one partition of the store may be assigned to the same instance. Right now, this is handled because the partition number is encoded in the task ID.
>> >
>> > For example, if we have a store "mystore" in subtopology 1 and we have two out of four partitions (0 and 3) assigned to the local node, the disk will have these paths:
>> >
>> > {app_id}/1_0/rocksdb/mystore
>> > {app_id}/1_3/rocksdb/mystore
>> >
>> > Clearly, we can't just elevate both "mystore" directories to reside under {app_id}, because they have the same name. When I think of option (A), here's what I picture:
>> >
>> > {app_id}/rocksdb/mystore-0
>> > {app_id}/rocksdb/mystore-3
>> >
>> > In the future, one thing we're considering is actually storing all the positions in the same RocksDB database, which is a pretty convenient step away from option (A) (another reason to prefer it to option (B)).
>> >
>> > I just took a look at how named topologies are handled, and they're actually a separate path segment, not part of the task id, like this:
>> >
>> > {app_id}/__{topo_name}__/1_0/rocksdb/mystore
>> > {app_id}/__{topo_name}__/1_3/rocksdb/mystore
>> >
>> > Which is pretty convenient because it means there are no implications for your proposal.
>> > If you implement the above code, then we'll just wind up with:
>> >
>> > {app_id}/__{topo_name}__/rocksdb/mystore-0
>> > {app_id}/__{topo_name}__/rocksdb/mystore-3
>> >
>> > Does that make sense?
>> >
>> > Thanks,
>> > -John
>> >
>> >
>> > On Mon, Feb 14, 2022, at 03:57, Nick Telford wrote:
>> > > Hi Guozhang,
>> > >
>> > > Sorry I haven't had the time to respond to your earlier email, but I just wanted to clarify something with respect to your most recent email.
>> > >
>> > > My original plan in option A is to remove the entire Task ID from the State Store path, which would insulate it from any changes to the Task ID format introduced by Named Topologies or anything else. This would in fact consolidate the store for the instance, rather than by-Task (which I think is what you meant by "one physical store per state"?).
>> > >
>> > > I did highlight in option C the possibility of changing the format of the Task ID to change the sub-topology ID from an ordinal to a stable identifier. Although I'm not convinced that this option is viable, or even desirable.
>> > >
>> > > Regards,
>> > >
>> > > Nick
>> > >
>> > > On Sat, 12 Feb 2022 at 00:36, Guozhang Wang <wangg...@gmail.com> wrote:
>> > >
>> > > > Just to follow-up on this thread, I had another chat with John regarding option a) and I think the key thought is that, today the task-id is in the form of [sub-topologyID]-[partitionID] --- and in the future with named-topology it could be extended to three digits as [named-topologyID]-[sub-topologyID]-[partitionID] --- and for the purpose of this KIP's option A), we actually just want to remove the [sub-topologyID] from the taskID as part of the file path hierarchy, right?
>> > > >
>> > > > If yes, given that in the future we want:
>> > > >
>> > > > * allow topology evolution with compatibility validations.
>> > > >
>> > > > * consolidating persistent state stores so that we do not have one physical store per state, but potentially one store for the whole instance.
>> > > >
>> > > > No matter if we want to provide certain tooling for mapping the persistent state path / names as in option B), pursuing some solutions in the direction of option A) to be independent of the sub-topologyID, since state store names within a topology should be sufficiently unique, would make a lot of sense.
>> > > >
>> > > >
>> > > > On Mon, Feb 7, 2022 at 3:52 PM Guozhang Wang <wangg...@gmail.com> wrote:
>> > > >
>> > > > > Hello Nick,
>> > > > >
>> > > > > I think I'm on the same page of the scope of your KIP, and what I was trying to get is that, there are some other efforts going on in parallel that try to identify if two topologies, or some part of them, are isomorphic in structure, and hence their corresponding persistent states may be reusable. That's why I was saying that "assume, we know which persistent states can be reusable". i.e. let's say we know that the new topology's sub-topology 1's state store A-0005 is the same as the old topology's sub-topology 0's state store A-0004, then what can we do to let the new topology's state store be loaded as the old state store. With that in my mind originally, I said maybe option B) is sufficient to rename the dir path / state store names before we start the new app's topology. But that's just one aspect of it and we do not necessarily need to follow :) If you are up to do a prototype following option A) with a good upgrade path, it would be a great solution too.
>> > > > >
>> > > > >
>> > > > > On Mon, Feb 7, 2022 at 8:59 AM John Roesler <vvcep...@apache.org> wrote:
>> > > > >
>> > > > > > Thanks, Nick,
>> > > > > >
>> > > > > > It sounds like we're on the same page. I didn't think (A) would be fundamentally "hard", just that it might be a pain in practice. Reading your response, if you're up for it, it sounds like a prototype of (A) would be the tie-breaker between the two approaches.
>> > > > > >
>> > > > > > To be honest, I've been burned enough times that I tend to prototype my KIPs more often than not anyway.
>> > > > > >
>> > > > > > Thanks,
>> > > > > > -John
>> > > > > >
>> > > > > > On Mon, 2022-02-07 at 11:42 +0000, Nick Telford wrote:
>> > > > > > > Hi everyone,
>> > > > > > >
>> > > > > > > Guozhang, the scope of my KIP is specifically about deploying structural changes to existing applications, i.e. "upgrades". Sharing state between different applications was not in the scope of my original proposal.
>> > > > > > >
>> > > > > > > John's email has it exactly right, and I think this points to my KIP not explaining the problem correctly. Any suggestions on how I could better clarify the intent of my proposal in the KIP?
>> > > > > > >
>> > > > > > > John, regarding your comments:
>> > > > > > >
>> > > > > > > A) being difficult to clean up state after migrations. Unless I've missed something, this shouldn't be a problem. Tasks are already internally aware of which stores they own from the Topology structure, irrespective of where on-disk the StateStore data is, they should be able to find it.
>> > > > > > > I think the only real issue with this approach is that it will require changing, most likely, quite a bit of code. We'll need to separate the concept of "state directory" from "task directory" (which will still be needed to store Task meta-data, like .lock files). At the very least, I think significant changes may need to be made to StateDirectory and StateManager, but I haven't investigated in detail. Perhaps it would make sense to first explore this approach with a prototype to see how invasive it would become?
>> > > > > > >
>> > > > > > > B) My intent was always that this process would occur between KafkaStreams.start() and threads actually starting, so that the migration would occur safely. I'm not sure what kind of unexpected structural changes could be detected by such a process; it might just be useful for general validation. The main reasons I prefer (A) are that: 1) (B) requires additional state meta-data, whereas (A) does not, which is an increase in system complexity; and 2) I believe that (A) actually addresses a semantic bug: specifically that StateStores are tightly coupled to Tasks, which is unnecessary. Reducing this coupling would add no complexity, and potentially simplify other processes in the future.
>> > > > > > >
>> > > > > > > Regards,
>> > > > > > >
>> > > > > > > Nick
>> > > > > > >
>> > > > > > > On Sat, 5 Feb 2022 at 17:19, John Roesler <vvcep...@apache.org> wrote:
>> > > > > > >
>> > > > > > > > Hello all,
>> > > > > > > >
>> > > > > > > > Thanks for the KIP, Nick!
>> > > > > > > >
>> > > > > > > > Based on this conversation, I think I might have misread the KIP, but it looks like Nick is just proposing a small fix to the existing compatibility mechanism.
>> > > > > > > >
>> > > > > > > > Although we tell people to avoid changing topologies on the fly in general, we also tell them that, if they name all the persistent resources (stores and repartition nodes), then they can change the topologies without breaking anything (provided the change itself is logically sound).
>> > > > > > > >
>> > > > > > > > It seems like this KIP is just pointing out a flaw in that mechanism, that the (named) stores are kept inside the task directories, so if some change renumbers the tasks, Streams won't be able to find the local store files anymore. IIUC, the changelog topic will still be fine, so Streams would just allocate a new state directory in the new task name and restore the changelog into it.
>> > > > > > > >
>> > > > > > > > So, I think all this KIP is after is a way to preserve the local state files of a named store in the face of task renumbering. That's not to say that there's not some overlap with the NamedTopologies work, or that there's no value in being able to automatically reuse unnamed stores. But it probably makes sense to let Nick fix this one specific problem instead of coupling it to other large-scale engineering projects.
>> > > > > > > >
>> > > > > > > > Regarding the KIP itself:
>> > > > > > > >
>> > > > > > > > (A) is quite clean, but it does make it more challenging to clean up state when tasks migrate to other nodes.
>> > > > > > > > If that's the only problem, then I agree this is probably the best solution.
>> > > > > > > >
>> > > > > > > > (B) also makes a lot of sense to me, and I actually don't think it's a hack. It might also be useful for detecting when a topology has changed unexpectedly, for example. On the other hand, to safely move a state directory from one task directory to the other, we have to be sure no other thread is using either directory. To do that, we could either perform the operation in `KafkaStreams.start()` before any threads are started (we already know the topology at this point), or we can try to grab the directory locks on both tasks (but that sounds like a recipe for deadlock).
>> > > > > > > >
>> > > > > > > > In a nutshell, I'm supportive of this KIP, and I'd suggest we do a little more discovery on the implications of dropping the task level of the directory hierarchy before committing to A. And/or be a little more specific about how we can safely move state directories around before committing to B.
>> > > > > > > >
>> > > > > > > > Thanks again!
>> > > > > > > > -John
>> > > > > > > >
>> > > > > > > > On Fri, 2022-02-04 at 11:09 -0800, Guozhang Wang wrote:
>> > > > > > > > > Hi folks,
>> > > > > > > > >
>> > > > > > > > > I think the NamedTopology work would help with the convenience of the solution for this KIP, but I feel it is not by itself the solution here. If I'm not mistaken, the scope of this KIP is trying to tackle that, *assuming the developer already knows* a new topology or part of the topology (e.g. a state store of the topology) does not change, then how to effectively be able to reuse that part of the topology. Today it is very hard to reuse part (say a state store, an internal topic) of a previous topology's persistent state because:
>> > > > > > > > >
>> > > > > > > > > 1) the names of those persistent states are prefixed by the application id.
>> > > > > > > > > 2) the names of those persistent states are suffixed by the index, which reflects the structure of the topology.
>> > > > > > > > > 3) the dir paths of the persistent states are "prefixed" by the task id, which is hence dependent on the sub-topology id.
>> > > > > > > > >
>> > > > > > > > > My quick thoughts are that 1) is easy to go around as long as users reuse the same appId, 3) can be tackled with the help of the named topology but each named topology can still be composed of multiple sub-topologies so extra work is still needed to align the sub-topology ids, but we still need something to tackle 2) here, which I was pondering between those options and at the moment leaning towards option 2).
>> > > > > > > > >
>> > > > > > > > > Does that make sense to you?
>> > > > > > > > >
>> > > > > > > > > On Fri, Feb 4, 2022 at 4:38 AM Nick Telford <nick.telf...@gmail.com> wrote:
>> > > > > > > > >
>> > > > > > > > > > Hi Guozhang, Sophie,
>> > > > > > > > > >
>> > > > > > > > > > Thanks for both taking the time to review my proposal.
>> > > > > > > > > >
>> > > > > > > > > > I did actually see the NamedTopology classes, and noted that they were internal. I didn't realise they are part of an intended solution to this problem, that's very interesting. I'm going to try to find some time to take a look at your experimental work so I can understand it a bit better.
>> > > > > > > > > >
>> > > > > > > > > > From your description, it sounds like the NamedTopology approach should enable users to solve this problem at the level that they wish to. My concern is that users will need to be explicit about how their Topology is structured, and will need to know in advance how their Topologies might evolve in the future in order to correctly break them up by name.
>> > > > > > > > > > For example, if a user mistakenly assumes one particular structure for their application, but later makes changes that implicitly cause an existing NamedTopology to have its internal Subtopologies re-ordered, the user will need to clear all the local state for that NamedTopology, at least.
>> > > > > > > > > >
>> > > > > > > > > > Unless I'm mistaken, StateStores are defined exclusively by the data in their changelogs. Even if you make changes to a Topology that require clearing locally materialized state, the changelogs aren't reset[1], so the newly rebuilt state is materialized from the pre-existing values. Even if changes are made to the Subtopology that writes to the StateStore, the existing data in the changelog hasn't changed. The contents of the StateStore evolve. This is exactly the same as a traditional database table, where a client may evolve its behaviour to subtly change the semantics of the data written to the table, without deleting the existing data.
>> > > > > > > > > >
>> > > > > > > > > > If a user makes a change that means a different Subtopology reads from the StateStore, the semantics of, and the data in, the store itself haven't actually changed.
>> > > > > > > > > > The only reason we need to reset this local state at all is due to the conflict on-disk caused by the change in Subtopology ordinal. If local StateStore data was decoupled from Tasks, this conflict would disappear, and the application would work as expected.
>> > > > > > > > > >
>> > > > > > > > > > A Subtopology is defined by all connected topics, including changelogs, repartition topics, source topics and sink topics. Whereas a StateStore is defined exclusively by its changelog. So why do we tightly couple StateStore to Subtopology? This is my central argument for option A that I outlined in the KIP, and I would like to discuss it further, even if only to educate myself on why it's not possible :-D
>> > > > > > > > > >
>> > > > > > > > > > I still think the NamedTopology work is valuable, but more as a means to better organize large applications.
>> > > > > > > > > >
>> > > > > > > > > > Regards,
>> > > > > > > > > > Nick
>> > > > > > > > > >
>> > > > > > > > > > 1: The only exception to this I can think of is when a user decides to change the format (Serdes) or semantics of the data in the store, in which case they would need to do a full reset by also clearing the changelog topic for that store.
>> > > > > > > > > > Realistically, users that wish to do this would be better off just creating a new store and deleting the old one, so I don't think it's a case worth optimizing for.
>> > > > > > > > > >
>> > > > > > > > > > On Fri, 4 Feb 2022 at 08:22, Sophie Blee-Goldman <sop...@confluent.io.invalid> wrote:
>> > > > > > > > > >
>> > > > > > > > > > > Hey Nick,
>> > > > > > > > > > >
>> > > > > > > > > > > thanks for the KIP, this is definitely a much-needed feature. I've actually been working on a somewhat similar feature for a while now and have a good chunk of the implementation completed -- but so far it's only exposed via internal APIs and hasn't been brought to a KIP yet, as it's a fairly large and complex project and I wanted to get all the details hashed out before settling on a public API.
>> > > > > > > > > > >
>> > > > > > > > > > > For some sense of how complicated it's been, you can check out the JIRA ticket we've been filing PRs under -- there are already 25 PRs to the feature. See KAFKA-12648 <https://issues.apache.org/jira/browse/KAFKA-12648>.
>> > > > > > > > > > > You can check out the new KafkaStreamsNamedTopologyWrapper class to see what the current API looks like -- I recommend taking a look to see if this might cover some or all of the things you wanted this KIP to do.
>> > > > > > > > > > >
>> > > > > > > > > > > For a high-level sketch, my work introduces the concept of a "NamedTopology" (which will be renamed to "ModularTopology" in the future, but is still referred to as "named" in the codebase so I'll keep using it for now). Each KafkaStreams app can execute multiple named topologies, which are just regular topologies that are given a unique name. The essential feature of a named topology is that it can be dynamically added or removed without even stopping the application, much less resetting it. Technically a NamedTopology can be composed of one or more subtopologies, but if you want to be able to update the application at a subtopology level you can just name each subtopology.
>> > > > > > > > > > >
>> > > > > > > > > > > So I believe the feature you want is actually already implemented, for the most part -- it's currently missing a few things that I just didn't bother to implement yet since I've been focused on getting a working, minimal POC that I could use for testing. (For example it doesn't yet support global state stores.) But beyond that, the only remaining work to make this feature available is to settle on the APIs, get a KIP passed, and implement said APIs.
>> > > > > > > > > > >
>> > > > > > > > > > > Would you be interested in helping out with the NamedTopology work so we can turn it into a full-fledged public feature? I'm happy to let you take the lead on the KIP, maybe by adapting this one if you think it makes sense to do so. The NamedTopology feature is somewhat larger in scope than strictly necessary for your purposes, however, so you could take on just a part of it and leave anything beyond that for me to do as followup.
>> > > > > > > > > > >
>> > > > > > > > > > > By the way: one advantage of the NamedTopology feature is that we don't have to worry about any compatibility issues or upgrade/migration path -- it's opt-in by definition. (Of course we would recommend using it to all users, like we do with named operators)
>> > > > > > > > > > >
>> > > > > > > > > > > Let me know what you think and how you want to proceed from here -- I wouldn't want you to spend time re-implementing more or less the same thing, but I most likely wasn't going to find time to put out a KIP for the NamedTopology feature in the near future. If you would be able to help drive this to completion, we'd each have significantly less work to do to achieve our goals :)
>> > > > > > > > > > >
>> > > > > > > > > > > Cheers,
>> > > > > > > > > > > Sophie
>> > > > > > > > > > >
>> > > > > > > > > > >
>> > > > > > > > > > > On Thu, Feb 3, 2022 at 6:12 PM Guozhang Wang <wangg...@gmail.com> wrote:
>> > > > > > > > > > >
>> > > > > > > > > > > > Hello Nick,
>> > > > > > > > > > > >
>> > > > > > > > > > > > Thanks for bringing this up and for the proposed options.
>> > > > > > > > > > > > I read through your writeup and here are some of my thoughts:
>> > > > > > > > > > > >
>> > > > > > > > > > > > 1) When changing the topology of Kafka Streams, the developer needs to first decide if the whole topology's persisted state (including both the state store as well as its changelogs, and the repartition topics, and the source/sink external topics) or part of the persisted state can be reused. This involves two types of changes:
>> > > > > > > > > > > >
>> > > > > > > > > > > > a) structural change of the topology, such as a new processor node being added/removed, a new intermediate topic being added/removed etc.
>> > > > > > > > > > > > b) semantic change of a processor, such as a numerical filter node changing its filter threshold etc.
>> > > > > > > > > > > >
>> > > > > > > > > > > > Today both of them are more or less determined by developers manually. However, though automatically determining changes of type b) is hard if not impossible, automatically determining changes of type a) is doable since it depends on just the information of:
>> > > > > > > > > > > > * number of sub-topologies, and their orders (i.e. sequence of ids)
>> > > > > > > > > > > > * used state stores and changelog topics within the sub-topology
>> > > > > > > > > > > > * used repartition topics
>> > > > > > > > > > > > * etc
>> > > > > > > > > > > >
>> > > > > > > > > > > > So let's assume in the long run we can indeed automatically determine if a topology or part of it (a sub-topology) is structurally the same, what we can do is to "translate" the old persisted state names to the new, isomorphic topology's names. Following this thought I'm leaning towards the direction of option B in your proposal. But since in this KIP automatically determining structural changes is out of scope, I feel we can consider adding some sort of a "migration tool" from an old topology to a new topology by renaming all the persisted states (store dirs and names, topic names).
>> > > > > > > > > > > >
>> > > > > > > > > > > >
>> > > > > > > > > > > > Guozhang
>> > > > > > > > > > > >
>> > > > > > > > > > > >
>> > > > > > > > > > > > On Tue, Jan 25, 2022 at 9:10 AM Nick Telford <nick.telf...@gmail.com> wrote:
>> > > > > > > > > > > >
>> > > > > > > > > > > > > Hi everyone,
>> > > > > > > > > > > > >
>> > > > > > > > > > > > > I'd like to start a discussion on Kafka Streams KIP-816 (https://cwiki.apache.org/confluence/display/KAFKA/KIP-816%3A+Topology+changes+without+local+state+reset)
>> > > > > > > > > > > > >
>> > > > > > > > > > > > > This KIP outlines 3 possible solutions to the problem, and I plan to whittle this down to a definitive solution based on this discussion.
>> > > > > > > > > > > > >
>> > > > > > > > > > > > > Of the 3 proposed solutions:
>> > > > > > > > > > > > > * 'A' is probably the "correct" solution, but is also quite a significant change.
>> > > > > > > > > > > > > * 'B' is the least invasive, but most "hacky" solution.
>> > > > > > > > > > > > > * 'C' requires a change to the wire protocol and will likely have unintended consequences. C is also the least complete solution, and will need significant additional work to make it work.
>> > > > > > > > > > > > >
>> > > > > > > > > > > > > Please let me know if the Motivation and Background sections need more clarity.
>> > > > > > > > > > > > >
>> > > > > > > > > > > > > Regards,
>> > > > > > > > > > > > >
>> > > > > > > > > > > > > Nick Telford
>> > > > > > > > > > > >
>> > > > > > > > > > > > --
>> > > > > > > > > > > > -- Guozhang
>> > > > >
>> > > > > --
>> > > > > -- Guozhang
>> > > >
>> > > > --
>> > > > -- Guozhang
>>
>> --
>> -- Guozhang
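
For anyone skimming the thread, the two on-disk layouts discussed above, and the kind of move that Option B implies, can be made concrete with a small sketch. This is only an illustration built from the example paths John quoted ({app_id}/1_0/rocksdb/mystore versus {app_id}/rocksdb/mystore-0). The class StateDirLayout and its methods are hypothetical names invented for this example; they are not part of Kafka Streams and not the code we run internally. The sketch only computes paths. It does not touch the filesystem, and it assumes the caller already knows the old and new subtopology ordinal for a store (for example from comparing Topology#describe() output).

```java
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper for illustration only; it is not part of Kafka Streams.
final class StateDirLayout {

    private StateDirLayout() { }

    // Task-scoped layout quoted in the thread:
    // {app_id}/{subtopology}_{partition}/rocksdb/{store}
    static Path taskScopedPath(Path appDir, int subtopology, int partition, String store) {
        return appDir.resolve(subtopology + "_" + partition).resolve("rocksdb").resolve(store);
    }

    // Store-scoped layout pictured for Option A:
    // {app_id}/rocksdb/{store}-{partition}
    static Path storeScopedPath(Path appDir, String store, int partition) {
        return appDir.resolve("rocksdb").resolve(store + "-" + partition);
    }

    // Option B: list the directory moves (old path -> new path) needed when a
    // store's subtopology ordinal changes. Returns an empty map if nothing moved.
    static Map<Path, Path> planMoves(Path appDir, String store, int oldSubtopology,
                                     int newSubtopology, List<Integer> partitions) {
        Map<Path, Path> moves = new LinkedHashMap<>();
        if (oldSubtopology == newSubtopology) {
            return moves;
        }
        for (int partition : partitions) {
            moves.put(taskScopedPath(appDir, oldSubtopology, partition, store),
                      taskScopedPath(appDir, newSubtopology, partition, store));
        }
        return moves;
    }

    public static void main(String[] args) {
        Path appDir = Paths.get("state", "my-app");
        // "mystore" used to live in subtopology 1 and now lives in subtopology 2;
        // partitions 0 and 3 are assigned to this node.
        planMoves(appDir, "mystore", 1, 2, Arrays.asList(0, 3))
            .forEach((from, to) -> System.out.println(from + " -> " + to));
        // Under Option A the path would not depend on the subtopology at all.
        System.out.println(storeScopedPath(appDir, "mystore", 0));
    }
}
```

A real implementation would also need to perform the moves before any stream threads start (as discussed for KafkaStreams.start() above) and account for the named-topology path segment; both are left out here to keep the sketch short.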