Thanks, Nick,

It sounds like we're on the same page. I didn't think (A)
would be fundamentally "hard", just that it might be a pain
in practice. Reading your response, if you're up for it, it
sounds like a prototype of (A) would be the tie-breaker
between the two approaches.
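
For what it's worth, here is a rough, self-contained sketch of
the layout difference a prototype of (A) would explore. It is
not Streams code: the class, the method names, the example
paths, and the "stores" directory used for (A) are all made up
for illustration, and the task-scoped layout only reflects my
understanding of how local store directories are arranged
today.

```java
import java.nio.file.Path;
import java.nio.file.Paths;

public class StoreLayoutSketch {

    // Assumed current layout: the store lives under its task directory,
    // and the task id is "<subtopology>_<partition>".
    public static Path taskScoped(Path appDir, int subtopology, int partition, String store) {
        return appDir.resolve(subtopology + "_" + partition)
                     .resolve("rocksdb")
                     .resolve(store);
    }

    // Hypothetical layout for (A): the store is addressed only by its
    // name and partition, so the subtopology ordinal plays no part.
    public static Path storeScoped(Path appDir, String store, int partition) {
        return appDir.resolve("stores")
                     .resolve(store)
                     .resolve(Integer.toString(partition));
    }

    public static void main(String[] args) {
        Path appDir = Paths.get("/tmp/kafka-streams/my-app");

        // A topology change renumbers the owning subtopology from 1 to 2.
        System.out.println("task-scoped before:  " + taskScoped(appDir, 1, 0, "counts"));
        System.out.println("task-scoped after:   " + taskScoped(appDir, 2, 0, "counts"));

        // The store-scoped path is the same before and after the change.
        System.out.println("store-scoped (both): " + storeScoped(appDir, "counts", 0));
    }
}
```

With the task-scoped layout, renumbering the subtopology
changes the path of the same named store, so the existing
files are no longer found; with the store-scoped layout the
path does not depend on the ordinal at all.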

To be honest, I've been burned enough times that I tend to
prototype my KIPs more often than not anyway.

Thanks,
-John

On Mon, 2022-02-07 at 11:42 +0000, Nick Telford wrote:
> Hi everyone,
> 
> Guozhang, the scope of my KIP is specifically about deploying structural
> changes to existing applications, i.e. "upgrades". Sharing state between
> different applications was not in the scope of my original proposal.
> 
> John's email has it exactly right, and I think this points to my KIP not
> explaining the problem correctly. Any suggestions on how I could better
> clarify the intent of my proposal in the KIP?
> 
> John, regarding your comments:
> 
> A) being difficult to clean up state after migrations. Unless I've missed
> something, this shouldn't be a problem. Tasks are already internally aware
> of which stores they own from the Topology structure, so irrespective of
> where on disk the StateStore data is, they should be able to find it. I think the
> only real issue with this approach is that it will require changing, most
> likely, quite a bit of code. We'll need to separate the concept of "state
> directory" from "task directory", (which will still be needed to store Task
> meta-data, like .lock files). At the very least, I think significant
> changes may need to be made to StateDirectory and StateManager, but I
> haven't investigated in detail. Perhaps it would make sense to first
> explore this approach with a prototype to see how invasive it would become?
> 
> B) My intent was always that this process would occur between
> KafkaStreams.start() and threads actually starting, so that the migration
> would occur safely. I'm not sure what kind of unexpected structural changes
> could be detected by such a process; it might just be useful for general
> validation. The main reasons I prefer (A) are: 1) (B) requires
> additional state meta-data, which increases system complexity, whereas (A)
> does not; and 2) I believe that (A) actually addresses a semantic
> bug: specifically that StateStores are tightly coupled to Tasks, which is
> unnecessary. Reducing this coupling would add no complexity, and
> potentially simplify other processes in the future.
> 
> Regards,
> 
> Nick
> 
> On Sat, 5 Feb 2022 at 17:19, John Roesler <vvcep...@apache.org> wrote:
> 
> > Hello all,
> > 
> > Thanks for the KIP, Nick!
> > 
> > Based on this conversation, I think I might have misread the
> > KIP, but it looks like Nick is just proposing a small fix to
> > the existing compatibility mechanism.
> > 
> > Although we tell people to avoid changing topologies on the
> > fly in general, we also tell them that, if they name all the
> > persistent resources (stores and repartition nodes), then
> > they can change the topologies without breaking anything
> > (provided the change itself is logically sound).
> > 
> > It seems like this KIP is just pointing out a flaw in that
> > mechanism, that the (named) stores are kept inside the task
> > directories, so if some change renumbers the tasks, Streams
> > won't be able to find the local store files anymore. IIUC,
> > the changelog topic will still be fine, so Streams would
> > just allocate a new state directory in the new task name and
> > restore the changelog into it.
> > 
> > So, I think all this KIP is after is a way to preserve the
> > local state files of a named store in the face of task
> > renumbering. That's not to say that there's not some overlap
> > with the NamedTopologies work, or that there's no value in
> > being able to automatically reuse unnamed stores. But it
> > probably makes sense to let Nick fix this one specific
> > problem instead of coupling it to other large-scale
> > engineering projects.
> > 
> > Regarding the KIP itself:
> > 
> > (A) is quite clean, but it does make it more challenging to
> > clean up state when tasks migrate to other nodes. If that's
> > the only problem, then I agree this is probably the best
> > solution.
> > 
> > (B) also makes a lot of sense to me, and I actually don't
> > think it's a hack. It might also be useful for detecting
> > when a topology has changed unexpectedly, for example. On
> > the other hand, to safely move a state directory from one
> > task directory to the other, we have to be sure no other
> > thread is using either directory. To do that, we could
> > either perform the operation in `KafkaStreams.start()`
> > before any threads are started (we already know the topology
> > at this point), or we can try to grab the directory locks on
> > both tasks (but that sounds like a recipe for deadlock).
> > 
> > In a nutshell, I'm supportive of this KIP, and I'd suggest we
> > do a little more discovery on the implications of dropping
> > the task level of the directory hierarchy before committing
> > to A. And/or be a little more specific about how we can
> > safely move state directories around before committing to B.
> > 
> > Thanks again!
> > -John
> > 
> > On Fri, 2022-02-04 at 11:09 -0800, Guozhang Wang wrote:
> > > Hi folks,
> > > 
> > > I think the NamedTopology work would help with the convenience of the
> > > solution for this KIP, but I feel it is not by itself the solution here.
> > > If I'm not mistaken, the scope of this KIP is trying to tackle this:
> > > *assuming the developer already knows* that a new topology, or part of
> > > it (e.g. a state store), does not change, how can that part of the
> > > topology be effectively reused? Today it is very hard to reuse part
> > > (say a state store, an internal topic) of a previous topology's
> > > persistent state because:
> > > 
> > > 1) the names of those persistent states are prefixed by the application id.
> > > 2) the names of those persistent states are suffixed by the index, which
> > > reflects the structure of the topology.
> > > 3) the dir path of the persistent states is "prefixed" by the task id,
> > > which is hence dependent on the sub-topology id.
> > > 
> > > My quick thoughts are that 1) is easy to work around as long as users reuse
> > > the same appId; 3) can be tackled with the help of the named topology, but
> > > each named topology can still be composed of multiple sub-topologies, so
> > > extra work is still needed to align the sub-topology ids; and we still need
> > > something to tackle 2) here, for which I was pondering between those
> > > options and at the moment am leaning towards option 2).
> > > 
> > > Does that make sense to you?
> > > 
> > > On Fri, Feb 4, 2022 at 4:38 AM Nick Telford <nick.telf...@gmail.com> wrote:
> > > 
> > > > Hi Guozhang, Sophie,
> > > > 
> > > > Thanks for both taking the time to review my proposal.
> > > > 
> > > > I did actually see the NamedTopology classes, and noted that they were
> > > > internal. I didn't realise they are part of an intended solution to this
> > > > problem; that's very interesting. I'm going to try to find some time to
> > > > take a look at your experimental work so I can understand it a bit better.
> > > > 
> > > > From your description, it sounds like the NamedTopology approach should
> > > > enable users to solve this problem at the level that they wish to. My
> > > > concern is that users will need to be explicit about how their Topology is
> > > > structured, and will need to know in advance how their Topologies might
> > > > evolve in the future in order to correctly break them up by name. For
> > > > example, if a user mistakenly assumes one particular structure for their
> > > > application, but later makes changes that implicitly cause an existing
> > > > NamedTopology to have its internal Subtopologies re-ordered, the user will
> > > > need to clear all the local state for that NamedTopology, at least.
> > > > 
> > > > Unless I'm mistaken, StateStores are defined exclusively by the data in
> > > > their changelogs. Even if you make changes to a Topology that require
> > > > clearing locally materialized state, the changelogs aren't reset[1], so the
> > > > newly rebuilt state is materialized from the pre-existing values. Even if
> > > > changes are made to the Subtopology that writes to the StateStore, the
> > > > existing data in the changelog hasn't changed. The contents of the
> > > > StateStore evolve. This is exactly the same as a traditional database
> > > > table, where a client may evolve its behaviour to subtly change the
> > > > semantics of the data written to the table, without deleting the existing
> > > > data.
> > > > 
> > > > If a user makes a change that means a different Subtopology reads from the
> > > > StateStore, the semantics of, and the data in, the store itself haven't
> > > > actually changed. The only reason we need to reset this local state at all
> > > > is due to the conflict on-disk caused by the change in Subtopology ordinal.
> > > > If local StateStore data was decoupled from Tasks, this conflict would
> > > > disappear, and the application would work as expected.
> > > > 
> > > > A Subtopology is defined by all connected topics, including changelogs,
> > > > repartition topics, source topics and sink topics, whereas a StateStore is
> > > > defined exclusively by its changelog. So why do we tightly couple
> > > > StateStore to Subtopology? This is my central argument for option A that I
> > > > outlined in the KIP, and I would like to discuss it further, even if only
> > > > to educate myself on why it's not possible :-D
> > > > 
> > > > I still think the NamedTopology work is valuable, but more as a means to
> > > > better organize large applications.
> > > > 
> > > > Regards,
> > > > Nick
> > > > 
> > > > 1: The only exception to this I can think of is when a user decides to
> > > > change the format (Serdes) or semantics of the data in the store, in which
> > > > case they would need to do a full reset by also clearing the changelog
> > > > topic for that store. Realistically, users that wish to do this would be
> > > > better off just creating a new store and deleting the old one, so I don't
> > > > think it's a case worth optimizing for.
> > > > 
> > > > On Fri, 4 Feb 2022 at 08:22, Sophie Blee-Goldman
> > > > <sop...@confluent.io.invalid> wrote:
> > > > 
> > > > > Hey Nick,
> > > > > 
> > > > > thanks for the KIP, this is definitely a much-needed feature. I've
> > > > > actually been working on a somewhat similar feature for a while now and
> > > > > have a good chunk of the implementation completed -- but so far it's only
> > > > > exposed via internal APIs and hasn't been brought to a KIP yet, as it's a
> > > > > fairly large and complex project and I wanted to get all the details
> > > > > hashed out before settling on a public API.
> > > > > 
> > > > > For some sense of how complicated it's been, you can check out the JIRA
> > > > > ticket we've been filing PRs under -- there are already 25 PRs to the
> > > > > feature. See KAFKA-12648
> > > > > <https://issues.apache.org/jira/browse/KAFKA-12648>. You can check out
> > > > > the new KafkaStreamsNamedTopologyWrapper class to see what the current
> > > > > API looks like -- I recommend taking a look to see if this might cover
> > > > > some or all of the things you wanted this KIP to do.
> > > > > 
> > > > > For a high-level sketch, my work introduces the concept of a
> > > > > "NamedTopology" (which will be renamed to "ModularTopology" in the
> > > > > future, but is still referred to as "named" in the codebase, so I'll keep
> > > > > using it for now). Each KafkaStreams app can execute multiple named
> > > > > topologies, which are just regular topologies that are given a unique
> > > > > name. The essential feature of a named topology is that it can be
> > > > > dynamically added or removed without even stopping the application, much
> > > > > less resetting it. Technically a NamedTopology can be composed of one or
> > > > > more subtopologies, but if you want to be able to update the application
> > > > > at a subtopology level you can just name each subtopology.
> > > > > 
> > > > > So I believe the feature you want is actually already implemented, for
> > > > > the most part -- it's currently missing a few things that I just didn't
> > > > > bother to implement yet since I've been focused on getting a working,
> > > > > minimal POC that I could use for testing. (For example, it doesn't yet
> > > > > support global state stores.) But beyond that, the only remaining work to
> > > > > make this feature available is to settle on the APIs, get a KIP passed,
> > > > > and implement said APIs.
> > > > > 
> > > > > Would you be interested in helping out with the NamedTopology work so we
> > > > > can turn it into a full-fledged public feature? I'm happy to let you take
> > > > > the lead on the KIP, maybe by adapting this one if you think it makes
> > > > > sense to do so. The NamedTopology feature is somewhat larger in scope
> > > > > than strictly necessary for your purposes, however, so you could take on
> > > > > just a part of it and leave anything beyond that for me to do as followup.
> > > > > 
> > > > > By the way: one advantage of the NamedTopology feature is that we don't
> > > > > have to worry about any compatibility issues or upgrade/migration path --
> > > > > it's opt-in by definition. (Of course we would recommend using it to all
> > > > > users, like we do with named operators.)
> > > > > 
> > > > > Let me know what you think and how you want to proceed from here -- I
> > > > > wouldn't want you to spend time re-implementing more or less the same
> > > > > thing, but I most likely wasn't going to find time to put out a KIP for
> > > > > the NamedTopology feature in the near future. If you would be able to
> > > > > help drive this to completion, we'd each have significantly less work to
> > > > > do to achieve our goals :)
> > > > > 
> > > > > Cheers,
> > > > > Sophie
> > > > > 
> > > > > 
> > > > > On Thu, Feb 3, 2022 at 6:12 PM Guozhang Wang <wangg...@gmail.com> wrote:
> > > > > 
> > > > > > Hello Nick,
> > > > > > 
> > > > > > Thanks for bringing this up and for the proposed options. I read through
> > > > > > your writeup and here are some of my thoughts:
> > > > > > 
> > > > > > 1) When changing the topology of Kafka Streams, the developer needs to
> > > > > > first decide if the whole topology's persisted state (including both the
> > > > > > state stores and their changelogs, the repartition topics, and the
> > > > > > source/sink external topics) or only part of the persisted state can be
> > > > > > reused. This involves two types of changes:
> > > > > > 
> > > > > > a) structural change of the topology, such as a new processor node being
> > > > > > added/removed, a new intermediate topic being added/removed, etc.
> > > > > > b) semantic change of a processor, such as a numerical filter node
> > > > > > changing its filter threshold, etc.
> > > > > > 
> > > > > > Today both of them are more or less determined by developers manually.
> > > > > > However, though automatically detecting changes of type b) is hard if
> > > > > > not impossible, automatically detecting changes of type a) is doable
> > > > > > since it depends on just the following information:
> > > > > > * number of sub-topologies, and their order (i.e. sequence of ids)
> > > > > > * used state stores and changelog topics within the sub-topology
> > > > > > * used repartition topics
> > > > > > * etc
> > > > > > 
> > > > > > So let's assume in the long run we can indeed automatically determine if
> > > > > > a topology or part of it (a sub-topology) is structurally the same; what
> > > > > > we can do then is "translate" the old persisted state names to the new,
> > > > > > isomorphic topology's names. Following this thought I'm leaning towards
> > > > > > the direction of option B in your proposal. But since in this KIP
> > > > > > automatically determining structural changes is out of scope, I feel we
> > > > > > can consider adding some sort of "migration tool" from an old topology
> > > > > > to a new topology that renames all the persisted states (store dirs and
> > > > > > names, topic names).
> > > > > > 
> > > > > > 
> > > > > > Guozhang
> > > > > > 
> > > > > > 
> > > > > > On Tue, Jan 25, 2022 at 9:10 AM Nick Telford <nick.telf...@gmail.com>
> > > > > > wrote:
> > > > > > 
> > > > > > > Hi everyone,
> > > > > > > 
> > > > > > > I'd like to start a discussion on Kafka Streams KIP-816 (
> > > > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-816%3A+Topology+changes+without+local+state+reset
> > > > > > > )
> > > > > > > 
> > > > > > > This KIP outlines 3 possible solutions to the problem, and I plan to
> > > > > > > whittle this down to a definitive solution based on this discussion.
> > > > > > > 
> > > > > > > Of the 3 proposed solutions:
> > > > > > > * 'A' is probably the "correct" solution, but is also quite a
> > > > > > > significant change.
> > > > > > > * 'B' is the least invasive, but most "hacky" solution.
> > > > > > > * 'C' requires a change to the wire protocol and will likely have
> > > > > > > unintended consequences. C is also the least complete solution, and will
> > > > > > > need significant additional work to make it work.
> > > > > > > 
> > > > > > > Please let me know if the Motivation and Background sections need more
> > > > > > > clarity.
> > > > > > > 
> > > > > > > Regards,
> > > > > > > 
> > > > > > > Nick Telford
> > > > > > > 
> > > > > > 
> > > > > > 
> > > > > > --
> > > > > > -- Guozhang
> > > > > > 
> > > > > 
> > > > 
> > > 
> > > 
> > 
> > 
