I've updated the KIP with the following:

   - Deprecation of StateStore#managesOffsets
   - Change StateStore#commit to throw UnsupportedOperationException when
   called from a Processor (via AbstractReadWriteDecorator)
   - Updated consumer rebalance lag computation strategy
   <https://cwiki.apache.org/confluence/display/KAFKA/KIP-1035%3A+StateStore+managed+changelog+offsets#KIP1035:StateStoremanagedchangelogoffsets-ConsumerRebalanceMetadata>
   based on our Meet discussion
      - I've added a bit more detail here than we discussed, in particular
      around how we handle the offsets for tasks assigned to our local
      instance, and how we handle offsets when Tasks are closed/revoked.
   - Improved downgrade behaviour
      - Note: users that don't downgrade with upgrade.from will still get
      the wipe-and-restore behaviour by default.
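
As a rough illustration of the second bullet (StateStore#commit throwing
UnsupportedOperationException when called from a Processor), here is a minimal
sketch. All of the types below are hypothetical, simplified stand-ins rather
than the real Kafka Streams classes: the real StateStore interface has many
more methods, commit takes a Map<TopicPartition, Long>, and the actual guard
would live in AbstractReadWriteDecorator.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, cut-down stand-in for StateStore (String keys instead of TopicPartition).
interface SketchStateStore {
    void commit(Map<String, Long> changelogOffsets);

    // Hypothetical accessor, only here so the demo can observe what was committed.
    Map<String, Long> committedOffsets();
}

// Hypothetical store that just remembers the offsets it was asked to commit.
class SketchInMemoryStore implements SketchStateStore {
    private final Map<String, Long> committed = new HashMap<>();

    @Override
    public void commit(Map<String, Long> changelogOffsets) {
        committed.putAll(changelogOffsets);
    }

    @Override
    public Map<String, Long> committedOffsets() {
        return new HashMap<>(committed);
    }
}

// Stand-in for the decorator handed to Processors: reads are delegated,
// but commit is rejected because only the runtime may commit a store.
class SketchReadWriteDecorator implements SketchStateStore {
    private final SketchStateStore inner;

    SketchReadWriteDecorator(SketchStateStore inner) {
        this.inner = inner;
    }

    @Override
    public void commit(Map<String, Long> changelogOffsets) {
        throw new UnsupportedOperationException("commit must not be called from a Processor");
    }

    @Override
    public Map<String, Long> committedOffsets() {
        return inner.committedOffsets();
    }
}

class CommitGuardSketch {
    public static void main(String[] args) {
        SketchStateStore store = new SketchInMemoryStore();
        store.commit(Map.of("changelog-0", 42L)); // the runtime commits the raw store

        SketchStateStore seenByProcessor = new SketchReadWriteDecorator(store);
        try {
            seenByProcessor.commit(Map.of("changelog-0", 43L));
        } catch (UnsupportedOperationException e) {
            System.out.println("rejected: " + e.getMessage());
        }
        System.out.println(seenByProcessor.committedOffsets());
    }
}
```

The point of wrapping rather than changing the store itself is that the
runtime keeps a reference to the undecorated store and can still commit it,
while user code only ever sees the guarded view.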

I believe this covers all the outstanding changes that were requested.
Please let me know if I've missed anything or you think further changes are
needed.
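
For readers skimming the thread, the rebalance lag strategy from the third
bullet can be sketched roughly as below. This is only an illustration under
stated assumptions: the class, method and parameter names are hypothetical,
task IDs are plain strings, and the cache is modelled as a simple map of
task ID to offset populated at start-up. The only details taken from the
discussion are that locally active Tasks are reported as "up to date" (-1)
and that all other Tasks are looked up in the shared in-memory cache.

```java
import java.util.Map;
import java.util.OptionalLong;
import java.util.Set;

class RebalanceOffsetSketch {
    // Sentinel for "up to date", as described in the discussion.
    static final long UP_TO_DATE = -1L;

    // Hypothetical helper: decides what to report for one Task during a rebalance.
    static OptionalLong reportedOffset(String taskId,
                                       Set<String> locallyActiveTasks,
                                       Map<String, Long> startupOffsetCache) {
        // Active on any instance-local StreamThread: local state is current.
        if (locallyActiveTasks.contains(taskId)) {
            return OptionalLong.of(UP_TO_DATE);
        }
        // Otherwise consult the cache that was filled by opening stores at start-up.
        Long cached = startupOffsetCache.get(taskId);
        // No entry means there is no local state for this Task, so nothing is reported.
        return cached == null ? OptionalLong.empty() : OptionalLong.of(cached);
    }

    public static void main(String[] args) {
        Set<String> active = Set.of("0_0");
        Map<String, Long> cache = Map.of("0_0", 100L, "0_1", 250L);

        System.out.println(reportedOffset("0_0", active, cache)); // active locally
        System.out.println(reportedOffset("0_1", active, cache)); // cached, not active
        System.out.println(reportedOffset("0_2", active, cache)); // no local state
    }
}
```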

Regards,
Nick

On Wed, 29 May 2024 at 09:28, Nick Telford <nick.telf...@gmail.com> wrote:

> Hi everyone,
>
> Sorry I haven't got around to updating the KIP yet. Now that I've wrapped
> up KIP-989, I'm going to be working on 1035 starting today.
>
> I'll update the KIP first, and then call a vote.
>
> Regards,
> Nick
>
> On Wed, 29 May 2024 at 07:25, Bruno Cadonna <cado...@apache.org> wrote:
>
>> Totally agree on moving forward and starting the VOTE!
>>
>> However, the KIP should be updated with the new info before starting the
>> VOTE.
>>
>> Best,
>> Bruno
>>
>> On 5/29/24 2:36 AM, Matthias J. Sax wrote:
>> > Sounds like a good plan. -- I think we are still wrapping up the 3.8
>> > release, but I would also like to move forward with this one.
>> >
>> > Should we start a VOTE?
>> >
>> > For merging PRs we need to wait until after code freeze, and the 3.8
>> > branch was cut. But we could start reviewing PRs before this already.
>> >
>> >
>> > -Matthias
>> >
>> > On 5/17/24 3:05 AM, Nick Telford wrote:
>> >> Hi everyone,
>> >>
>> >> As discussed on the Zoom call, we're going to handle rebalance
>> >> meta-data by:
>> >>
>> >> - On start-up, Streams will open each store and read its changelog
>> >> offsets into an in-memory cache. This cache will be shared among all
>> >> StreamThreads.
>> >> - On rebalance, the cache will be consulted for Task offsets for any
>> >> Task that is not active on any instance-local StreamThreads. If the
>> >> Task is active on *any* instance-local StreamThread, we will report the
>> >> Task lag as "up to date" (i.e. -1), because we know that the local
>> >> state is currently up-to-date.
>> >>
>> >> We will avoid caching offsets across restarts in the legacy
>> >> ".checkpoint" file, so that we can eliminate the logic for handling this
>> >> class. If performance of opening/closing many state stores is poor, we
>> >> can parallelise it by forking off a thread for each Task directory when
>> >> reading the offsets.
>> >>
>> >> I'll update the KIP later today to reflect this design, but I will try
>> >> to keep it high-level, so that the exact implementation can vary.
>> >>
>> >> Regards,
>> >>
>> >> Nick
>> >>
>> >> On Thu, 16 May 2024 at 03:12, Sophie Blee-Goldman <sop...@responsive.dev>
>> >> wrote:
>> >>
>> >>> 103: I like the idea of immediately deprecating #managesOffsets and
>> >>> aiming to make offset management mandatory in the long run. I assume we
>> >>> would also log a warning for any custom stores that return "false" from
>> >>> this method to encourage custom store implementations to start doing so?
>> >>> My only question/concern is that if we want folks to start managing
>> >>> their own offsets then we should make this transition easy for them,
>> >>> perhaps by exposing some public utility APIs for things that are
>> >>> currently handled by Kafka Streams such as reading/writing checkpoint
>> >>> files. Maybe it would be useful to include a small example in the KIP of
>> >>> what it would actually mean to "manage your own offsets" -- I know (all
>> >>> too well) that plugging in custom storage implementations is not easy
>> >>> and most people who do this are probably fairly advanced users, but
>> >>> offset management will be a totally new ballgame to most people and this
>> >>> kind of feels like throwing them off the deep end. We should at least
>> >>> provide a lifejacket via some kind of utility API and/or example.
>> >>>
>> >>> 200. There's been a lot of back and forth on the rebalance
>> metadata/task
>> >>> lag computation question, so forgive me if I missed any part of this,
>> >>> but I
>> >>> think we've landed at the right idea here. To summarize: the "tl;dr"
>> >>> explanation is that we'll write the checkpoint file only on close and
>> >>> will
>> >>> account for hard-crash scenarios by opening up the stores on startup
>> and
>> >>> writing a checkpoint file for any missing tasks. Does that sound about
>> >>> right?
>> >>>
>> >>> A few clarifications:
>> >>> I think we're all more or less on the same page here but just to be
>> >>> absolutely clear, the task lags for each task directory found on disk
>> >>> will
>> >>> be reported by only one of the StreamThreads, and each StreamThread
>> will
>> >>> report lags only for tasks that it already owns or are not assigned
>> >>> to any
>> >>> other StreamThread in the client. In other words, we only need to get
>> >>> the
>> >>> task lag for completely unassigned/unlocked tasks, which means if
>> >>> there is
>> >>> a checkpoint file at all then it must be up-to-date, because there is
>> no
>> >>> other StreamThread actively writing to that state store (if so then
>> only
>> >>> that StreamThread would report lag for that particular task).
>> >>>
>> >>> This still leaves the "no checkpoint at all" case which as previously
>> >>> mentioned can occur after a  hard-crash. Luckily we only have to worry
>> >>> about this once, after starting up again following said hard crash.
>> >>> We can
>> >>> simply open up each of the state stores before ever joining the
>> >>> group, get
>> >>> the offsets from rocksdb, and write them to a new checkpoint file.
>> After
>> >>> that, we can depend on the checkpoints written at close and won't
>> >>> have to
>> >>> open up any stores that aren't already assigned for the reasons laid
>> >>> out in
>> >>> the paragraph above.
>> >>>
>> >>> As for the specific mechanism and which thread-does-what, since there
>> >>> were
>> >>> some questions, this is how I'm imagining the process:
>> >>>
>> >>>     1. The general idea is that we simply go through each task
>> >>>     directory with state but no checkpoint file, open the StateStore,
>> >>>     call #committedOffset, and then write it to the checkpoint file.
>> >>>     We can then close these stores and let things proceed as normal.
>> >>>     2. This only has to happen once, during startup, but we have two
>> >>>     options:
>> >>>        1. Do this from KafkaStreams#start, i.e. before we even create
>> >>>        the StreamThreads
>> >>>        2. Do this from StreamThread#start, following a similar
>> >>>        lock-based approach to the one used in #computeTaskLags, where
>> >>>        each StreamThread just makes a pass over the task directories on
>> >>>        disk and attempts to lock them one by one. If it obtains the
>> >>>        lock, it checks whether there is state but no checkpoint, and
>> >>>        writes the checkpoint if needed. If it can't grab the lock, then
>> >>>        we know one of the other StreamThreads must be handling the
>> >>>        checkpoint file for that task directory, and we can move on.
>> >>>
>> >>> I don't really feel too strongly about which approach is best. Doing it
>> >>> in KafkaStreams#start is certainly the simplest, while doing it in the
>> >>> StreamThread's startup is more efficient. If we're worried about adding
>> >>> too much weight to KafkaStreams#start then the 2nd option is probably
>> >>> best, though slightly more complicated.
>> >>>
>> >>> Thoughts?
>> >>>
>> >>> On Tue, May 14, 2024 at 10:02 AM Nick Telford <nick.telf...@gmail.com>
>> >>> wrote:
>> >>>
>> >>>> Hi everyone,
>> >>>>
>> >>>> Sorry for the delay in replying. I've finally now got some time to
>> work
>> >>> on
>> >>>> this.
>> >>>>
>> >>>> Addressing Matthias's comments:
>> >>>>
>> >>>> 100.
>> >>>> Good point. As Bruno mentioned, there's already
>> >>> AbstractReadWriteDecorator
>> >>>> which we could leverage to provide that protection. I'll add details
>> on
>> >>>> this to the KIP.
>> >>>>
>> >>>> 101,102.
>> >>>> It looks like these points have already been addressed by Bruno. Let
>> me
>> >>>> know if anything here is still unclear or you feel needs to be
>> detailed
>> >>>> more in the KIP.
>> >>>>
>> >>>> 103.
>> >>>> I'm in favour of anything that gets the old code removed sooner, but
>> >>>> wouldn't deprecating an API that we expect (some) users to implement
>> >>> cause
>> >>>> problems?
>> >>>> I'm thinking about implementers of custom StateStores, as they may be
>> >>>> confused by managesOffsets() being deprecated, especially since they
>> >>> would
>> >>>> have to mark their implementation as @Deprecated in order to avoid
>> >>> compile
>> >>>> warnings.
>> >>>> If deprecating an API *while it's still expected to be implemented*
>> is
>> >>>> something that's generally done in the project, then I'm happy to do
>> so
>> >>>> here.
>> >>>>
>> >>>> 104.
>> >>>> I think this is technically possible, but at the cost of considerable
>> >>>> additional code to maintain. Would we ever have a pathway to remove
>> >>>> this
>> >>>> downgrade code in the future?
>> >>>>
>> >>>>
>> >>>> Regarding rebalance metadata:
>> >>>> Opening all stores on start-up to read and cache their offsets is an
>> >>>> interesting idea, especially if we can avoid re-opening the stores
>> once
>> >>> the
>> >>>> Tasks have been assigned. Scalability shouldn't be too much of a
>> >>>> problem,
>> >>>> because typically users have a fairly short state.cleanup.delay, so
>> the
>> >>>> number of on-disk Task directories should rarely exceed the number of
>> >>> Tasks
>> >>>> previously assigned to that instance.
>> >>>> An advantage of this approach is that it would also simplify
>> StateStore
>> >>>> implementations, as they would only need to guarantee that committed
>> >>>> offsets are available when the store is open.
>> >>>>
>> >>>> I'll investigate this approach this week for feasibility and report
>> >>>> back.
>> >>>>
>> >>>> I think that covers all the outstanding feedback, unless I missed
>> >>> anything?
>> >>>>
>> >>>> Regards,
>> >>>> Nick
>> >>>>
>> >>>> On Mon, 6 May 2024 at 14:06, Bruno Cadonna <cado...@apache.org> wrote:
>> >>>>
>> >>>>> Hi Matthias,
>> >>>>>
>> >>>>> I see what you mean.
>> >>>>>
>> >>>>> To sum up:
>> >>>>>
>> >>>>> With this KIP the .checkpoint file is written when the store closes.
>> >>>>> That is when:
>> >>>>> 1. a task moves away from Kafka Streams client
>> >>>>> 2. Kafka Streams client shuts down
>> >>>>>
>> >>>>> A Kafka Streams client needs the information in the .checkpoint file
>> >>>>> 1. on startup because it does not have any open stores yet.
>> >>>>> 2. during rebalances for non-empty state directories of tasks that
>> are
>> >>>>> not assigned to the Kafka Streams client.
>> >>>>>
>> >>>>> With hard crashes, i.e., when the Streams client is not able to
>> close
>> >>>>> its state stores and write the .checkpoint file, the .checkpoint
>> file
>> >>>>> might be quite stale. That influences the next rebalance after
>> >>>>> failover
>> >>>>> negatively.
>> >>>>>
>> >>>>>
>> >>>>> My conclusion is that Kafka Streams either needs to open the state
>> >>>>> stores at start up or we write the checkpoint file more often.
>> >>>>>
>> >>>>> Writing the .checkpoint file during processing more often without
>> >>>>> controlling the flush to disk would work. However, Kafka Streams would
>> >>>>> checkpoint offsets that are not yet persisted on disk by the state
>> >>>>> store. That is, with a hard crash the offsets in the .checkpoint file
>> >>>>> might be larger than the offsets checkpointed in the state store. That
>> >>>>> might not be a problem if Kafka Streams uses the .checkpoint file only
>> >>>>> to compute the task lag. The downside is that it makes the managing of
>> >>>>> checkpoints more complex because now we have to maintain two
>> >>>>> checkpoints: one for restoration and one for computing the task lag.
>> >>>>> I think we should explore the option where Kafka Streams opens the
>> >>>>> state stores at start up to get the offsets.
>> >>>>>
>> >>>>> I also checked when Kafka Streams needs the checkpointed offsets to
>> >>>>> compute the task lag during a rebalance. Turns out Kafka Streams
>> needs
>> >>>>> them before sending the join request. Now, I am wondering if opening
>> >>> the
>> >>>>> state stores of unassigned tasks whose state directory exists
>> locally
>> >>> is
>> >>>>> actually such a big issue due to the expected higher latency since
>> it
>> >>>>> happens actually before the Kafka Streams client joins the
>> rebalance.
>> >>>>>
>> >>>>> Best,
>> >>>>> Bruno
>> >>>>>
>> >>>>>
>> >>>>>
>> >>>>>
>> >>>>>
>> >>>>>
>> >>>>>
>> >>>>> On 5/4/24 12:05 AM, Matthias J. Sax wrote:
>> >>>>>> Those are good questions... I could think of a few approaches, but I
>> >>>>>> admit it might all be a little bit tricky to code up...
>> >>>>>>
>> >>>>>> However, if we don't solve this problem, I think this KIP does not
>> >>>>>> really solve the core issue we are facing? In the end, if we rely on
>> >>>>>> the `.checkpoint` file to compute a task assignment, but the
>> >>>>>> `.checkpoint` file can be arbitrarily stale after a crash because we
>> >>>>>> only write it on a clean close, there would still be a huge gap that
>> >>>>>> this KIP does not close?
>> >>>>>>
>> >>>>>> For the case in which we keep the checkpoint file, this KIP would
>> >>>>>> still help for "soft errors" in which KS can recover, and roll back
>> >>>>>> the store. A significant win for sure. -- But hard crashes would still
>> >>>>>> be a problem? We might assign tasks to the "wrong" instance, i.e., one
>> >>>>>> which is not most up to date, as the checkpoint information could be
>> >>>>>> very outdated? Would we end up with a half-baked solution? Would this
>> >>>>>> be good enough to justify the introduced complexity? In the end, for
>> >>>>>> soft failures it's still a win. Just want to make sure we understand
>> >>>>>> the limitations and make an educated decision.
>> >>>>>>
>> >>>>>> Or am I missing something?
>> >>>>>>
>> >>>>>>
>> >>>>>> -Matthias
>> >>>>>>
>> >>>>>> On 5/3/24 10:20 AM, Bruno Cadonna wrote:
>> >>>>>>> Hi Matthias,
>> >>>>>>>
>> >>>>>>>
>> >>>>>>> 200:
>> >>>>>>> I like the idea in general. However, it is not clear to me how the
>> >>>>>>> behavior should be with multiple stream threads in the same Kafka
>> >>>>>>> Streams client. What stream thread opens which store? How can a
>> >>> stream
>> >>>>>>> thread pass an open store to another stream thread that got the
>> >>>>>>> corresponding task assigned? How does a stream thread know that a
>> >>> task
>> >>>>>>> was not assigned to any of the stream threads of the Kafka Streams
>> >>>>>>> client? I have the feeling we should just keep the .checkpoint
>> file
>> >>> on
>> >>>>>>> close for now to unblock this KIP and try to find a solution to
>> get
>> >>>>>>> totally rid of it later.
>> >>>>>>>
>> >>>>>>>
>> >>>>>>> Best,
>> >>>>>>> Bruno
>> >>>>>>>
>> >>>>>>>
>> >>>>>>>
>> >>>>>>> On 5/3/24 6:29 PM, Matthias J. Sax wrote:
>> >>>>>>>> 101: Yes, but what I am saying is that we don't need to flush the
>> >>>>>>>> .position file to disk periodically, but only maintain it in main
>> >>>>>>>> memory, and only write it to disk on close() to preserve it across
>> >>>>>>>> restarts. This way, it would never be ahead, but might only lag?
>> >>>>>>>> But with my better understanding of (102) it might be moot
>> >>>>>>>> anyway...
>> >>>>>>>>
>> >>>>>>>>
>> >>>>>>>> 102: Thanks for clarifying. Looked into the code now. Makes
>> sense.
>> >>>>>>>> Might be something to be worth calling out explicitly in the KIP
>> >>>>>>>> writeup. -- Now that I realize that the position is tracked
>> inside
>> >>>>>>>> the store (not outside as the changelog offsets) it makes much
>> more
>> >>>>>>>> sense to pull position into RocksDB itself. In the end, it's
>> >>> actually
>> >>>>>>>> a "store implementation" detail how it tracks the position (and
>> >>> kinda
>> >>>>>>>> leaky abstraction currently, that we re-use the checkpoint file
>> >>>>>>>> mechanism to track it and flush to disk).
>> >>>>>>>>
>> >>>>>>>>
>> >>>>>>>> 200: I was thinking about this a little bit more, and maybe it's
>> >>>>>>>> not too bad? When KS starts up, we could open all stores we find on
>> >>>>>>>> local disk pro-actively, and keep them all open until the first
>> >>>>>>>> rebalance finishes: For tasks we get assigned, we hand in the
>> >>>>>>>> already opened store (this would amortize the cost to open the
>> >>>>>>>> store before the rebalance) and for non-assigned tasks, we know the
>> >>>>>>>> offset information won't change and we could just cache it
>> >>>>>>>> in-memory for later reuse (ie, next rebalance) and close the store
>> >>>>>>>> to free up resources? -- Assuming that we would get a large
>> >>>>>>>> percentage of opened stores assigned as tasks anyway, this could
>> >>>>>>>> work?
>> >>>>>>>>
>> >>>>>>>>
>> >>>>>>>> -Matthias
>> >>>>>>>>
>> >>>>>>>> On 5/3/24 1:29 AM, Bruno Cadonna wrote:
>> >>>>>>>>> Hi Matthias,
>> >>>>>>>>>
>> >>>>>>>>>
>> >>>>>>>>> 101:
>> >>>>>>>>> Let's assume a RocksDB store, but I think the following might be
>> >>>>>>>>> true also for other store implementations. With this KIP, if Kafka
>> >>>>>>>>> Streams commits the offsets, the committed offsets will be stored
>> >>>>>>>>> in an in-memory data structure (i.e. the memtable) and stay there
>> >>>>>>>>> until RocksDB decides that it is time to persist its in-memory data
>> >>>>>>>>> structure. If Kafka Streams writes its position to the .position
>> >>>>>>>>> file during a commit and a crash happens before RocksDB persists
>> >>>>>>>>> the memtable, then the position in the .position file is ahead of
>> >>>>>>>>> the persisted offset. If IQ is done between the crash and the state
>> >>>>>>>>> store having fully restored from the changelog, the position might
>> >>>>>>>>> tell IQ that the state store is more up-to-date than it actually is.
>> >>>>>>>>> In contrast, if Kafka Streams handles persisting positions the same
>> >>>>>>>>> as persisting offsets, the position should always be consistent
>> >>>>>>>>> with the offsets, because they are persisted together.
>> >>>>>>>>>
>> >>>>>>>>>
>> >>>>>>>>> 102:
>> >>>>>>>>> I am confused about your confusion which tells me that we are
>> >>>>>>>>> talking about two different things.
>> >>>>>>>>> You asked
>> >>>>>>>>>
>> >>>>>>>>> "Do you intend to add this information [i.e. position] to the map
>> >>>>>>>>> passed via commit(final Map<TopicPartition, Long> changelogOffsets)?"
>> >>>>>>>>>
>> >>>>>>>>> and with what I wrote I meant that we do not need to pass the
>> >>>>>>>>> position into the implementation of the StateStore interface
>> since
>> >>>>>>>>> the position is updated within the implementation of the
>> >>> StateStore
>> >>>>>>>>> interface (e.g. RocksDBStore [1]). My statement describes the
>> >>>>>>>>> behavior now, not the change proposed in this KIP, so it does
>> not
>> >>>>>>>>> contradict what is stated in the KIP.
>> >>>>>>>>>
>> >>>>>>>>>
>> >>>>>>>>> 200:
>> >>>>>>>>> This is about Matthias' main concern about rebalance metadata.
>> >>>>>>>>> As far as I understand the KIP, Kafka Streams will only use the
>> >>>>>>>>> .checkpoint files to compute the task lag for unassigned tasks
>> >>> whose
>> >>>>>>>>> state is locally available. For assigned tasks, it will use the
>> >>>>>>>>> offsets managed by the open state store.
>> >>>>>>>>>
>> >>>>>>>>> Best,
>> >>>>>>>>> Bruno
>> >>>>>>>>>
>> >>>>>>>>> [1]
>> >>>>>>>>> https://github.com/apache/kafka/blob/fcbfd3412eb746a0c81374eb55ad0f73de6b1e71/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java#L397
>> >>>>>>>>>
>> >>>>>>>>> On 5/1/24 3:00 AM, Matthias J. Sax wrote:
>> >>>>>>>>>> Thanks Bruno.
>> >>>>>>>>>>
>> >>>>>>>>>>
>> >>>>>>>>>>
>> >>>>>>>>>> 101: I think I understand this better now. But just want to
>> make
>> >>>>>>>>>> sure I do. What do you mean by "they can diverge" and
>> "Recovering
>> >>>>>>>>>> after a failure might load inconsistent offsets and positions."
>> >>>>>>>>>>
>> >>>>>>>>>> The checkpoint is the offset from the changelog, while the
>> >>> position
>> >>>>>>>>>> is the offset from the upstream source topic, right? -- In the
>> >>> end,
>> >>>>>>>>>> the position is about IQ, and if we fail to update it, it only
>> >>>>>>>>>> means that there is some gap when we might not be able to
>> query a
>> >>>>>>>>>> standby task, because we think it's not up-to-date enough even
>> if
>> >>>>>>>>>> it is, which would resolve itself soon? Ie, the position might
>> >>>>>>>>>> "lag", but it's not "inconsistent". Do we believe that this lag
>> >>>>>>>>>> would be highly problematic?
>> >>>>>>>>>>
>> >>>>>>>>>>
>> >>>>>>>>>>
>> >>>>>>>>>> 102: I am confused.
>> >>>>>>>>>>
>> >>>>>>>>>>> The position is maintained inside the state store, but is
>> >>>>>>>>>>> persisted in the .position file when the state store closes.
>> >>>>>>>>>>
>> >>>>>>>>>> This contradicts the KIP:
>> >>>>>>>>>>
>> >>>>>>>>>>>   these position offsets will be stored in RocksDB, in the
>> same
>> >>>>>>>>>>> column family as the changelog offsets, instead of the
>> .position
>> >>>>> file
>> >>>>>>>>>>
>> >>>>>>>>>>
>> >>>>>>>>>>
>> >>>>>>>>>> My main concern is currently about rebalance metadata --
>> opening
>> >>>>>>>>>> RocksDB stores seems to be very expensive, but if we follow the
>> >>>> KIP:
>> >>>>>>>>>>
>> >>>>>>>>>>> We will do this under EOS by updating the .checkpoint file
>> >>>>>>>>>>> whenever a store is close()d.
>> >>>>>>>>>>
>> >>>>>>>>>> It seems, having the offset inside RocksDB does not help us at
>> >>> all?
>> >>>>>>>>>> In the end, when we crash, we don't want to lose the state, but
>> >>>>>>>>>> when we update the .checkpoint only on a clean close, the
>> >>>>>>>>>> .checkpoint might be stale (ie, still contains the checkpoint
>> >>> when
>> >>>>>>>>>> we opened the store when we got a task assigned).
>> >>>>>>>>>>
>> >>>>>>>>>>
>> >>>>>>>>>>
>> >>>>>>>>>> -Matthias
>> >>>>>>>>>>
>> >>>>>>>>>> On 4/30/24 2:40 AM, Bruno Cadonna wrote:
>> >>>>>>>>>>> Hi all,
>> >>>>>>>>>>>
>> >>>>>>>>>>> 100
>> >>>>>>>>>>> I think we already have such a wrapper. It is called
>> >>>>>>>>>>> AbstractReadWriteDecorator.
>> >>>>>>>>>>>
>> >>>>>>>>>>>
>> >>>>>>>>>>> 101
>> >>>>>>>>>>> Currently, the position is checkpointed when an offset checkpoint
>> >>>>>>>>>>> is written. If we let the state store manage the committed
>> >>>>>>>>>>> offsets, we also need to let the state store manage the
>> >>>>>>>>>>> position, otherwise they might diverge. State store managed
>> >>>>>>>>>>> offsets can get flushed (i.e. checkpointed) to the disk when the
>> >>>>>>>>>>> state store decides to flush its in-memory data structures, but
>> >>>>>>>>>>> the position is only checkpointed at commit time. Recovering
>> >>>>>>>>>>> after a failure might load inconsistent offsets and positions.
>> >>>>>>>>>>>
>> >>>>>>>>>>>
>> >>>>>>>>>>> 102
>> >>>>>>>>>>> The position is maintained inside the state store, but is
>> >>>>>>>>>>> persisted in the .position file when the state store closes.
>> The
>> >>>>>>>>>>> only public interface that uses the position is IQv2 in a
>> >>>>>>>>>>> read-only mode. So the position is only updated within the
>> state
>> >>>>>>>>>>> store and read from IQv2. No need to add anything to the
>> public
>> >>>>>>>>>>> StateStore interface.
>> >>>>>>>>>>>
>> >>>>>>>>>>>
>> >>>>>>>>>>> 103
>> >>>>>>>>>>> Deprecating managesOffsets() right away might be a good idea.
>> >>>>>>>>>>>
>> >>>>>>>>>>>
>> >>>>>>>>>>> 104
>> >>>>>>>>>>> I agree that we should try to support downgrades without
>> wipes.
>> >>> At
>> >>>>>>>>>>> least Nick should state in the KIP why we do not support it.
>> >>>>>>>>>>>
>> >>>>>>>>>>>
>> >>>>>>>>>>> Best,
>> >>>>>>>>>>> Bruno
>> >>>>>>>>>>>
>> >>>>>>>>>>>
>> >>>>>>>>>>>
>> >>>>>>>>>>>
>> >>>>>>>>>>> On 4/23/24 8:13 AM, Matthias J. Sax wrote:
>> >>>>>>>>>>>> Thanks for splitting out this KIP. The discussion shows that it
>> >>>>>>>>>>>> is a complex beast by itself, so worth discussing on its own.
>> >>>>>>>>>>>>
>> >>>>>>>>>>>>
>> >>>>>>>>>>>> A couple of questions / comments:
>> >>>>>>>>>>>>
>> >>>>>>>>>>>>
>> >>>>>>>>>>>> 100 `StateStore#commit()`: The JavaDoc says "must not be
>> called
>> >>>>>>>>>>>> by users" -- I would propose to put a guard in place for
>> this,
>> >>> by
>> >>>>>>>>>>>> either throwing an exception (preferable) or adding a no-op
>> >>>>>>>>>>>> implementation (at least for our own stores, by wrapping them
>> >>> --
>> >>>>>>>>>>>> we cannot enforce it for custom stores I assume), and
>> document
>> >>>>>>>>>>>> this contract explicitly.
>> >>>>>>>>>>>>
>> >>>>>>>>>>>>
>> >>>>>>>>>>>> 101 adding `.position` to the store: Why do we actually need
>> >>>>>>>>>>>> this? The KIP says "To ensure consistency with the committed
>> >>> data
>> >>>>>>>>>>>> and changelog offsets" but I am not sure if I can follow? Can
>> >>> you
>> >>>>>>>>>>>> elaborate why leaving the `.position` file as-is won't work?
>> >>>>>>>>>>>>
>> >>>>>>>>>>>>> If it's possible at all, it will need to be done by
>> >>>>>>>>>>>>> creating temporary StateManagers and StateStores during
>> >>>>>>>>>>>>> rebalance. I think
>> >>>>>>>>>>>>> it is possible, and probably not too expensive, but the
>> devil
>> >>>>>>>>>>>>> will be in
>> >>>>>>>>>>>>> the detail.
>> >>>>>>>>>>>>
>> >>>>>>>>>>>> This sounds like a significant overhead to me. We know that
>> >>>>>>>>>>>> opening a single RocksDB takes about 500ms, and thus opening
>> >>>>>>>>>>>> RocksDB to get this information might slow down rebalances
>> >>>>>>>>>>>> significantly.
>> >>>>>>>>>>>>
>> >>>>>>>>>>>>
>> >>>>>>>>>>>> 102: It's unclear to me how `.position` information is added.
>> >>>>>>>>>>>> The KIP only says: "position offsets will be stored in RocksDB,
>> >>>>>>>>>>>> in the same column family as the changelog offsets". Do you
>> >>>>>>>>>>>> intend to add this information to the map passed via
>> >>>>>>>>>>>> `commit(final Map<TopicPartition, Long> changelogOffsets)`? The
>> >>>>>>>>>>>> KIP should describe this in more detail. Also, if my assumption
>> >>>>>>>>>>>> is correct, we might want to rename the parameter and also have
>> >>>>>>>>>>>> a better JavaDoc description?
>> >>>>>>>>>>>>
>> >>>>>>>>>>>>
>> >>>>>>>>>>>> 103: Should we make it mandatory (long-term) that all stores
>> >>>>>>>>>>>> (including custom stores) manage their offsets internally?
>> >>>>>>>>>>>> Maintaining both options and thus both code paths puts a burden
>> >>>>>>>>>>>> on everyone and makes the code messy. I would strongly prefer if
>> >>>>>>>>>>>> we could have a mid-term path to get rid of supporting both. --
>> >>>>>>>>>>>> For this case, we should deprecate the newly added
>> >>>>>>>>>>>> `managesOffsets()` method right away, to point out that we
>> >>>>>>>>>>>> intend to remove it. If it's mandatory to maintain offsets for
>> >>>>>>>>>>>> stores, we won't need this method any longer. In-memory stores
>> >>>>>>>>>>>> can just return null from #committedOffset().
>> >>>>>>>>>>>>
>> >>>>>>>>>>>>
>> >>>>>>>>>>>> 104 "downgrading": I think it might be worth adding support for
>> >>>>>>>>>>>> downgrading w/o the need to wipe stores? Leveraging the
>> >>>>>>>>>>>> `upgrade.from` parameter, we could build a two rolling bounce
>> >>>>>>>>>>>> downgrade: (1) the new code is started with `upgrade.from` set
>> >>>>>>>>>>>> to a lower version, telling the runtime to do the cleanup on
>> >>>>>>>>>>>> `close()` -- (ie, ensure that all data is written into
>> >>>>>>>>>>>> `.checkpoint` and `.position` file, and the newly added CL is
>> >>>>>>>>>>>> deleted). In a second rolling bounce, the old code would be
>> >>>>>>>>>>>> able to open RocksDB. -- I understand that this implies much
>> >>>>>>>>>>>> more work, but downgrade seems to be common enough that it
>> >>>>>>>>>>>> might be worth it? Even if we did not always support this in the
>> >>>>>>>>>>>> past, we have to face the fact that KS is getting more and more
>> >>>>>>>>>>>> adopted and as a more mature product should support this?
>> >>>>>>>>>>>>
>> >>>>>>>>>>>>
>> >>>>>>>>>>>>
>> >>>>>>>>>>>>
>> >>>>>>>>>>>> -Matthias
>> >>>>>>>>>>>>
>> >>>>>>>>>>>>
>> >>>>>>>>>>>>
>> >>>>>>>>>>>>
>> >>>>>>>>>>>>
>> >>>>>>>>>>>>
>> >>>>>>>>>>>>
>> >>>>>>>>>>>> On 4/21/24 11:58 PM, Bruno Cadonna wrote:
>> >>>>>>>>>>>>> Hi all,
>> >>>>>>>>>>>>>
>> >>>>>>>>>>>>> How should we proceed here?
>> >>>>>>>>>>>>>
>> >>>>>>>>>>>>> 1. with the plain .checkpoint file
>> >>>>>>>>>>>>> 2. with a way to use the state store interface on unassigned
>> >>> but
>> >>>>>>>>>>>>> locally existing task state
>> >>>>>>>>>>>>>
>> >>>>>>>>>>>>> While I like option 2, I think option 1 is less risky and
>> will
>> >>>>>>>>>>>>> give us the benefits of transactional state stores sooner.
>> We
>> >>>>>>>>>>>>> should consider the interface approach afterwards, though.
>> >>>>>>>>>>>>>
>> >>>>>>>>>>>>>
>> >>>>>>>>>>>>> Best,
>> >>>>>>>>>>>>> Bruno
>> >>>>>>>>>>>>>
>> >>>>>>>>>>>>>
>> >>>>>>>>>>>>>
>> >>>>>>>>>>>>> On 4/17/24 3:15 PM, Bruno Cadonna wrote:
>> >>>>>>>>>>>>>> Hi Nick and Sophie,
>> >>>>>>>>>>>>>>
>> >>>>>>>>>>>>>> I think the task ID is not enough to create a state store
>> >>> that
>> >>>>>>>>>>>>>> can read the offsets of non-assigned tasks for lag
>> >>> computation
>> >>>>>>>>>>>>>> during rebalancing. The state store also needs the state
>> >>>>>>>>>>>>>> directory so that it knows where to find the information
>> that
>> >>>>>>>>>>>>>> it needs to return from changelogOffsets().
>> >>>>>>>>>>>>>>
>> >>>>>>>>>>>>>> In general, I think we should proceed with the plain
>> >>>>>>>>>>>>>> .checkpoint file for now and iterate back to the state
>> store
>> >>>>>>>>>>>>>> solution later since it seems it is not that
>> straightforward.
>> >>>>>>>>>>>>>> Alternatively, Nick could timebox an effort to better
>> >>>>>>>>>>>>>> understand what would be needed for the state store
>> solution.
>> >>>>>>>>>>>>>> Nick, let us know your decision.
>> >>>>>>>>>>>>>>
>> >>>>>>>>>>>>>> Regarding your question about the state store instance: I am
>> >>>>>>>>>>>>>> not too familiar with that part of the code, but I think the
>> >>>>>>>>>>>>>> state store is built when the processor topology is built, and
>> >>>>>>>>>>>>>> the processor topology is built per stream task. So there is
>> >>>>>>>>>>>>>> one instance of processor topology and state store per stream
>> >>>>>>>>>>>>>> task. Try to follow the call in [1].
>> >>>>>>>>>>>>>>
>> >>>>>>>>>>>>>> Best,
>> >>>>>>>>>>>>>> Bruno
>> >>>>>>>>>>>>>>
>> >>>>>>>>>>>>>> [1]
>> >>>>>>>>>>>>>> https://github.com/apache/kafka/blob/f52575b17225828d2ff11996030ab7304667deab/streams/src/main/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreator.java#L153
>> >>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>
>> >>>>>>>>>>>>>> On 4/16/24 8:59 PM, Nick Telford wrote:
>> >>>>>>>>>>>>>>> That does make sense. The one thing I can't figure out is
>> >>> how
>> >>>>>>>>>>>>>>> per-Task
>> >>>>>>>>>>>>>>> StateStore instances are constructed.
>> >>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>> It looks like we construct one StateStore instance for the
>> >>>>>>>>>>>>>>> whole Topology
>> >>>>>>>>>>>>>>> (in InternalTopologyBuilder), and pass that into
>> >>>>>>>>>>>>>>> ProcessorStateManager (via
>> >>>>>>>>>>>>>>> StateManagerUtil) for each Task, which then initializes
>> it.
>> >>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>> This can't be the case though, otherwise multiple
>> partitions
>> >>>>>>>>>>>>>>> of the same
>> >>>>>>>>>>>>>>> sub-topology (aka Tasks) would share the same StateStore
>> >>>>>>>>>>>>>>> instance, which
>> >>>>>>>>>>>>>>> they don't.
>> >>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>> What am I missing?
>> >>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>> On Tue, 16 Apr 2024 at 16:22, Sophie Blee-Goldman
>> >>>>>>>>>>>>>>> <sop...@responsive.dev>
>> >>>>>>>>>>>>>>> wrote:
>> >>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>> I don't think we need to *require* a constructor accept
>> the
>> >>>>>>>>>>>>>>>> TaskId, but we
>> >>>>>>>>>>>>>>>> would definitely make sure that the RocksDB state store
>> >>>>>>>>>>>>>>>> changes its
>> >>>>>>>>>>>>>>>> constructor to one that accepts the TaskID (which we can
>> do
>> >>>>>>>>>>>>>>>> without
>> >>>>>>>>>>>>>>>> deprecation since it's an internal API), and custom state
>> >>>>>>>>>>>>>>>> stores can just
>> >>>>>>>>>>>>>>>> decide for themselves whether they want to opt-in/use the
>> >>>>>>>>>>>>>>>> TaskId param
>> >>>>>>>>>>>>>>>> or not. I mean custom state stores would have to opt-in
>> >>>>>>>>>>>>>>>> anyways by
>> >>>>>>>>>>>>>>>> implementing the new StoreSupplier#get(TaskId) API and
>> the
>> >>>> only
>> >>>>>>>>>>>>>>>> reason to do that would be to have created a constructor
>> >>> that
>> >>>>>>>>>>>>>>>> accepts
>> >>>>>>>>>>>>>>>> a TaskId
>> >>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>> Just to be super clear about the proposal, this is what I
>> >>> had
>> >>>>>>>>>>>>>>>> in mind.
>> >>>>>>>>>>>>>>>> It's actually fairly simple and wouldn't add much to the
>> >>>>>>>>>>>>>>>> scope of the
>> >>>>>>>>>>>>>>>> KIP (I think -- if it turns out to be more complicated
>> than
>> >>>>>>>>>>>>>>>> I'm assuming,
>> >>>>>>>>>>>>>>>> we should definitely do whatever has the smallest LOE to
>> >>> get
>> >>>>>>>>>>>>>>>> this done).
>> >>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>> Anyways, the (only) public API changes would be to add
>> this
>> >>>> new
>> >>>>>>>>>>>>>>>> method to the StoreSupplier API:
>> >>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>> default T get(final TaskId taskId) {
>> >>>>>>>>>>>>>>>>       return get();
>> >>>>>>>>>>>>>>>> }
>> >>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>> We can decide whether or not to deprecate the old #get
>> but
>> >>>>>>>>>>>>>>>> it's not
>> >>>>>>>>>>>>>>>> really necessary and might cause a lot of turmoil, so I'd
>> >>>>>>>>>>>>>>>> personally
>> >>>>>>>>>>>>>>>> say we just leave both APIs in place.
>> >>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>> And that's it for public API changes! Internally, we
>> would
>> >>>>>>>>>>>>>>>> just adapt
>> >>>>>>>>>>>>>>>> each of the rocksdb StoreSupplier classes to implement
>> this
>> >>>> new
>> >>>>>>>>>>>>>>>> API. So for example with the
>> >>>> RocksDBKeyValueBytesStoreSupplier,
>> >>>>>>>>>>>>>>>> we just add
>> >>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>> @Override
>> >>>>>>>>>>>>>>>> public KeyValueStore<Bytes, byte[]> get(final TaskId
>> >>> taskId)
>> >>>> {
>> >>>>>>>>>>>>>>>>       return returnTimestampedStore ?
>> >>>>>>>>>>>>>>>>           new RocksDBTimestampedStore(name,
>> metricsScope(),
>> >>>>>>>>>>>>>>>> taskId) :
>> >>>>>>>>>>>>>>>>           new RocksDBStore(name, metricsScope(), taskId);
>> >>>>>>>>>>>>>>>> }
>> >>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>> And of course add the TaskId parameter to each of the
>> >>> actual
>> >>>>>>>>>>>>>>>> state store constructors returned here.
>> >>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>> Does that make sense? It's entirely possible I'm missing
>> >>>>>>>>>>>>>>>> something
>> >>>>>>>>>>>>>>>> important here, but I think this would be a pretty small
>> >>>>>>>>>>>>>>>> addition that
>> >>>>>>>>>>>>>>>> would solve the problem you mentioned earlier while also
>> >>>> being
>> >>>>>>>>>>>>>>>> useful to anyone who uses custom state stores.
>> >>>>>>>>>>>>>>>>
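To make the compatibility argument above concrete, here is a minimal, self-contained sketch of the proposed default method. TaskId and StoreSupplier below are simplified, hypothetical stand-ins rather than the real Kafka Streams types, and the two supplier classes and their String "stores" are invented purely for illustration.

```java
// Hypothetical stand-in for the Kafka Streams TaskId; the real class is richer.
final class TaskId {
    final int subtopology;
    final int partition;

    TaskId(final int subtopology, final int partition) {
        this.subtopology = subtopology;
        this.partition = partition;
    }

    @Override
    public String toString() {
        return subtopology + "_" + partition;
    }
}

// Simplified stand-in for StoreSupplier with the proposed extra method.
interface StoreSupplier<T> {
    // Existing factory method: the store only learns its task later, in init().
    T get();

    // Proposed addition: falls back to the old method, so existing suppliers
    // keep compiling and behave exactly as before.
    default T get(final TaskId taskId) {
        return get();
    }
}

// A supplier that predates the change and only implements get().
final class LegacySupplier implements StoreSupplier<String> {
    @Override
    public String get() {
        return "store(no task)";
    }
}

// A supplier that opts in and hands the TaskId to the store it creates.
final class TaskAwareSupplier implements StoreSupplier<String> {
    @Override
    public String get() {
        return "store(no task)";
    }

    @Override
    public String get(final TaskId taskId) {
        return "store(task " + taskId + ")";
    }
}
```

Calling get(taskId) on LegacySupplier falls through to get(), while TaskAwareSupplier sees the TaskId at construction time, which is the opt-in behaviour described above.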
>> >>>>>>>>>>>>>>>> On Mon, Apr 15, 2024 at 10:21 AM Nick Telford
>> >>>>>>>>>>>>>>>> <nick.telf...@gmail.com>
>> >>>>>>>>>>>>>>>> wrote:
>> >>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>> Hi Sophie,
>> >>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>> Interesting idea! Although what would that mean for the
>> >>>>>>>>>>>>>>>>> StateStore
>> >>>>>>>>>>>>>>>>> interface? Obviously we can't require that the
>> constructor
>> >>>>>>>>>>>>>>>>> take the
>> >>>>>>>>>>>>>>>> TaskId.
>> >>>>>>>>>>>>>>>>> Is it enough to add the parameter to the StoreSupplier?
>> >>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>> Would doing this be in-scope for this KIP, or are we
>> >>>>>>>>>>>>>>>>> over-complicating
>> >>>>>>>>>>>>>>>> it?
>> >>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>> Nick
>> >>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>> On Fri, 12 Apr 2024 at 21:30, Sophie Blee-Goldman
>> >>>>>>>>>>>>>>>>> <sop...@responsive.dev>
>> >>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>> wrote:
>> >>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>>> Somewhat minor point overall, but it actually drives me
>> >>>>>>>>>>>>>>>>>> crazy that you
>> >>>>>>>>>>>>>>>>>> can't get access to the taskId of a StateStore until
>> >>> #init
>> >>>>>>>>>>>>>>>>>> is called.
>> >>>>>>>>>>>>>>>>> This
>> >>>>>>>>>>>>>>>>>> has caused me a huge headache personally (since the
>> same
>> >>> is
>> >>>>>>>>>>>>>>>>>> true for
>> >>>>>>>>>>>>>>>>>> processors and I was trying to do something that's
>> >>> probably
>> >>>>>>>>>>>>>>>>>> too hacky
>> >>>>>>>>>>>>>>>> to
>> >>>>>>>>>>>>>>>>>> actually complain about here lol)
>> >>>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>>> Can we just change the StateStoreSupplier to receive
>> and
>> >>>>>>>>>>>>>>>>>> pass along the
>> >>>>>>>>>>>>>>>>>> taskId when creating a new store? Presumably by adding
>> a
>> >>>>>>>>>>>>>>>>>> new version of
>> >>>>>>>>>>>>>>>>> the
>> >>>>>>>>>>>>>>>>>> #get method that takes in a taskId parameter? We can
>> have
>> >>>>>>>>>>>>>>>>>> it default to
>> >>>>>>>>>>>>>>>>>> invoking the old one for compatibility reasons and it
>> >>>>>>>>>>>>>>>>>> should be
>> >>>>>>>>>>>>>>>>> completely
>> >>>>>>>>>>>>>>>>>> safe to tack on.
>> >>>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>>> Would also prefer the same for a ProcessorSupplier, but
>> >>>>> that's
>> >>>>>>>>>>>>>>>> definitely
>> >>>>>>>>>>>>>>>>>> outside the scope of this KIP
>> >>>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>>> On Fri, Apr 12, 2024 at 3:31 AM Nick Telford
>> >>>>>>>>>>>>>>>>>> <nick.telf...@gmail.com>
>> >>>>>>>>>>>>>>>>>> wrote:
>> >>>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>>>> On further thought, it's clear that this can't work
>> for
>> >>>>>>>>>>>>>>>>>>> one simple
>> >>>>>>>>>>>>>>>>>> reason:
>> >>>>>>>>>>>>>>>>>>> StateStores don't know their associated TaskId (and
>> >>> hence,
>> >>>>>>>>>>>>>>>>>>> their
>> >>>>>>>>>>>>>>>>>>> StateDirectory) until the init() call. Therefore,
>> >>>>>>>>>>>>>>>>>>> committedOffset()
>> >>>>>>>>>>>>>>>>> can't
>> >>>>>>>>>>>>>>>>>>> be called before init(), unless we also added a
>> >>>>>>>>>>>>>>>>>>> StateStoreContext
>> >>>>>>>>>>>>>>>>>> argument
>> >>>>>>>>>>>>>>>>>>> to committedOffset(), which I think might be trying to
>> >>>>>>>>>>>>>>>>>>> shoehorn too
>> >>>>>>>>>>>>>>>>> much
>> >>>>>>>>>>>>>>>>>>> into committedOffset().
>> >>>>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>>>> I still don't like the idea of the Streams engine
>> >>>>>>>>>>>>>>>>>>> maintaining the
>> >>>>>>>>>>>>>>>> cache
>> >>>>>>>>>>>>>>>>>> of
>> >>>>>>>>>>>>>>>>>>> changelog offsets independently of stores, mostly
>> >>> because
>> >>>>>>>>>>>>>>>>>>> of the
>> >>>>>>>>>>>>>>>>>>> maintenance burden of the code duplication, but it
>> looks
>> >>>>>>>>>>>>>>>>>>> like we'll
>> >>>>>>>>>>>>>>>>> have
>> >>>>>>>>>>>>>>>>>> to
>> >>>>>>>>>>>>>>>>>>> live with it.
>> >>>>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>>>> Unless you have any better ideas?
>> >>>>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>>>> Regards,
>> >>>>>>>>>>>>>>>>>>> Nick
>> >>>>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>>>> On Wed, 10 Apr 2024 at 14:12, Nick Telford
>> >>>>>>>>>>>>>>>>>>> <nick.telf...@gmail.com>
>> >>>>>>>>>>>>>>>>>> wrote:
>> >>>>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>>>>> Hi Bruno,
>> >>>>>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>>>>> Immediately after I sent my response, I looked at the
>> >>>>>>>>>>>>>>>>>>>> codebase and
>> >>>>>>>>>>>>>>>>> came
>> >>>>>>>>>>>>>>>>>>> to
>> >>>>>>>>>>>>>>>>>>>> the same conclusion. If it's possible at all, it will
>> >>>>>>>>>>>>>>>>>>>> need to be
>> >>>>>>>>>>>>>>>> done
>> >>>>>>>>>>>>>>>>>> by
>> >>>>>>>>>>>>>>>>>>>> creating temporary StateManagers and StateStores
>> during
>> >>>>>>>>>>>>>>>>>>>> rebalance.
>> >>>>>>>>>>>>>>>> I
>> >>>>>>>>>>>>>>>>>>> think
>> >>>>>>>>>>>>>>>>>>>> it is possible, and probably not too expensive, but
>> the
>> >>>>>>>>>>>>>>>>>>>> devil will
>> >>>>>>>>>>>>>>>> be
>> >>>>>>>>>>>>>>>>>> in
>> >>>>>>>>>>>>>>>>>>>> the detail.
>> >>>>>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>>>>> I'll try to find some time to explore the idea to see
>> >>> if
>> >>>>>>>>>>>>>>>>>>>> it's
>> >>>>>>>>>>>>>>>>> possible
>> >>>>>>>>>>>>>>>>>>> and
>> >>>>>>>>>>>>>>>>>>>> report back, because we'll need to determine this
>> >>> before
>> >>>>>>>>>>>>>>>>>>>> we can
>> >>>>>>>>>>>>>>>> vote
>> >>>>>>>>>>>>>>>>> on
>> >>>>>>>>>>>>>>>>>>> the
>> >>>>>>>>>>>>>>>>>>>> KIP.
>> >>>>>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>>>>> Regards,
>> >>>>>>>>>>>>>>>>>>>> Nick
>> >>>>>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>>>>> On Wed, 10 Apr 2024 at 11:36, Bruno Cadonna
>> >>>>>>>>>>>>>>>>>>>> <cado...@apache.org>
>> >>>>>>>>>>>>>>>>>> wrote:
>> >>>>>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>>>>>> Hi Nick,
>> >>>>>>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>>>>>> Thanks for reacting to my comments so quickly!
>> >>>>>>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>>>>>> 2.
>> >>>>>>>>>>>>>>>>>>>>> Some thoughts on your proposal.
>> >>>>>>>>>>>>>>>>>>>>> State managers (and state stores) are parts of
>> tasks.
>> >>> If
>> >>>>>>>>>>>>>>>>>>>>> the task
>> >>>>>>>>>>>>>>>> is
>> >>>>>>>>>>>>>>>>>> not
>> >>>>>>>>>>>>>>>>>>>>> assigned locally, we do not create those tasks. To
>> get
>> >>>>>>>>>>>>>>>>>>>>> the offsets
>> >>>>>>>>>>>>>>>>>> with
>> >>>>>>>>>>>>>>>>>>>>> your approach, we would need to either create some kind
>> of
>> >>>>>>>>>>>>>>>>>>>>> inactive
>> >>>>>>>>>>>>>>>> tasks
>> >>>>>>>>>>>>>>>>>>>>> besides active and standby tasks or store and manage
>> >>>> state
>> >>>>>>>>>>>>>>>> managers
>> >>>>>>>>>>>>>>>>> of
>> >>>>>>>>>>>>>>>>>>>>> non-assigned tasks differently than the state
>> managers
>> >>>>>>>>>>>>>>>>>>>>> of assigned
>> >>>>>>>>>>>>>>>>>>>>> tasks. Additionally, the cleanup thread that removes
>> >>>>>>>>>>>>>>>>>>>>> unassigned
>> >>>>>>>>>>>>>>>> task
>> >>>>>>>>>>>>>>>>>>>>> directories needs to concurrently delete those
>> >>> inactive
>> >>>>>>>>>>>>>>>>>>>>> tasks or
>> >>>>>>>>>>>>>>>>>>>>> task-less state managers of unassigned tasks. This
>> >>> seems
>> >>>>>>>>>>>>>>>>>>>>> all quite
>> >>>>>>>>>>>>>>>>>> messy
>> >>>>>>>>>>>>>>>>>>>>> to me.
>> >>>>>>>>>>>>>>>>>>>>> Could we create those state managers (or state
>> stores)
>> >>>>>>>>>>>>>>>>>>>>> for locally
>> >>>>>>>>>>>>>>>>>>>>> existing but unassigned tasks on demand when
>> >>>>>>>>>>>>>>>>>>>>> TaskManager#getTaskOffsetSums() is executed? Or
>> have a
>> >>>>>>>>>>>>>>>>>>>>> different
>> >>>>>>>>>>>>>>>>>>>>> encapsulation for the unused task directories?
>> >>>>>>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>>>>>> Best,
>> >>>>>>>>>>>>>>>>>>>>> Bruno
>> >>>>>>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>>>>>> On 4/10/24 11:31 AM, Nick Telford wrote:
>> >>>>>>>>>>>>>>>>>>>>>> Hi Bruno,
>> >>>>>>>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>>>>>>> Thanks for the review!
>> >>>>>>>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>>>>>>> 1, 4, 5.
>> >>>>>>>>>>>>>>>>>>>>>> Done
>> >>>>>>>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>>>>>>> 3.
>> >>>>>>>>>>>>>>>>>>>>>> You're right. I've removed the offending
>> paragraph. I
>> >>>> had
>> >>>>>>>>>>>>>>>>> originally
>> >>>>>>>>>>>>>>>>>>>>>> adapted this from the guarantees outlined in
>> KIP-892.
>> >>>>>>>>>>>>>>>>>>>>>> But it's
>> >>>>>>>>>>>>>>>>>>>>> difficult to
>> >>>>>>>>>>>>>>>>>>>>>> provide these guarantees without the KIP-892
>> >>>> transaction
>> >>>>>>>>>>>>>>>> buffers.
>> >>>>>>>>>>>>>>>>>>>>> Instead,
>> >>>>>>>>>>>>>>>>>>>>>> we'll add the guarantees back into the JavaDoc when
>> >>>>>>>>>>>>>>>>>>>>>> KIP-892
>> >>>>>>>>>>>>>>>> lands.
>> >>>>>>>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>>>>>>> 2.
>> >>>>>>>>>>>>>>>>>>>>>> Good point! This is the only part of the KIP that
>> was
>> >>>>>>>>>>>>>>>>>> (significantly)
>> >>>>>>>>>>>>>>>>>>>>>> changed when I extracted it from KIP-892. My
>> >>> prototype
>> >>>>>>>>>>>>>>>>>>>>>> currently
>> >>>>>>>>>>>>>>>>>>>>> maintains
>> >>>>>>>>>>>>>>>>>>>>>> this "cache" of changelog offsets in .checkpoint,
>> but
>> >>>>>>>>>>>>>>>>>>>>>> doing so
>> >>>>>>>>>>>>>>>>>> becomes
>> >>>>>>>>>>>>>>>>>>>>> very
>> >>>>>>>>>>>>>>>>>>>>>> messy. My intent with this change was to try to
>> >>> better
>> >>>>>>>>>>>>>>>> encapsulate
>> >>>>>>>>>>>>>>>>>>> this
>> >>>>>>>>>>>>>>>>>>>>>> offset "caching", especially for StateStores that
>> can
>> >>>>>>>>>>>>>>>>>>>>>> cheaply
>> >>>>>>>>>>>>>>>>>> provide
>> >>>>>>>>>>>>>>>>>>>>> the
>> >>>>>>>>>>>>>>>>>>>>>> offsets stored directly in them without needing to
>> >>>>>>>>>>>>>>>>>>>>>> duplicate
>> >>>>>>>>>>>>>>>> them
>> >>>>>>>>>>>>>>>>> in
>> >>>>>>>>>>>>>>>>>>>>> this
>> >>>>>>>>>>>>>>>>>>>>>> cache.
>> >>>>>>>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>>>>>>> It's clear some more work is needed here to better
>> >>>>>>>>>>>>>>>>>>>>>> encapsulate
>> >>>>>>>>>>>>>>>>> this.
>> >>>>>>>>>>>>>>>>>>> My
>> >>>>>>>>>>>>>>>>>>>>>> immediate thought is: what if we construct *but
>> don't
>> >>>>>>>>>>>>>>>> initialize*
>> >>>>>>>>>>>>>>>>>> the
>> >>>>>>>>>>>>>>>>>>>>>> StateManager and StateStores for every Task
>> directory
>> >>>>>>>>>>>>>>>>>>>>>> on-disk?
>> >>>>>>>>>>>>>>>>> That
>> >>>>>>>>>>>>>>>>>>>>> should
>> >>>>>>>>>>>>>>>>>>>>>> still be quite cheap to do, and would enable us to
>> >>>>>>>>>>>>>>>>>>>>>> query the
>> >>>>>>>>>>>>>>>>> offsets
>> >>>>>>>>>>>>>>>>>>> for
>> >>>>>>>>>>>>>>>>>>>>>> all on-disk stores, even if they're not open. If
>> the
>> >>>>>>>>>>>>>>>> StateManager
>> >>>>>>>>>>>>>>>>>>> (aka.
>> >>>>>>>>>>>>>>>>>>>>>> ProcessorStateManager/GlobalStateManager) proves
>> too
>> >>>>>>>>>>>>>>>>>>>>>> expensive
>> >>>>>>>>>>>>>>>> to
>> >>>>>>>>>>>>>>>>>> hold
>> >>>>>>>>>>>>>>>>>>>>> open
>> >>>>>>>>>>>>>>>>>>>>>> for closed stores, we could always have a
>> >>>>>>>>>>>>>>>>>>>>>> "StubStateManager" in
>> >>>>>>>>>>>>>>>>> its
>> >>>>>>>>>>>>>>>>>>>>> place,
>> >>>>>>>>>>>>>>>>>>>>>> that enables the querying of offsets, but nothing
>> >>> else?
>> >>>>>>>>>>>>>>>>>>>>>>
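A rough sketch of what such a "StubStateManager" might look like, purely to illustrate the idea. The SimpleStateManager interface, its two methods, and the String-keyed offset map are hypothetical simplifications and do not mirror the real ProcessorStateManager API.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical, heavily simplified view of what a state manager offers.
interface SimpleStateManager {
    // Changelog offsets known to be committed, keyed by "topic-partition".
    Map<String, Long> committedOffsets();

    // Stands in for every operation that needs an open, initialized store.
    void flush();
}

// Serves offsets for a task directory whose stores are not open, and
// rejects everything else.
final class StubStateManager implements SimpleStateManager {
    private final Map<String, Long> offsets;

    StubStateManager(final Map<String, Long> offsetsOnDisk) {
        // Defensive copy so later changes by the caller are not visible here.
        this.offsets = Collections.unmodifiableMap(new HashMap<>(offsetsOnDisk));
    }

    @Override
    public Map<String, Long> committedOffsets() {
        return offsets;
    }

    @Override
    public void flush() {
        throw new UnsupportedOperationException(
            "StubStateManager only supports querying offsets");
    }
}
```

The point of the sketch is only that offset queries can be separated from the operations that require an initialized store; how the offsets would actually be loaded from disk is left open here.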
>> >>>>>>>>>>>>>>>>>>>>>> IDK, what do you think?
>> >>>>>>>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>>>>>>> Regards,
>> >>>>>>>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>>>>>>> Nick
>> >>>>>>>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>>>>>>> On Tue, 9 Apr 2024 at 15:00, Bruno Cadonna
>> >>>>>>>>>>>>>>>>>>>>>> <cado...@apache.org>
>> >>>>>>>>>>>>>>>>>>> wrote:
>> >>>>>>>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>>>>>>>> Hi Nick,
>> >>>>>>>>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>>>>>>>> Thanks for breaking out the KIP from KIP-892!
>> >>>>>>>>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>>>>>>>> Here are a couple of comments/questions:
>> >>>>>>>>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>>>>>>>> 1.
>> >>>>>>>>>>>>>>>>>>>>>>> In Kafka Streams, we have a design guideline which
>> >>>>>>>>>>>>>>>>>>>>>>> says to not
>> >>>>>>>>>>>>>>>>> use
>> >>>>>>>>>>>>>>>>>>> the
>> >>>>>>>>>>>>>>>>>>>>>>> "get"-prefix for getters on the public API. Could
>> >>> you
>> >>>>>>>>>>>>>>>>>>>>>>> please
>> >>>>>>>>>>>>>>>>> change
>> >>>>>>>>>>>>>>>>>>>>>>> getCommittedOffsets() to committedOffsets()?
>> >>>>>>>>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>>>>>>>> 2.
>> >>>>>>>>>>>>>>>>>>>>>>> It is not clear to me how
>> >>>>> TaskManager#getTaskOffsetSums()
>> >>>>>>>>>>>>>>>> should
>> >>>>>>>>>>>>>>>>>> read
>> >>>>>>>>>>>>>>>>>>>>>>> offsets of tasks the stream thread does not own
>> but
>> >>>>>>>>>>>>>>>>>>>>>>> that have a
>> >>>>>>>>>>>>>>>>>> state
>> >>>>>>>>>>>>>>>>>>>>>>> directory on the Streams client by calling
>> >>>>>>>>>>>>>>>>>>>>>>> StateStore#getCommittedOffsets(). If the thread
>> does
>> >>>>>>>>>>>>>>>>>>>>>>> not own a
>> >>>>>>>>>>>>>>>>> task
>> >>>>>>>>>>>>>>>>>>> it
>> >>>>>>>>>>>>>>>>>>>>>>> also does not create any state stores for the
>> task,
>> >>>>>>>>>>>>>>>>>>>>>>> which means
>> >>>>>>>>>>>>>>>>>> there
>> >>>>>>>>>>>>>>>>>>>>> is
>> >>>>>>>>>>>>>>>>>>>>>>> no state store on which to call
>> >>> getCommittedOffsets().
>> >>>>>>>>>>>>>>>>>>>>>>> I would have rather expected that a checkpoint
>> file
>> >>> is
>> >>>>>>>>>>>>>>>>>>>>>>> written
>> >>>>>>>>>>>>>>>>> for
>> >>>>>>>>>>>>>>>>>>> all
>> >>>>>>>>>>>>>>>>>>>>>>> state stores on close -- not only for the
>> >>> RocksDBStore
>> >>>>>>>>>>>>>>>>>>>>>>> -- and
>> >>>>>>>>>>>>>>>>> that
>> >>>>>>>>>>>>>>>>>>> this
>> >>>>>>>>>>>>>>>>>>>>>>> checkpoint file is read in
>> >>>>>>>>>>>>>>>>>>>>>>> TaskManager#getTaskOffsetSums() for
>> >>>>>>>>>>>>>>>>> the
>> >>>>>>>>>>>>>>>>>>>>> tasks
>> >>>>>>>>>>>>>>>>>>>>>>> that have a state directory on the client but are
>> >>> not
>> >>>>>>>>>>>>>>>>>>>>>>> currently
>> >>>>>>>>>>>>>>>>>>>>> assigned
>> >>>>>>>>>>>>>>>>>>>>>>> to any stream thread of the Streams client.
>> >>>>>>>>>>>>>>>>>>>>>>>
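As an illustration of the checkpoint-file approach described above, the following sketch computes an offset sum for a task from checkpoint entries without creating any state store. It assumes a simplified entry format of "<topic> <partition> <offset>" per line; the CheckpointOffsets class and its methods are hypothetical and are not the actual Kafka Streams implementation.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class CheckpointOffsets {

    // Parses entries of the assumed form "<topic> <partition> <offset>"
    // into a map keyed by "topic-partition". Blank lines are skipped.
    static Map<String, Long> parse(final List<String> lines) {
        final Map<String, Long> offsets = new LinkedHashMap<>();
        for (final String line : lines) {
            final String trimmed = line.trim();
            if (trimmed.isEmpty()) {
                continue;
            }
            final String[] parts = trimmed.split("\\s+");
            if (parts.length != 3) {
                throw new IllegalArgumentException("Malformed checkpoint line: " + line);
            }
            final String key = parts[0] + "-" + Integer.parseInt(parts[1]);
            offsets.put(key, Long.parseLong(parts[2]));
        }
        return offsets;
    }

    // Sums the changelog offsets of one task, as used for lag estimation
    // of tasks that have a state directory but are not assigned locally.
    static long offsetSum(final Map<String, Long> offsets) {
        long sum = 0L;
        for (final long offset : offsets.values()) {
            sum += offset;
        }
        return sum;
    }
}
```

Because this only reads plain text, it works for any task directory on disk regardless of whether a stream thread currently owns the task.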
>> >>>>>>>>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>>>>>>>> 3.
>> >>>>>>>>>>>>>>>>>>>>>>> In the javadocs for commit() you write
>> >>>>>>>>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>>>>>>>> "... all writes since the last commit(Map), or
>> since
>> >>>>>>>>>>>>>>>>>> init(StateStore)
>> >>>>>>>>>>>>>>>>>>>>>>> *MUST* be available to readers, even after a
>> >>> restart."
>> >>>>>>>>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>>>>>>>> This is only true for a clean close before the
>> >>>>>>>>>>>>>>>>>>>>>>> restart, isn't
>> >>>>>>>>>>>>>>>> it?
>> >>>>>>>>>>>>>>>>>>>>>>> If the task fails with a dirty close, Kafka
>> Streams
>> >>>>>>>>>>>>>>>>>>>>>>> cannot
>> >>>>>>>>>>>>>>>>>> guarantee
>> >>>>>>>>>>>>>>>>>>>>>>> that the in-memory structures of the state store
>> >>> (e.g.
>> >>>>>>>>>>>>>>>>>>>>>>> memtable
>> >>>>>>>>>>>>>>>>> in
>> >>>>>>>>>>>>>>>>>>> the
>> >>>>>>>>>>>>>>>>>>>>>>> case of RocksDB) are flushed so that the records
>> and
>> >>>> the
>> >>>>>>>>>>>>>>>>> committed
>> >>>>>>>>>>>>>>>>>>>>>>> offsets are persisted.
>> >>>>>>>>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>>>>>>>> 4.
>> >>>>>>>>>>>>>>>>>>>>>>> The wrapper that provides the legacy checkpointing
>> >>>>>>>>>>>>>>>>>>>>>>> behavior is
>> >>>>>>>>>>>>>>>>>>> actually
>> >>>>>>>>>>>>>>>>>>>>>>> an implementation detail. I would remove it from
>> the
>> >>>>>>>>>>>>>>>>>>>>>>> KIP, but
>> >>>>>>>>>>>>>>>>> still
>> >>>>>>>>>>>>>>>>>>>>>>> state that the legacy checkpointing behavior will
>> be
>> >>>>>>>>>>>>>>>>>>>>>>> supported
>> >>>>>>>>>>>>>>>>> when
>> >>>>>>>>>>>>>>>>>>> the
>> >>>>>>>>>>>>>>>>>>>>>>> state store does not manage the checkpoints.
>> >>>>>>>>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>>>>>>>> 5.
>> >>>>>>>>>>>>>>>>>>>>>>> Regarding the metrics, could you please add the
>> >>> tags,
>> >>>>>>>>>>>>>>>>>>>>>>> and the
>> >>>>>>>>>>>>>>>>>>> recording
>> >>>>>>>>>>>>>>>>>>>>>>> level (DEBUG or INFO) as done in KIP-607 or
>> KIP-444.
>> >>>>>>>>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>>>>>>>> Best,
>> >>>>>>>>>>>>>>>>>>>>>>> Bruno
>> >>>>>>>>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>>>>>>>> On 4/7/24 5:35 PM, Nick Telford wrote:
>> >>>>>>>>>>>>>>>>>>>>>>>> Hi everyone,
>> >>>>>>>>>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>>>>>>>>> Based on some offline discussion, I've split out
>> >>> the
>> >>>>>>>>>>>>>>>>>>>>>>>> "Atomic
>> >>>>>>>>>>>>>>>>>>>>>>> Checkpointing"
>> >>>>>>>>>>>>>>>>>>>>>>>> section from KIP-892: Transactional Semantics for
>> >>>>>>>>>>>>>>>>>>>>>>>> StateStores,
>> >>>>>>>>>>>>>>>>>> into
>> >>>>>>>>>>>>>>>>>>>>> its
>> >>>>>>>>>>>>>>>>>>>>>>> own
>> >>>>>>>>>>>>>>>>>>>>>>>> KIP
>> >>>>>>>>>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>>>>>>>>> KIP-1035: StateStore managed changelog offsets
>> >>>>>>>>>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>
>> >>>>>
>> >>>>
>> >>>
>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-1035%3A+StateStore+managed+changelog+offsets
>> >>>>>>>>>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>>>>>>>>> While KIP-892 was adopted *with* the changes
>> >>> outlined
>> >>>>> in
>> >>>>>>>>>>>>>>>>> KIP-1035,
>> >>>>>>>>>>>>>>>>>>>>> these
>> >>>>>>>>>>>>>>>>>>>>>>>> changes were always the most contentious part,
>> and
>> >>>>>>>>>>>>>>>>>>>>>>>> continued
>> >>>>>>>>>>>>>>>> to
>> >>>>>>>>>>>>>>>>>> spur
>> >>>>>>>>>>>>>>>>>>>>>>>> discussion even after KIP-892 was adopted.
>> >>>>>>>>>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>>>>>>>>> All the changes introduced in KIP-1035 have been
>> >>>>>>>>>>>>>>>>>>>>>>>> removed from
>> >>>>>>>>>>>>>>>>>>> KIP-892,
>> >>>>>>>>>>>>>>>>>>>>>>> and
>> >>>>>>>>>>>>>>>>>>>>>>>> a hard dependency on KIP-1035 has been added to
>> >>>>>>>>>>>>>>>>>>>>>>>> KIP-892 in
>> >>>>>>>>>>>>>>>> their
>> >>>>>>>>>>>>>>>>>>>>> place.
>> >>>>>>>>>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>>>>>>>>> I'm hopeful that with some more focus on this set
>> >>> of
>> >>>>>>>>>>>>>>>>>>>>>>>> changes,
>> >>>>>>>>>>>>>>>> we
>> >>>>>>>>>>>>>>>>>> can
>> >>>>>>>>>>>>>>>>>>>>>>>> deliver something that we're all happy with.
>> >>>>>>>>>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>>>>>>>>> Regards,
>> >>>>>>>>>>>>>>>>>>>>>>>> Nick
>> >>>>>>>>>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>
>> >>>>>
>> >>>>
>> >>>
>> >>
>>
>
