Hi everyone,

I'm going to resurrect this KIP, because I would like the community to
benefit from our solution.

In the end, we internally solved this problem using Option B: automatically
moving state directories to the correct location whenever they're no longer
aligned with the Topology. We implemented this for ourselves externally to
Kafka Streams, by using Topology#describe() to analyse the Topology, and
then moving state directories before calling KafkaStreams#start().

I've updated/re-written the KIP to focus on this solution, albeit properly
integrated into Kafka Streams.

Let me know what you think,

Nick

On Tue, 15 Feb 2022 at 16:23, Nick Telford <nick.telf...@gmail.com> wrote:

> In the KIP, for Option A I suggested a new path of:
>
> /state/dir/stores/<store name>/<partition number>
>
> I made the mistake of thinking that the rocksdb/ segment goes *after* the
> store name in the current scheme, e.g.
>
> /state/dir/<task id>/<store name>[/rocksdb]
>
> This is a mistake. I'd always intended for a combination of the store name
> and partition number to be encoded in the new path (instead of the store
> name and task ID, that we have now). The exact encoding doesn't really
> bother me too much, so if you have any conventions you think we should
> follow here (hyphenated vs. underscored vs. directory separator, etc.)
> please let me know.
>
> I should be able to find some time hopefully next week to start working on
> this, which should shed some more light on issues that might arise.
>
> In the meantime I'll correct the KIP to include the rocksdb segment.
>
> Thanks everyone for your input so far!
>
> Nick
>
> On Mon, 14 Feb 2022 at 22:02, Guozhang Wang <wangg...@gmail.com> wrote:
>
>> Thanks for the clarification John!
>>
>> Nick, sorry that I was not super clear in my latest email. I meant exactly
>> what John said.
>>
>> Just to clarify, I do think that this KIP is relatively orthogonal to the
>> named topology work; as long as we still keep the topo name encoded it
>> should be fine since two named topologies can indeed have the same store
>> name, but that would not need to be considered as part of this KIP.
>>
>>
>> Guozhang
>>
>> On Mon, Feb 14, 2022 at 9:02 AM John Roesler <vvcep...@apache.org> wrote:
>>
>> > Hi Nick,
>> >
>> > When Guozgang and I were chatting, we realized that it’s not completely
>> > sufficient just to move the state store directories, because their names
>> > are not unique. In particular, more than one partition of the store may
>> be
>> > assigned to the same instance. Right now, this is handled because the
>> task
>> > is encoded the partition number.
>> >
>> > For example, if we have a store "mystore" in subtopology 1 and we have
>> two
>> > out of four partitions (0 and 3) assigned to the local node, the disk
>> will
>> > have these paths:
>> >
>> > {app_id}/1_0/rocksdb/mystore
>> > {app_id}/1_3/rocksdb/mystore
>> >
>> > Clearly, we can't just elevate both "mystore" directories to reside
>> under
>> > {appid}, because
>> > they have the same name. When I think of option (A), here's what I
>> picture:
>> >
>> > {app_id}/rocksdb/mystore-0
>> > {app_id}/rocksdb/mystore-3
>> >
>> > In the future, one thing we're considering to do is actually store all
>> the
>> > positions in the same rocksDB database, which is a pretty convenient
>> step
>> > away from option (A) (another reason to prefer it to option (B) ).
>> >
>> > I just took a look at how named topologies are handled, and they're
>> > actually
>> > a separate path segment, not part of the task id, like this:
>> >
>> > {app_id}/__{topo_name}__/1_0/rocksdb/mystore
>> > {app_id}/__{topo_name}__/1_3/rocksdb/mystore
>> >
>> > Which is pretty convenient because it means there are no
>> > implications for your proposal. If you implement the above
>> > code, then we'll just wind up with:
>> >
>> > {app_id}/__{topo_name}__/rocksdb/mystore-0
>> > {app_id}/__{topo_name}__/rocksdb/mystore-3
>> >
>> > Does that make sense?
>> >
>> > Thanks,
>> > -John
>> >
>> >
>> > On Mon, Feb 14, 2022, at 03:57, Nick Telford wrote:
>> > > Hi Guozhang,
>> > >
>> > > Sorry I haven't had the time to respond to your earlier email, but I
>> just
>> > > wanted to clarify something with respect to your most recent email.
>> > >
>> > > My original plan in option A is to remove the entire Task ID from the
>> > State
>> > > Store path, which would insulate it from any changes to the Task ID
>> > format
>> > > introduced by Named Topologies or anything else. This would in fact
>> > > consolidate the store for the instance, rather than by-Task (which I
>> > think
>> > > is what you meant by "one physical store per state"?).
>> > >
>> > > I did highlight in option C the possibility of changing the format of
>> the
>> > > Task ID to change the sub-topology ID from an ordinal to a stable
>> > > identifier. Although I'm not convinced that this option is viable, or
>> > even
>> > > desirable.
>> > >
>> > > Regards,
>> > >
>> > > Nick
>> > >
>> > > On Sat, 12 Feb 2022 at 00:36, Guozhang Wang <wangg...@gmail.com>
>> wrote:
>> > >
>> > > > Just to follow-up on this thread, I had another chat with John
>> > regarding
>> > > > option a) and I think the key thought is that, today the task-id is
>> in
>> > the
>> > > > form of [sub-topologyID]-[partitionID] --- and in the future with
>> > > > named-topology it could be extended to three digits as
>> > > > [named-topologyID]-[sub-topologyID]-[partitionID] --- and for the
>> > purpose
>> > > > of this KIP's option A), we actually just want to remove the
>> > > > [sub-topologyID] from the taskID as part of the file path hierarchy,
>> > right?
>> > > >
>> > > > If yes, given that in the future we want:
>> > > >
>> > > > * allow topology evolution with compatibility validations.
>> > > > * consolidating persistent state stores so that we do not have one
>> > physical
>> > > > store per state, but potentially one store for the whole instance.
>> > > >
>> > > > No matter if we want to provide certain tooling for mapping the
>> > persistent
>> > > > state path / names as in option B), pursuing some solutions in the
>> > > > direction of option A) to be independent of the sub-topologyID since
>> > state
>> > > > store names within a topology should be sufficiently unique would
>> make
>> > a
>> > > > lot of sense.
>> > > >
>> > > >
>> > > > On Mon, Feb 7, 2022 at 3:52 PM Guozhang Wang <wangg...@gmail.com>
>> > wrote:
>> > > >
>> > > > > Hello Nick,
>> > > > >
>> > > > > I think I'm on the same page of the scope of your KIP, and what I
>> was
>> > > > > trying to get is that, there are some other efforts going on in
>> > parallel
>> > > > > that tries to identify if two topologies, or some part of them,
>> are
>> > > > > isomorphic in structure, and hence their corresponding persistent
>> > states
>> > > > > may be reusable. That's why I was saying that "assume, we know
>> which
>> > > > > persistent states can be reusable". i.e. let's say we know that
>> the
>> > new
>> > > > > topology's sub-topology 1's state store A-0005 is the same as the
>> old
>> > > > > topology's sub-topology 0' state store A-0004, then what we can do
>> > to let
>> > > > > the new topology state store to be loaded as the old state store.
>> > With
>> > > > that
>> > > > > in my mind originally, I said maybe option B) is sufficient to
>> > rename the
>> > > > > dir path / state store names before we start the new app's
>> topology.
>> > But
>> > > > > that's just one aspect of it and we do not necessarily need to
>> > follow :)
>> > > > If
>> > > > > you are up to do a prototype following option A) with a good
>> upgrade
>> > > > path,
>> > > > > it would be a great solution too.
>> > > > >
>> > > > >
>> > > > > On Mon, Feb 7, 2022 at 8:59 AM John Roesler <vvcep...@apache.org>
>> > wrote:
>> > > > >
>> > > > > > Thanks, Nick,
>> > > > > >
>> > > > > > It sounds like we're on the same page. I didn't think (A)
>> > > > > > would be fundamentally "hard", just that it might be a pain
>> > > > > > in practice. Reading your response, if you're up for it, it
>> > > > > > sounds like a prototype of (A) would be the tie-breaker
>> > > > > > between the two approaches.
>> > > > > >
>> > > > > > To be honest, I've been burned enough times that I tend to
>> > > > > > prototype my KIPs more often than not anyway.
>> > > > > >
>> > > > > > Thanks,
>> > > > > > -John
>> > > > > >
>> > > > > > On Mon, 2022-02-07 at 11:42 +0000, Nick Telford wrote:
>> > > > > > > Hi everyone,
>> > > > > > >
>> > > > > > > Guozhang, the scope of my KIP is specifically about deploying
>> > > > structural
>> > > > > > > changes to existing applications, i.e. "upgrades". Sharing
>> state
>> > > > between
>> > > > > > > different applications was not in the scope of my original
>> > proposal.
>> > > > > > >
>> > > > > > > John's email has it exactly right, and I think this points to
>> my
>> > KIP
>> > > > not
>> > > > > > > explaining the problem correctly. Any suggestions on how I
>> could
>> > > > better
>> > > > > > > clarify the intent of my proposal in the KIP?
>> > > > > > >
>> > > > > > > John, regarding your comments:
>> > > > > > >
>> > > > > > > A) being difficult to clean up state after migrations. Unless
>> > I've
>> > > > > > missed
>> > > > > > > something, this shouldn't be a problem. Tasks are already
>> > internally
>> > > > > > aware
>> > > > > > > of which stores they own from the Topology structure,
>> > irrespective of
>> > > > > > where
>> > > > > > > on-disk the StateStore data is, they should be able to find
>> it. I
>> > > > think
>> > > > > > the
>> > > > > > > only real issue with this approach is that it will require
>> > changing,
>> > > > > > most
>> > > > > > > likely, quite a bit of code. We'll need to separate the
>> concept
>> > of
>> > > > > > "state
>> > > > > > > directory" from "task directory", (which will still be needed
>> to
>> > store
>> > > > > > Task
>> > > > > > > meta-data, like .lock files). At the very least, I think
>> > significant
>> > > > > > > changes may need to be made to StateDirectory and
>> StateManager,
>> > but I
>> > > > > > > haven't investigated in detail. Perhaps it would make sense to
>> > first
>> > > > > > > explore this approach with a prototype to see how invasive it
>> > would
>> > > > > > become?
>> > > > > > >
>> > > > > > > B) My intent was always that this process would occur between
>> > > > > > > KafkaStreams.start() and threads actually starting, so that
>> the
>> > > > > > migration
>> > > > > > > would occur safely. I'm not sure what kind of unexpected
>> > structural
>> > > > > > changes
>> > > > > > > could be detected by such a process; it might just be useful
>> for
>> > > > general
>> > > > > > > validation. The main reasons I prefer (A) is that: 1) (B)
>> > requires
>> > > > > > > additional state meta-data, whereas (A) does not, which is an
>> > increase
>> > > > > > in
>> > > > > > > system complexity and; 2) I believe that (A) actually
>> addresses a
>> > > > > > semantic
>> > > > > > > bug: specifically that StateStores are tightly coupled to
>> Tasks,
>> > which
>> > > > > > is
>> > > > > > > unnecessary. Reducing this coupling would add no complexity,
>> and
>> > > > > > > potentially simplify other processes in the future.
>> > > > > > >
>> > > > > > > Regards,
>> > > > > > >
>> > > > > > > Nick
>> > > > > > >
>> > > > > > > On Sat, 5 Feb 2022 at 17:19, John Roesler <
>> vvcep...@apache.org>
>> > > > wrote:
>> > > > > > >
>> > > > > > > > Hello all,
>> > > > > > > >
>> > > > > > > > Thanks for the KIP, Nick!
>> > > > > > > >
>> > > > > > > > Based on this conversation, I think I might have misread the
>> > > > > > > > KIP, but it looks like Nick is just proposing a small fix to
>> > > > > > > > the existing compatability mechanism.
>> > > > > > > >
>> > > > > > > > Although we tell people to avoid changing topologies on the
>> > > > > > > > fly in general, we also tell them that, if they name all the
>> > > > > > > > persistent resources (stores and repartition nodes), then
>> > > > > > > > they can change the topologies without breaking anything
>> > > > > > > > (provided the change itself is logically sound).
>> > > > > > > >
>> > > > > > > > It seems like this KIP is just pointing out a flaw in that
>> > > > > > > > mechanism, that the (named) stores are kept inside the task
>> > > > > > > > directories, so if some change renumbers the tasks, Streams
>> > > > > > > > won't be able to find the local store files anymore. IIUC,
>> > > > > > > > the changelog topic will still be fine, so Streams would
>> > > > > > > > just allocate a new state directory in the new task name and
>> > > > > > > > restore the changelog into it.
>> > > > > > > >
>> > > > > > > > So, I think all this KIP is after is a way to preserve the
>> > > > > > > > local state files of a named store in the face of task
>> > > > > > > > renumbering. That's not to say that there's not some overlap
>> > > > > > > > with the NamedTopologies work, or that there's no value in
>> > > > > > > > being able to automatically reuse unnamed stores. But it
>> > > > > > > > probably makes sense to let Nick fix this one specific
>> > > > > > > > problem instead of coupling it to other large-scale
>> > > > > > > > engineering projects.
>> > > > > > > >
>> > > > > > > > Regarding the KIP itself:
>> > > > > > > >
>> > > > > > > > (A) is quite clean, but it does make it more challenging to
>> > > > > > > > clean up state when tasks migrate to other nodes. If that's
>> > > > > > > > the only problem, then I agree this is probably the best
>> > > > > > > > solution.
>> > > > > > > >
>> > > > > > > > (B) also makes a lot of sense to me, and I actually don't
>> > > > > > > > think it's a hack. It might also be useful for detecting
>> > > > > > > > when a topology has changed unexpectedly, for example. On
>> > > > > > > > the other hand, to safely move a state directory from one
>> > > > > > > > task directory to the other, we have to be sure no other
>> > > > > > > > thread is using either directory. To do that, we could
>> > > > > > > > either perform the operation in `KafkaStreams.start()`
>> > > > > > > > before any threads are started (we already know the topology
>> > > > > > > > at this point), or we can try to grab the directory locks on
>> > > > > > > > both tasks (but that sounds like a recipe for deadlock).
>> > > > > > > >
>> > > > > > > > In a nutshell, I'm supportive of this KIP, and I'd sugest we
>> > > > > > > > do a little more discovery on the implications of dropping
>> > > > > > > > the task level of the directory hierarchy before committing
>> > > > > > > > to A. And/or be a little more specific about how we can
>> > > > > > > > safely move state directories around before committing to B.
>> > > > > > > >
>> > > > > > > > Thanks again!
>> > > > > > > > -John
>> > > > > > > >
>> > > > > > > > On Fri, 2022-02-04 at 11:09 -0800, Guozhang Wang wrote:
>> > > > > > > > > Hi folks,
>> > > > > > > > >
>> > > > > > > > > I think the NamedTopology work would help with the
>> > convenience of
>> > > > > > the
>> > > > > > > > > solution for this KIP, but I feel it is not by itself the
>> > solution
>> > > > > > here.
>> > > > > > > > If
>> > > > > > > > > I'm not mistaken, the scope of this KIP is trying to
>> tackle
>> > that,
>> > > > > > > > *assuming
>> > > > > > > > > the developer already knows* a new topology or part of the
>> > > > topology
>> > > > > > e.g.
>> > > > > > > > > like a state store of the topology does not change, then
>> how
>> > to
>> > > > > > > > effectively
>> > > > > > > > > be able to reuse that part of the topology. Today it is
>> very
>> > hard
>> > > > to
>> > > > > > > > reuse
>> > > > > > > > > part (say a state store, an internal topic) of a previous
>> > > > topology's
>> > > > > > > > > persistent state because:
>> > > > > > > > >
>> > > > > > > > > 1) the names of those persistent states are prefixed by
>> the
>> > > > > > application
>> > > > > > > > id.
>> > > > > > > > > 2) the names of those persistent states are suffixed by
>> the
>> > index,
>> > > > > > which
>> > > > > > > > > reflects the structure of the topology.
>> > > > > > > > > 3) the dir path of the persistent states are "prefixed" by
>> > the
>> > > > task
>> > > > > > id,
>> > > > > > > > > which is hence dependent on the sub-topology id.
>> > > > > > > > >
>> > > > > > > > > My quick thoughts are that 1) is easy to go around as
>> long as
>> > > > users
>> > > > > > reuse
>> > > > > > > > > the same appId, 3) can be tackled with the help of the
>> named
>> > > > > > topology but
>> > > > > > > > > each named topology can still be composed of multiple
>> > > > > > sub-topologies so
>> > > > > > > > > extra work is still needed to align the sub-topology ids,
>> > but we
>> > > > > > still
>> > > > > > > > need
>> > > > > > > > > something to tackle 2) here, which I was pondering between
>> > those
>> > > > > > options
>> > > > > > > > > and at the moment leaning towards option 2).
>> > > > > > > > >
>> > > > > > > > > Does that make sense to you?
>> > > > > > > > >
>> > > > > > > > >
>> > > > > > > > >
>> > > > > > > > >
>> > > > > > > > >
>> > > > > > > > >
>> > > > > > > > >
>> > > > > > > > >
>> > > > > > > > > On Fri, Feb 4, 2022 at 4:38 AM Nick Telford <
>> > > > nick.telf...@gmail.com
>> > > > > > >
>> > > > > > > > wrote:
>> > > > > > > > >
>> > > > > > > > > > Hi Guozhang, Sophie,
>> > > > > > > > > >
>> > > > > > > > > > Thanks for both taking the time to review my proposal.
>> > > > > > > > > >
>> > > > > > > > > > I did actually see the NamedTopology classes, and noted
>> > that
>> > > > they
>> > > > > > were
>> > > > > > > > > > internal. I didn't realise they are part of an intended
>> > solution
>> > > > > > to
>> > > > > > > > this
>> > > > > > > > > > problem, that's very interesting. I'm going to try to
>> find
>> > some
>> > > > > > time to
>> > > > > > > > > > take a look at your experimental work so I can
>> understand
>> > it a
>> > > > bit
>> > > > > > > > better.
>> > > > > > > > > >
>> > > > > > > > > > From your description, it sounds like the NamedTopology
>> > approach
>> > > > > > should
>> > > > > > > > > > enable users to solve this problem at the level that
>> they
>> > wish
>> > > > > > to. My
>> > > > > > > > > > concern is that users will need to be explicit about how
>> > their
>> > > > > > > > Topology is
>> > > > > > > > > > structured, and will need to know in advance how their
>> > > > Topologies
>> > > > > > might
>> > > > > > > > > > evolve in the future in order to correctly break them
>> up by
>> > > > name.
>> > > > > > For
>> > > > > > > > > > example, if a user mistakenly assumes one particular
>> > structure
>> > > > for
>> > > > > > > > their
>> > > > > > > > > > application, but later makes changes that implicitly
>> cause
>> > an
>> > > > > > existing
>> > > > > > > > > > NamedTopology to have its internal Subtopologies
>> > re-ordered, the
>> > > > > > user
>> > > > > > > > will
>> > > > > > > > > > need to clear all the local state for that
>> NamedTopology,
>> > at
>> > > > > > least.
>> > > > > > > > > >
>> > > > > > > > > > Unless I'm mistaken, StateStores are defined exclusively
>> > by the
>> > > > > > data in
>> > > > > > > > > > their changelogs. Even if you make changes to a Topology
>> > that
>> > > > > > requires
>> > > > > > > > > > clearing locally materialized state, the changelogs
>> aren't
>> > > > > > reset[1],
>> > > > > > > > so the
>> > > > > > > > > > newly rebuilt state is materialized from the
>> pre-existing
>> > > > values.
>> > > > > > Even
>> > > > > > > > if
>> > > > > > > > > > changes are made to the Subtopology that writes to the
>> > > > > > StateStore, the
>> > > > > > > > > > existing data in the changelog hasn't changed. The
>> > contents of
>> > > > the
>> > > > > > > > > > StateStore evolves. This is exactly the same as a
>> > traditional
>> > > > > > database
>> > > > > > > > > > table, where a client may evolve its behaviour to subtly
>> > change
>> > > > > > the
>> > > > > > > > > > semantics of the data written to the table, without
>> > deleting the
>> > > > > > > > existing
>> > > > > > > > > > data.
>> > > > > > > > > >
>> > > > > > > > > > If a user makes a change that means a different
>> Subtopology
>> > > > reads
>> > > > > > from
>> > > > > > > > the
>> > > > > > > > > > StateStore, the semantics of, and the data in the store
>> > itself
>> > > > > > hasn't
>> > > > > > > > > > actually changed. The only reason we need to reset this
>> > local
>> > > > > > state at
>> > > > > > > > all
>> > > > > > > > > > is due to the conflict on-disk caused by the change in
>> > > > Subtopology
>> > > > > > > > ordinal.
>> > > > > > > > > > If local StateStore data was decoupled from Tasks, this
>> > conflict
>> > > > > > would
>> > > > > > > > > > disappear, and the application would work as expected.
>> > > > > > > > > >
>> > > > > > > > > > A Subtopology is defined by all connected topics,
>> including
>> > > > > > changelogs,
>> > > > > > > > > > repartition topics, source topics and sink topics.
>> Whereas
>> > a
>> > > > > > > > StateStore is
>> > > > > > > > > > defined exclusively by its changelog. So why do we
>> tightly
>> > > > couple
>> > > > > > > > > > StateStore to Subtopology? This is my central argument
>> for
>> > > > option
>> > > > > > A
>> > > > > > > > that I
>> > > > > > > > > > outlined in the KIP, and I would like to discuss it
>> > further,
>> > > > even
>> > > > > > if
>> > > > > > > > only
>> > > > > > > > > > to educate myself on why it's not possible :-D
>> > > > > > > > > >
>> > > > > > > > > > I still think the NamedTopology work is valuable, but
>> more
>> > as a
>> > > > > > means
>> > > > > > > > to
>> > > > > > > > > > better organize large applications.
>> > > > > > > > > >
>> > > > > > > > > > Regards,
>> > > > > > > > > > Nick
>> > > > > > > > > >
>> > > > > > > > > > 1: The only exception to this I can think of is when a
>> user
>> > > > > > decides to
>> > > > > > > > > > change the format (Serdes) or semantics of the data in
>> the
>> > > > store,
>> > > > > > in
>> > > > > > > > which
>> > > > > > > > > > case they would need to do a full reset by also clearing
>> > the
>> > > > > > changelog
>> > > > > > > > > > topic for that store. Realistically, users that wish to
>> do
>> > this
>> > > > > > would
>> > > > > > > > be
>> > > > > > > > > > better off just creating a new store and deleting the
>> old
>> > one,
>> > > > so
>> > > > > > I
>> > > > > > > > don't
>> > > > > > > > > > think it's a case worth optimizing for.
>> > > > > > > > > >
>> > > > > > > > > > On Fri, 4 Feb 2022 at 08:22, Sophie Blee-Goldman
>> > > > > > > > > > <sop...@confluent.io.invalid> wrote:
>> > > > > > > > > >
>> > > > > > > > > > > Hey Nick,
>> > > > > > > > > > >
>> > > > > > > > > > > thanks for the KIP, this is definitely a much-needed
>> > feature.
>> > > > > > I've
>> > > > > > > > > > actually
>> > > > > > > > > > > been working on
>> > > > > > > > > > > a somewhat similar feature for a while now and have a
>> > good
>> > > > > > chunk of
>> > > > > > > > the
>> > > > > > > > > > > implementation
>> > > > > > > > > > > completed -- but so far it's only exposed via internal
>> > APIs
>> > > > and
>> > > > > > > > hasn't
>> > > > > > > > > > been
>> > > > > > > > > > > brought to a KIP
>> > > > > > > > > > > yet, as it's a fairly large and complex project and I
>> > wanted
>> > > > to
>> > > > > > get
>> > > > > > > > all
>> > > > > > > > > > the
>> > > > > > > > > > > details hashed out
>> > > > > > > > > > > before settling on a public API.
>> > > > > > > > > > >
>> > > > > > > > > > > For some sense of how complicated it's been, you can
>> > check out
>> > > > > > the
>> > > > > > > > JIRA
>> > > > > > > > > > > ticket we've been
>> > > > > > > > > > > filing PRs under -- there are already 25 PRs to the
>> > feature.
>> > > > See
>> > > > > > > > > > > KAFKA-12648
>> > > > > > > > > > > <https://issues.apache.org/jira/browse/KAFKA-12648>.
>> > You can
>> > > > > > check
>> > > > > > > > > > > out the new KafkaStreamsNamedTopologyWrapper class to
>> > see what
>> > > > > > the
>> > > > > > > > > > current
>> > > > > > > > > > > API looks like
>> > > > > > > > > > > -- I recommend taking a look to see if this might
>> cover
>> > some
>> > > > or
>> > > > > > all
>> > > > > > > > of
>> > > > > > > > > > the
>> > > > > > > > > > > things you wanted
>> > > > > > > > > > > this KIP to do.
>> > > > > > > > > > >
>> > > > > > > > > > > For a high-level sketch, my work introduces the
>> concept
>> > of a
>> > > > > > > > > > > "NamedTopology" (which will be
>> > > > > > > > > > > renamed to "ModularTopology" in the future, but is
>> still
>> > > > > > referred to
>> > > > > > > > as
>> > > > > > > > > > > "named" in the codebase
>> > > > > > > > > > > so I'll keep using it for now) . Each KafkaStreams app
>> > can
>> > > > > > execute
>> > > > > > > > > > multiple
>> > > > > > > > > > > named topologies,
>> > > > > > > > > > > which are just regular topologies that are given a
>> unique
>> > > > name.
>> > > > > > The
>> > > > > > > > > > > essential feature of a
>> > > > > > > > > > > named topology is that it can be dynamically added or
>> > removed
>> > > > > > without
>> > > > > > > > > > even
>> > > > > > > > > > > stopping the
>> > > > > > > > > > > application, much less resetting it. Technically a
>> > > > > > NamedTopology can
>> > > > > > > > be
>> > > > > > > > > > > composed or one
>> > > > > > > > > > > or more subtopologies, but if you want to be able to
>> > update
>> > > > the
>> > > > > > > > > > application
>> > > > > > > > > > > at a subtopology
>> > > > > > > > > > > level you can just name each  subtopology.
>> > > > > > > > > > >
>> > > > > > > > > > > So I believe the feature you want is actually already
>> > > > > > implemented,
>> > > > > > > > for
>> > > > > > > > > > the
>> > > > > > > > > > > most part -- it's currently
>> > > > > > > > > > > missing a few things that I just didn't bother to
>> > implement
>> > > > yet
>> > > > > > since
>> > > > > > > > > > I've
>> > > > > > > > > > > been focused
>> > > > > > > > > > > on getting a working, minimal POC that I could use for
>> > > > testing.
>> > > > > > (For
>> > > > > > > > > > > example it doesn't yet
>> > > > > > > > > > > support global state stores) But beyond that, the only
>> > > > remaining
>> > > > > > > > work to
>> > > > > > > > > > > make this feature
>> > > > > > > > > > > available is to settle on the APIs, get a KIP passed,
>> and
>> > > > > > implement
>> > > > > > > > said
>> > > > > > > > > > > APIs.
>> > > > > > > > > > >
>> > > > > > > > > > > Would you be interested in helping out with the
>> > NamedTopology
>> > > > > > work
>> > > > > > > > so we
>> > > > > > > > > > > can turn it into a
>> > > > > > > > > > > a full-fledged public feature? I'm happy to let you
>> take
>> > the
>> > > > > > lead on
>> > > > > > > > the
>> > > > > > > > > > > KIP, maybe by adapting
>> > > > > > > > > > > this one if you think it makes sense to do so. The
>> > > > NamedTopology
>> > > > > > > > feature
>> > > > > > > > > > is
>> > > > > > > > > > > somewhat larger
>> > > > > > > > > > > in scope than strictly necessary for your purposes,
>> > however,
>> > > > so
>> > > > > > you
>> > > > > > > > could
>> > > > > > > > > > > take on just a part
>> > > > > > > > > > > of it and leave anything beyond that for me to do as
>> > followup.
>> > > > > > > > > > >
>> > > > > > > > > > > By the way: one advantage of the NamedTopology feature
>> > is that
>> > > > > > we
>> > > > > > > > don't
>> > > > > > > > > > > have to worry about
>> > > > > > > > > > > any compatibility issues or upgrade/migration path --
>> > it's
>> > > > > > opt-in by
>> > > > > > > > > > > definition. (Of course we would
>> > > > > > > > > > > recommend using it to all users, like we do with named
>> > > > > > operators)
>> > > > > > > > > > >
>> > > > > > > > > > > Let me know what you think and how you want to proceed
>> > from
>> > > > > > here -- I
>> > > > > > > > > > > wouldn't want you to
>> > > > > > > > > > > spend time re-implementing more or less the same
>> thing,
>> > but I
>> > > > > > most
>> > > > > > > > likely
>> > > > > > > > > > > wasn't going to find time
>> > > > > > > > > > > to put out a KIP for the NamedTopology feature in the
>> > near
>> > > > > > future.
>> > > > > > > > If you
>> > > > > > > > > > > would be able to help
>> > > > > > > > > > > drive this to completion, we'd each have significantly
>> > less
>> > > > > > work to
>> > > > > > > > do to
>> > > > > > > > > > > achieve our goals :)
>> > > > > > > > > > >
>> > > > > > > > > > > Cheers,
>> > > > > > > > > > > Sophie
>> > > > > > > > > > >
>> > > > > > > > > > >
>> > > > > > > > > > > On Thu, Feb 3, 2022 at 6:12 PM Guozhang Wang <
>> > > > > > wangg...@gmail.com>
>> > > > > > > > wrote:
>> > > > > > > > > > >
>> > > > > > > > > > > > Hello Nick,
>> > > > > > > > > > > >
>> > > > > > > > > > > > Thanks for bringing this up and for the proposed
>> > options. I
>> > > > > > read
>> > > > > > > > though
>> > > > > > > > > > > > your writeup and here are some of my thoughts:
>> > > > > > > > > > > >
>> > > > > > > > > > > > 1) When changing the topology of Kafka Streams, the
>> > > > developer
>> > > > > > need
>> > > > > > > > to
>> > > > > > > > > > > first
>> > > > > > > > > > > > decide if the whole topology's persisted state
>> > (including
>> > > > > > both the
>> > > > > > > > > > state
>> > > > > > > > > > > > store as well as its changelogs, and the repartition
>> > topics,
>> > > > > > and
>> > > > > > > > the
>> > > > > > > > > > > > source/sink external topics) or part of the
>> persisted
>> > state
>> > > > > > can be
>> > > > > > > > > > > reused.
>> > > > > > > > > > > > This involves two types of changes:
>> > > > > > > > > > > >
>> > > > > > > > > > > > a) structural change of the topology, such like a
>> new
>> > > > > > processor
>> > > > > > > > node is
>> > > > > > > > > > > > added/removed, a new intermediate topic is
>> > added/removed
>> > > > etc.
>> > > > > > > > > > > > b) semantic change of a processor, such as a
>> numerical
>> > > > filter
>> > > > > > node
>> > > > > > > > > > > changing
>> > > > > > > > > > > > its filter threshold etc.
>> > > > > > > > > > > >
>> > > > > > > > > > > > Today both of them are more or less determined by
>> > developers
>> > > > > > > > manually.
>> > > > > > > > > > > > However, though automatically determining on changes
>> > of type
>> > > > > > b) is
>> > > > > > > > hard
>> > > > > > > > > > > if
>> > > > > > > > > > > > not possible, automatic determining on the type of
>> a)
>> > is
>> > > > > > doable
>> > > > > > > > since
>> > > > > > > > > > > it's
>> > > > > > > > > > > > depend on just the information of:
>> > > > > > > > > > > > * number of sub-topologies, and their orders (i.e.
>> > sequence
>> > > > > > of ids)
>> > > > > > > > > > > > * used state stores and changelog topics within the
>> > > > > > sub-topology
>> > > > > > > > > > > > * used repartition topics
>> > > > > > > > > > > > * etc
>> > > > > > > > > > > >
>> > > > > > > > > > > > So let's assume in the long run we can indeed
>> > automatically
>> > > > > > > > determine
>> > > > > > > > > > if
>> > > > > > > > > > > a
>> > > > > > > > > > > > topology or part of it (a sub-topology) is
>> > structurally the
>> > > > > > same,
>> > > > > > > > what
>> > > > > > > > > > we
>> > > > > > > > > > > > can do is to "translate" the old persisted state
>> names
>> > to
>> > > > the
>> > > > > > > > > > > > new, isomorphic topology's names. Following this
>> > thought I'm
>> > > > > > > > leaning
>> > > > > > > > > > > > towards the direction of option B in your proposal.
>> But
>> > > > since
>> > > > > > in
>> > > > > > > > this
>> > > > > > > > > > KIP
>> > > > > > > > > > > > automatic determining structural changes are out of
>> the
>> > > > > > scope, I
>> > > > > > > > feel
>> > > > > > > > > > we
>> > > > > > > > > > > > can consider adding some sort of a "migration tool"
>> > from an
>> > > > > > old
>> > > > > > > > > > topology
>> > > > > > > > > > > to
>> > > > > > > > > > > > new topology by renaming all the persisted states
>> > (store
>> > > > dirs
>> > > > > > and
>> > > > > > > > > > names,
>> > > > > > > > > > > > topic names).
>> > > > > > > > > > > >
>> > > > > > > > > > > >
>> > > > > > > > > > > > Guozhang
>> > > > > > > > > > > >
>> > > > > > > > > > > >
>> > > > > > > > > > > > On Tue, Jan 25, 2022 at 9:10 AM Nick Telford <
>> > > > > > > > nick.telf...@gmail.com>
>> > > > > > > > > > > > wrote:
>> > > > > > > > > > > >
>> > > > > > > > > > > > > Hi everyone,
>> > > > > > > > > > > > >
>> > > > > > > > > > > > > I'd like to start a discussion on Kafka Streams
>> > KIP-816 (
>> > > > > > > > > > > > >
>> > > > > > > > > > > > >
>> > > > > > > > > > > >
>> > > > > > > > > > >
>> > > > > > > > > >
>> > > > > > > >
>> > > > > >
>> > > >
>> >
>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-816%3A+Topology+changes+without+local+state+reset
>> > > > > > > > > > > > > )
>> > > > > > > > > > > > >
>> > > > > > > > > > > > > This KIP outlines 3 possible solutions to the
>> > problem,
>> > > > and I
>> > > > > > > > plan to
>> > > > > > > > > > > > > whittle this down to a definitive solution based
>> on
>> > this
>> > > > > > > > discussion.
>> > > > > > > > > > > > >
>> > > > > > > > > > > > > Of the 3 proposed solutions:
>> > > > > > > > > > > > > * 'A' is probably the "correct" solution, but is
>> also
>> > > > quite
>> > > > > > a
>> > > > > > > > > > > significant
>> > > > > > > > > > > > > change.
>> > > > > > > > > > > > > * 'B' is the least invasive, but most "hacky"
>> > solution.
>> > > > > > > > > > > > > * 'C' requires a change to the wire protocol and
>> will
>> > > > > > likely have
>> > > > > > > > > > > > > unintended consequences. C is also the least
>> complete
>> > > > > > solution,
>> > > > > > > > and
>> > > > > > > > > > > will
>> > > > > > > > > > > > > need significant additional work to make it work.
>> > > > > > > > > > > > >
>> > > > > > > > > > > > > Please let me know if the Motivation and
>> Background
>> > > > > > sections need
>> > > > > > > > > > more
>> > > > > > > > > > > > > clarity.
>> > > > > > > > > > > > >
>> > > > > > > > > > > > > Regards,
>> > > > > > > > > > > > >
>> > > > > > > > > > > > > Nick Telford
>> > > > > > > > > > > > >
>> > > > > > > > > > > >
>> > > > > > > > > > > >
>> > > > > > > > > > > > --
>> > > > > > > > > > > > -- Guozhang
>> > > > > > > > > > > >
>> > > > > > > > > > >
>> > > > > > > > > >
>> > > > > > > > >
>> > > > > > > > >
>> > > > > > > >
>> > > > > > > >
>> > > > > >
>> > > > > >
>> > > > >
>> > > > > --
>> > > > > -- Guozhang
>> > > > >
>> > > >
>> > > >
>> > > > --
>> > > > -- Guozhang
>> >
>> >
>>
>> --
>> -- Guozhang
>>
>

Reply via email to