Hi Matthias,

Thanks for your thorough review.

200. (#managesOffsets requirements)
Done

201. (#commit atomicity recommendation vs. guarantee)
There are possible StateStore implementations, including existing ones,
that can't guarantee atomicity - because the underlying database/system
doesn't support it. It might be hypothetically possible for atomicity to be
layered on top of these systems, but doing so would likely be complex and
potentially introduce performance issues. For this reason, I chose to make
this a recommendation over a requirement. The only hard requirement is that
writes are persisted to disk *before* offsets, so there's no data loss.
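
To make the ordering concrete, here's a rough sketch of what I mean. It is
not the real StateStore API: NonAtomicStoreSketch, its fields, and the use of
String partition names in place of TopicPartition are all made up for
illustration, and the two maps just stand in for whatever the underlying
system persists.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for a store whose backing system cannot write
// records and offsets in one atomic operation.
final class NonAtomicStoreSketch {
    // Stand-ins for durable state (i.e. what would be on disk).
    final Map<String, String> durableRecords = new HashMap<>();
    final Map<String, Long> durableOffsets = new HashMap<>();

    // Writes buffered since the last commit.
    private final Map<String, String> pendingRecords = new HashMap<>();

    void put(final String key, final String value) {
        pendingRecords.put(key, value);
    }

    // Loosely mirrors commit(Map<TopicPartition, Long> changelogOffsets),
    // with String keys so the sketch has no Kafka dependency.
    void commit(final Map<String, Long> changelogOffsets) {
        // Step 1: persist the records first.
        durableRecords.putAll(pendingRecords);
        pendingRecords.clear();

        // Step 2: only then persist the offsets that describe those records.
        durableOffsets.putAll(changelogOffsets);
    }
}
```

With this ordering, a crash between the two steps leaves the persisted
offsets behind the persisted records, so I'd expect restoration to replay
some changelog records rather than skip any.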

202. (Metrics descriptions)
I've actually updated the metric descriptions to match those of the
existing "flush" metrics. Is that ok?

203a. (Consumer Rebalance Metadata)
This is definitely implementation detail, so I've removed the "and close()
these stores" from the KIP, as well as the last paragraph about performance
testing and using separate threads, as they are also implementation details.
I'm actually working on this logic right now. Currently I have it just
initializing a StandbyTask for each found directory and then closing them
after we get the offsets. The reason we need to initialize an entire Task
is because StateStores are constructed by the ProcessorTopology when it's
initialized.
I'm going to spend some time today looking into whether we can keep these
StandbyTasks open but not RUNNING, and then on assignment either start
running them (if assigned as a Standby) or upgrade them to a StreamTask (if
assigned as an Active). I think it *should* be possible, but the devil is
usually in the details!

203b. (managesOffsets deprecation)
Done

204. (Downgrade)
In my testing, when an older version of RocksDBStore attempts to open a
database with an unknown column family (i.e. the offsets cf), it throws an
Exception, which is caught and rethrown as TaskCorruptedException; this
triggers a wipe of local Task state and crashes the StreamThread. On
restart, it restores as normal.

205. (Segment stores implementation)
I'm deliberately not detailing this implementation in the KIP, because all
SegmentStore APIs are internal, so this is really just an implementation
detail.
What I'm (currently) doing is storing offsets in each Segment. Obviously
the currently live segment will be the only one with offsets being
advanced, so we always return offsets from the currently live segment, but
if there isn't one, then we go backwards through the existing segments (i.e.
starting with the most recent) and return the first available offset.
I confess that SegmentStores are not an area I know much about, but I
believe this should work.
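
As a rough illustration of that lookup (not the actual implementation, which
is still in progress): SegmentOffsetsSketch and its methods are hypothetical
names, segments are modelled as a plain TreeMap keyed by segment id, and
partitions are Strings rather than TopicPartition.

```java
import java.util.Map;
import java.util.Optional;
import java.util.TreeMap;

// Hypothetical model: each segment carries its own map of changelog offsets.
final class SegmentOffsetsSketch {
    // Segment id -> offsets stored in that segment. A higher id is assumed
    // to mean a more recent segment.
    private final TreeMap<Long, Map<String, Long>> segments = new TreeMap<>();

    void putSegment(final long segmentId, final Map<String, Long> offsets) {
        segments.put(segmentId, offsets);
    }

    // Walk the segments from most recent to oldest and return the first
    // offset found for the given partition.
    Optional<Long> committedOffset(final String partition) {
        for (final Map<String, Long> offsets : segments.descendingMap().values()) {
            final Long offset = offsets.get(partition);
            if (offset != null) {
                return Optional.of(offset);
            }
        }
        return Optional.empty();
    }
}
```

If the newest segment has no offsets yet, the lookup falls through to the
next most recent segment that does.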

206. (KeyValueStoreTestDriver)
Hmm, good point. I thought this was part of the test-utils package along
with TopologyTestDriver. I'll keep it out of the KIP.

General status update:
I've begun implementing this KIP in a branch separate from my earlier work.
One of my primary goals is to implement it incrementally, in a way that
allows each commit to be independently reviewed and merged to trunk without
breaking anything. That should keep reviews much more concise. I'll start
opening PRs once the KIP has been accepted *and* I'm close enough to
completion that we can guarantee getting it all done by the next release.
--
Cheers,
Nick

On Tue, 4 Jun 2024 at 20:34, Matthias J. Sax <mj...@apache.org> wrote:

> Nick,
>
> Thanks a lot for updating the KIP. I made a pass over it. Overall LGTM.
> A few nits and some more minor questions:
>
>
>
> 200: nit (Javadocs for `StateStore.managesOffsets()`):
>
> > This is highly
> > recommended, if possible, to ensure that custom StateStores provide the
> consistency guarantees that Kafka Streams
> > expects when operating under the {@code exactly-once} {@code
> processing.mode}.
>
> Given that we make it mandatory, we should rephrase this: "highly
> recommended" does not seem to be strong enough wording.
>
>
>
> 201: Javadocs for `StateStore.commit(final Map<TopicPartition, Long>
> changelogOffsets)`:
>
> > Implementations <em>SHOULD</em> ensure that {@code changelogOffsets} are
> committed to disk atomically with the
> > records they represent, if possible.
>
> Not sure if I can follow? Why "should ensure", but not "must ensure"?
>
>
>
> 202: New metrics:
>
> `commit-rate` -> Description says "The number of calls to..." -- Should
> be "The number of calls per second to..."?
>
> `commit-latency-[]` -> Description says "The [] time taken to" -- Should
> be "The [] time in nanoseconds taken to..."? (or milliseconds in case we
> report in millis?)
>
>
>
> 203: Section "Consumer Rebalance Metadata"
>
> > We will then cache these offsets in-memory and close() these stores.
>
> I think we should not pro-actively close the store, but keep them open,
> until we get tasks assigned. For assigned tasks, we don't need to
> re-open the store, which provides a nice optimization. For other stores,
> we could close them at this point as there is no need to keep them open.
> -- However, this might all be internal implementation details and maybe
> we don't need to specify this on the KIP at all (might be best to just
> not say anything about this part)?
>
>
>
> 203: "managesOffsets deprecation"
>
> >  to allow for its removal in the next major release of Kafka Streams
>
> We don't release Kafka Streams, but Kafka :) -- Also, it's not
> necessarily the next major release, as we have a one year / 3 releases
> guarantee to keep deprecated APIs and we don't know when the next major
> release will happen. Let's just rephrase this in a more generic
> way: "for its removal in a future [major] release" or something like this.
>
>
>
> 204: "Downgrade":
>
> > by default the on-disk state for any Task containing a RocksDBStore will
> be wiped and restored from their changelogs.
>
> This seems not to be correct? For this case, won't Kafka Streams just
> crash? And a manual store deletion would be required?
>
>
>
> 205: How do we intend to implement offset management for segmented
> stores? Are we going to add this new CL to _all_ segments? From a
> structure POV it seems best to add to all segments, but it seems
> sufficient to keep the information up-to-date only in the latest
> segments (which would imply that we need to copy the information from the
> current latest segment to a newly created segment explicitly) and only
> _read_ the information from the latest segment as older segments might
> contain stale metadata?
>
>
>
> 206: `KeyValueStoreTestDriver`:
>
> This is a test class, right? So we don't need to cover it in the KIP,
> and can rename w/o a deprecation phase, as it's all internal code.
>
>
>
>
> -Matthias
>
>
>
>
> On 5/30/24 8:57 AM, Nick Telford wrote:
> > Hi everyone,
> >
> > I didn't spot this before, but it looks like the API of
> > KeyValueStoreTestDriver will need to be updated to change the
> nomenclature
> > from "flushed" to "committed":
> >
> > numFlushedEntryRemoved() -> numCommittedEntryRemoved()
> > numFlushedEntryStored() -> numCommittedEntryStored()
> > flushedEntryRemoved(K) -> committedEntryRemoved(K)
> > flushedEntryStored(K) -> committedEntryStored(K)
> >
> > The old methods will obviously be marked as @Deprecated.
> >
> > Any objections before I add this to the KIP?
> >
> > Regards,
> > Nick
> >
> >
> > On Wed, 29 May 2024 at 11:20, Nick Telford <nick.telf...@gmail.com>
> wrote:
> >
> >> I've updated the KIP with the following:
> >>
> >>     - Deprecation of StateStore#managesOffsets
> >>     - Change StateStore#commit to throw UnsupportedOperationException
> when
> >>     called from a Processor (via AbstractReadWriteDecorator)
> >>     - Updated consumer rebalance lag computation strategy
> >>     <
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-1035%3A+StateStore+managed+changelog+offsets#KIP1035:StateStoremanagedchangelogoffsets-ConsumerRebalanceMetadata
> >
> >>     based on our Meet discussion
> >>        - I've added a bit more detail here than we discussed, in
> >>        particular around how we handle the offsets for tasks assigned
> to our local
> >>        instance, and how we handle offsets when Tasks are
> closed/revoked.
> >>     - Improved downgrade behaviour
> >>        - Note: users that don't downgrade with upgrade.from will still
> get
> >>        the wipe-and-restore behaviour by-default.
> >>
> >> I believe this covers all the outstanding changes that were requested.
> >> Please let me know if I've missed anything or you think further changes
> are
> >> needed.
> >>
> >> Regards,
> >> Nick
> >>
> >> On Wed, 29 May 2024 at 09:28, Nick Telford <nick.telf...@gmail.com>
> wrote:
> >>
> >>> Hi everyone,
> >>>
> >>> Sorry I haven't got around to updating the KIP yet. Now that I've
> wrapped
> >>> up KIP-989, I'm going to be working on 1035 starting today.
> >>>
> >>> I'll update the KIP first, and then call a vote.
> >>>
> >>> Regards,
> >>> Nick
> >>>
> >>> On Wed, 29 May 2024 at 07:25, Bruno Cadonna <cado...@apache.org>
> wrote:
> >>>
> >>>> Totally agree on moving forward and starting the VOTE!
> >>>>
> >>>> However, the KIP should be updated with the new info before starting
> the
> >>>> VOTE.
> >>>>
> >>>> Best,
> >>>> Bruno
> >>>>
> >>>> On 5/29/24 2:36 AM, Matthias J. Sax wrote:
> >>>>> Sounds like a good plan. -- I think we are still wrapping up 3.8
> >>>>> release, but would also like to move forward with this one.
> >>>>>
> >>>>> Should we start a VOTE?
> >>>>>
> >>>>> For merging PRs we need to wait after code freeze, and 3.8 branch was
> >>>>> cut. But we could start reviewing PRs before this already.
> >>>>>
> >>>>>
> >>>>> -Matthias
> >>>>>
> >>>>> On 5/17/24 3:05 AM, Nick Telford wrote:
> >>>>>> Hi everyone,
> >>>>>>
> >>>>>> As discussed on the Zoom call, we're going to handle rebalance
> >>>>>> meta-data by:
> >>>>>>
> >>>>>> - On start-up, Streams will open each store and read its changelog
> >>>>>> offsets
> >>>>>> into an in-memory cache. This cache will be shared among all
> >>>>>> StreamThreads.
> >>>>>> - On rebalance, the cache will be consulted for Task offsets for any
> >>>> Task
> >>>>>> that is not active on any instance-local StreamThreads. If the Task
> is
> >>>>>> active on *any* instance-local StreamThread, we will report the Task
> >>>>>> lag as
> >>>>>> "up to date" (i.e. -1), because we know that the local state is
> >>>> currently
> >>>>>> up-to-date.
> >>>>>>
> >>>>>> We will avoid caching offsets across restarts in the legacy
> >>>> ".checkpoint"
> >>>>>> file, so that we can eliminate the logic for handling this class. If
> >>>>>> performance of opening/closing many state stores is poor, we can
> >>>>>> parallelise it by forking off a thread for each Task directory when
> >>>>>> reading
> >>>>>> the offsets.
> >>>>>>
> >>>>>> I'll update the KIP later today to reflect this design, but I will
> >>>> try to
> >>>>>> keep it high-level, so that the exact implementation can vary.
> >>>>>>
> >>>>>> Regards,
> >>>>>>
> >>>>>> Nick
> >>>>>>
> >>>>>> On Thu, 16 May 2024 at 03:12, Sophie Blee-Goldman <
> >>>> sop...@responsive.dev>
> >>>>>> wrote:
> >>>>>>
> >>>>>>> 103: I like the idea of immediately deprecating #managesOffsets and
> >>>>>>> aiming
> >>>>>>> to make offset management mandatory in the long run. I assume we
> >>>>>>> would also
> >>>>>>> log a warning for any custom stores that return "false" from this
> >>>>>>> method to
> >>>>>>> encourage custom store implementations to start doing so? My only
> >>>>>>> question/concern is that if we want folks to start managing their
> own
> >>>>>>> offsets then we should make this transition easy for them, perhaps
> by
> >>>>>>> exposing some public utility APIs for things that are currently
> >>>>>>> handled by
> >>>>>>> Kafka Streams such as reading/writing checkpoint files. Maybe it
> >>>>>>> would be
> >>>>>>> useful to include a small example in the KIP of what it would
> >>>>>>> actually mean
> >>>>>>> to "manage your own offsets" -- I know (all too well) that plugging
> >>>> in
> >>>>>>> custom storage implementations is not easy and most people who do
> >>>>>>> this are
> >>>>>>> probably fairly advanced users, but offset management will be a
> >>>>>>> totally new
> >>>>>>> ballgame to most people and this kind of feels like throwing
> >>>> them
> >>>>>>> off the deep end. We should at least provide a lifejacket via some
> >>>>>>> kind of
> >>>>>>> utility API and/or example
> >>>>>>>
> >>>>>>> 200. There's been a lot of back and forth on the rebalance
> >>>> metadata/task
> >>>>>>> lag computation question, so forgive me if I missed any part of
> >>>> this,
> >>>>>>> but I
> >>>>>>> think we've landed at the right idea here. To summarize: the
> "tl;dr"
> >>>>>>> explanation is that we'll write the checkpoint file only on close
> >>>> and
> >>>>>>> will
> >>>>>>> account for hard-crash scenarios by opening up the stores on
> startup
> >>>> and
> >>>>>>> writing a checkpoint file for any missing tasks. Does that sound
> >>>> about
> >>>>>>> right?
> >>>>>>>
> >>>>>>> A few clarifications:
> >>>>>>> I think we're all more or less on the same page here but just to be
> >>>>>>> absolutely clear, the task lags for each task directory found on
> >>>> disk
> >>>>>>> will
> >>>>>>> be reported by only one of the StreamThreads, and each StreamThread
> >>>> will
> >>>>>>> report lags only for tasks that it already owns or are not assigned
> >>>>>>> to any
> >>>>>>> other StreamThread in the client. In other words, we only need to
> >>>> get
> >>>>>>> the
> >>>>>>> task lag for completely unassigned/unlocked tasks, which means if
> >>>>>>> there is
> >>>>>>> a checkpoint file at all then it must be up-to-date, because there
> >>>> is no
> >>>>>>> other StreamThread actively writing to that state store (if so then
> >>>> only
> >>>>>>> that StreamThread would report lag for that particular task).
> >>>>>>>
> >>>>>>> This still leaves the "no checkpoint at all" case which as
> previously
> >>>>>>> mentioned can occur after a  hard-crash. Luckily we only have to
> >>>> worry
> >>>>>>> about this once, after starting up again following said hard crash.
> >>>>>>> We can
> >>>>>>> simply open up each of the state stores before ever joining the
> >>>>>>> group, get
> >>>>>>> the offsets from rocksdb, and write them to a new checkpoint file.
> >>>> After
> >>>>>>> that, we can depend on the checkpoints written at close and won't
> >>>>>>> have to
> >>>>>>> open up any stores that aren't already assigned for the reasons
> laid
> >>>>>>> out in
> >>>>>>> the paragraph above.
> >>>>>>>
> >>>>>>> As for the specific mechanism and which thread-does-what, since
> >>>> there
> >>>>>>> were
> >>>>>>> some questions, this is how I'm imagining the process:
> >>>>>>>
> >>>>>>>      1.   The general idea is that we simply go through each task
> >>>>>>> directory
> >>>>>>>      with state but no checkpoint file and open the StateStore,
> call
> >>>>>>>      #committedOffset, and then write it to the checkpoint file. We
> >>>>>>> can then
> >>>>>>>      close these stores and let things proceed as normal.
> >>>>>>>      2.  This only has to happen once, during startup, but we have
> two
> >>>>>>>      options:
> >>>>>>>         1. Do this from KafkaStreams#start, ie before we even
> create
> >>>> the
> >>>>>>>         StreamThreads
> >>>>>>>         2.  Do this from StreamThread#start, following a similar
> >>>>>>> lock-based
> >>>>>>>         approach to the one used in #computeTaskLags, where each
> >>>>>>> StreamThread
> >>>>>>> just
> >>>>>>>         makes a pass over the task directories on disk and attempts
> >>>> to
> >>>>>>> lock
> >>>>>>> them
> >>>>>>>         one by one. If they obtain the lock, check whether there is
> >>>> state
> >>>>>>> but no
> >>>>>>>         checkpoint, and write the checkpoint if needed. If it can't
> >>>> grab
> >>>>>>> the lock,
> >>>>>>>         then we know one of the other StreamThreads must be
> handling
> >>>> the
> >>>>>>> checkpoint
> >>>>>>>         file for that task directory, and we can move on.
> >>>>>>>
> >>>>>>> Don't really feel too strongly about which approach is best,  doing
> >>>>>>> it in
> >>>>>>> KafkaStreams#start is certainly the most simple while doing it in
> the
> >>>>>>> StreamThread's startup is more efficient. If we're worried about
> >>>>>>> adding too
> >>>>>>> much weight to KafkaStreams#start then the 2nd option is probably
> >>>> best,
> >>>>>>> though slightly more complicated.
> >>>>>>>
> >>>>>>> Thoughts?
> >>>>>>>
> >>>>>>> On Tue, May 14, 2024 at 10:02 AM Nick Telford <
> >>>> nick.telf...@gmail.com>
> >>>>>>> wrote:
> >>>>>>>
> >>>>>>>> Hi everyone,
> >>>>>>>>
> >>>>>>>> Sorry for the delay in replying. I've finally now got some time to
> >>>> work
> >>>>>>> on
> >>>>>>>> this.
> >>>>>>>>
> >>>>>>>> Addressing Matthias's comments:
> >>>>>>>>
> >>>>>>>> 100.
> >>>>>>>> Good point. As Bruno mentioned, there's already
> >>>>>>> AbstractReadWriteDecorator
> >>>>>>>> which we could leverage to provide that protection. I'll add
> >>>> details on
> >>>>>>>> this to the KIP.
> >>>>>>>>
> >>>>>>>> 101,102.
> >>>>>>>> It looks like these points have already been addressed by Bruno.
> >>>> Let me
> >>>>>>>> know if anything here is still unclear or you feel needs to be
> >>>> detailed
> >>>>>>>> more in the KIP.
> >>>>>>>>
> >>>>>>>> 103.
> >>>>>>>> I'm in favour of anything that gets the old code removed sooner,
> but
> >>>>>>>> wouldn't deprecating an API that we expect (some) users to
> implement
> >>>>>>> cause
> >>>>>>>> problems?
> >>>>>>>> I'm thinking about implementers of custom StateStores, as they may
> >>>> be
> >>>>>>>> confused by managesOffsets() being deprecated, especially since
> they
> >>>>>>> would
> >>>>>>>> have to mark their implementation as @Deprecated in order to avoid
> >>>>>>> compile
> >>>>>>>> warnings.
> >>>>>>>> If deprecating an API *while it's still expected to be
> implemented*
> >>>> is
> >>>>>>>> something that's generally done in the project, then I'm happy to
> >>>> do so
> >>>>>>>> here.
> >>>>>>>>
> >>>>>>>> 104.
> >>>>>>>> I think this is technically possible, but at the cost of
> >>>> considerable
> >>>>>>>> additional code to maintain. Would we ever have a pathway to
> remove
> >>>>>>>> this
> >>>>>>>> downgrade code in the future?
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> Regarding rebalance metadata:
> >>>>>>>> Opening all stores on start-up to read and cache their offsets is
> an
> >>>>>>>> interesting idea, especially if we can avoid re-opening the stores
> >>>> once
> >>>>>>> the
> >>>>>>>> Tasks have been assigned. Scalability shouldn't be too much of a
> >>>>>>>> problem,
> >>>>>>>> because typically users have a fairly short state.cleanup.delay,
> so
> >>>> the
> >>>>>>>> number of on-disk Task directories should rarely exceed the number
> >>>> of
> >>>>>>> Tasks
> >>>>>>>> previously assigned to that instance.
> >>>>>>>> An advantage of this approach is that it would also simplify
> >>>> StateStore
> >>>>>>>> implementations, as they would only need to guarantee that
> committed
> >>>>>>>> offsets are available when the store is open.
> >>>>>>>>
> >>>>>>>> I'll investigate this approach this week for feasibility and
> report
> >>>>>>>> back.
> >>>>>>>>
> >>>>>>>> I think that covers all the outstanding feedback, unless I missed
> >>>>>>> anything?
> >>>>>>>>
> >>>>>>>> Regards,
> >>>>>>>> Nick
> >>>>>>>>
> >>>>>>>> On Mon, 6 May 2024 at 14:06, Bruno Cadonna <cado...@apache.org>
> >>>> wrote:
> >>>>>>>>
> >>>>>>>>> Hi Matthias,
> >>>>>>>>>
> >>>>>>>>> I see what you mean.
> >>>>>>>>>
> >>>>>>>>> To sum up:
> >>>>>>>>>
> >>>>>>>>> With this KIP the .checkpoint file is written when the store
> >>>> closes.
> >>>>>>>>> That is when:
> >>>>>>>>> 1. a task moves away from Kafka Streams client
> >>>>>>>>> 2. Kafka Streams client shuts down
> >>>>>>>>>
> >>>>>>>>> A Kafka Streams client needs the information in the .checkpoint
> >>>> file
> >>>>>>>>> 1. on startup because it does not have any open stores yet.
> >>>>>>>>> 2. during rebalances for non-empty state directories of tasks
> that
> >>>> are
> >>>>>>>>> not assigned to the Kafka Streams client.
> >>>>>>>>>
> >>>>>>>>> With hard crashes, i.e., when the Streams client is not able to
> >>>> close
> >>>>>>>>> its state stores and write the .checkpoint file, the .checkpoint
> >>>> file
> >>>>>>>>> might be quite stale. That influences the next rebalance after
> >>>>>>>>> failover
> >>>>>>>>> negatively.
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> My conclusion is that Kafka Streams either needs to open the
> state
> >>>>>>>>> stores at start up or we write the checkpoint file more often.
> >>>>>>>>>
> >>>>>>>>> Writing the .checkpoint file during processing more often without
> >>>>>>>>> controlling the flush to disk would work. However, Kafka Streams
> >>>> would
> >>>>>>>>> checkpoint offsets that are not yet persisted on disk by the
> state
> >>>>>>>>> store. That is with a hard crash the offsets in the .checkpoint
> >>>> file
> >>>>>>>>> might be larger than the offsets checkpointed in the state store.
> >>>> That
> >>>>>>>>> might not be a problem if Kafka Streams uses the .checkpoint file
> >>>> only
> >>>>>>>>> to compute the task lag. The downside is that it makes the
> >>>> managing of
> >>>>>>>>> checkpoints more complex because now we have to maintain two
> >>>>>>>>> checkpoints: one for restoration and one for computing the task
> >>>> lag.
> >>>>>>>>> I think we should explore the option where Kafka Streams opens
> the
> >>>>>>> state
> >>>>>>>>> stores at start up to get the offsets.
> >>>>>>>>>
> >>>>>>>>> I also checked when Kafka Streams needs the checkpointed offsets
> to
> >>>>>>>>> compute the task lag during a rebalance. Turns out Kafka Streams
> >>>> needs
> >>>>>>>>> them before sending the join request. Now, I am wondering if
> >>>> opening
> >>>>>>> the
> >>>>>>>>> state stores of unassigned tasks whose state directory exists
> >>>> locally
> >>>>>>> is
> >>>>>>>>> actually such a big issue due to the expected higher latency
> since
> >>>> it
> >>>>>>>>> happens actually before the Kafka Streams client joins the
> >>>> rebalance.
> >>>>>>>>>
> >>>>>>>>> Best,
> >>>>>>>>> Bruno
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> On 5/4/24 12:05 AM, Matthias J. Sax wrote:
> >>>>>>>>>> Those are good questions... I could think of a few approaches, but
> I
> >>>>>>> admit
> >>>>>>>>>> it might all be a little bit tricky to code up...
> >>>>>>>>>>
> >>>>>>>>>> However if we don't solve this problem, I think this KIP does
> not
> >>>>>>>> really
> >>>>>>>>>> solve the core issue we are facing? In the end, if we rely on
> the
> >>>>>>>>>> `.checkpoint` file to compute a task assignment, but the
> >>>>>>> `.checkpoint`
> >>>>>>>>>> file can be arbitrarily stale after a crash because we only write
> it
> >>>>>>> on a
> >>>>>>>>>> clean close, there would be still a huge gap that this KIP does
> >>>> not
> >>>>>>>>> close?
> >>>>>>>>>>
> >>>>>>>>>> For the case in which we keep the checkpoint file, this KIP
> would
> >>>>>>> still
> >>>>>>>>>> help for "soft errors" in which KS can recover, and roll back
> the
> >>>>>>>> store.
> >>>>>>>>>> A significant win for sure. -- But hard crashes would still be
> a
> >>>>>>>>>> problem? We might assign tasks to "wrong" instance, ie, which
> are
> >>>> not
> >>>>>>>>>> most up to date, as the checkpoint information could be very
> >>>>>>> outdated?
> >>>>>>>>>> Would we end up with a half-baked solution? Would this be good
> >>>> enough
> >>>>>>>> to
> >>>>>>>>>> justify the introduced complexity? In the end, for soft failures
> it's
> >>>>>>> still
> >>>>>>>>>> a win. Just want to make sure we understand the limitations and
> >>>> make
> >>>>>>> an
> >>>>>>>>>> educated decision.
> >>>>>>>>>>
> >>>>>>>>>> Or do I miss something?
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>> -Matthias
> >>>>>>>>>>
> >>>>>>>>>> On 5/3/24 10:20 AM, Bruno Cadonna wrote:
> >>>>>>>>>>> Hi Matthias,
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>> 200:
> >>>>>>>>>>> I like the idea in general. However, it is not clear to me how
> >>>> the
> >>>>>>>>>>> behavior should be with multiple stream threads in the same
> Kafka
> >>>>>>>>>>> Streams client. What stream thread opens which store? How can a
> >>>>>>> stream
> >>>>>>>>>>> thread pass an open store to another stream thread that got the
> >>>>>>>>>>> corresponding task assigned? How does a stream thread know
> that a
> >>>>>>> task
> >>>>>>>>>>> was not assigned to any of the stream threads of the Kafka
> >>>> Streams
> >>>>>>>>>>> client? I have the feeling we should just keep the .checkpoint
> >>>> file
> >>>>>>> on
> >>>>>>>>>>> close for now to unblock this KIP and try to find a solution to
> >>>> get
> >>>>>>>>>>> totally rid of it later.
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>> Best,
> >>>>>>>>>>> Bruno
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>> On 5/3/24 6:29 PM, Matthias J. Sax wrote:
> >>>>>>>>>>>> 101: Yes, but what I am saying is, that we don't need to flush
> >>>> the
> >>>>>>>>>>>> .position file to disk periodically, but only maintain it in
> >>>> main
> >>>>>>>>>>>> memory, and only write it to disk on close() to preserve it
> >>>> across
> >>>>>>>>>>>> restarts. This way, it would never be ahead, but might only
> lag?
> >>>>>>> But
> >>>>>>>>>>>> with my better understanding about (102) it might be moot
> >>>> anyway...
> >>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>>> 102: Thanks for clarifying. Looked into the code now. Makes
> >>>> sense.
> >>>>>>>>>>>> Might be something to be worth calling out explicitly in the
> KIP
> >>>>>>>>>>>> writeup. -- Now that I realize that the position is tracked
> >>>> inside
> >>>>>>>>>>>> the store (not outside as the changelog offsets) it makes much
> >>>> more
> >>>>>>>>>>>> sense to pull position into RocksDB itself. In the end, it's
> >>>>>>> actually
> >>>>>>>>>>>> a "store implementation" detail how it tracks the position
> (and
> >>>>>>> kinda
> >>>>>>>>>>>> leaky abstraction currently, that we re-use the checkpoint
> file
> >>>>>>>>>>>> mechanism to track it and flush to disk).
> >>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>>> 200: I was thinking about this a little bit more, and maybe
> it's
> >>>>>>> not
> >>>>>>>>>>>> too bad? When KS starts up, we could open all stores we find
> on
> >>>>>>> local
> >>>>>>>>>>>> disk pro-actively, and keep them all open until the first
> >>>> rebalance
> >>>>>>>>>>>> finishes: For tasks we get assigned, we hand in the already
> >>>> opened
> >>>>>>>>>>>> store (this would amortize the cost to open the store before
> the
> >>>>>>>>>>>> rebalance) and for non-assigned tasks, we know the offset
> >>>>>>> information
> >>>>>>>>>>>> won't change and we could just cache it in-memory for later
> >>>> reuse
> >>>>>>>>>>>> (ie, next rebalance) and close the store to free up resources?
> >>>> --
> >>>>>>>>>>>> Assuming that we would get a large percentage of opened stores
> >>>>>>>>>>>> assigned as tasks anyway, this could work?
> >>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>>> -Matthias
> >>>>>>>>>>>>
> >>>>>>>>>>>> On 5/3/24 1:29 AM, Bruno Cadonna wrote:
> >>>>>>>>>>>>> Hi Matthias,
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> 101:
> >>>>>>>>>>>>> Let's assume a RocksDB store, but I think the following might
> >>>> be
> >>>>>>>>>>>>> true also for other store implementations. With this KIP, if
> >>>> Kafka
> >>>>>>>>>>>>> Streams commits the offsets, the committed offsets will be
> >>>> stored
> >>>>>>> in
> >>>>>>>>>>>>> an in-memory data structure (i.e. the memtable) and stay
> there
> >>>>>>> until
> >>>>>>>>>>>>> RocksDB decides that it is time to persist its in-memory data
> >>>>>>>>>>>>> structure. If Kafka Streams writes its position to the
> >>>> .position
> >>>>>>>>>>>>> file during a commit and a crash happens before RocksDB
> persists
> >>>>>>> the
> >>>>>>>>>>>>> memtable then the position in the .position file is ahead of
> >>>> the
> >>>>>>>>>>>>> persisted offset. If IQ is done between the crash and the
> state
> >>>>>>>>>>>>> store fully restoring the changelog, the position might tell
> IQ
> >>>>>>> that
> >>>>>>>>>>>>> the state store is more up-to-date than it actually is.
> >>>>>>>>>>>>> In contrast, if Kafka Streams handles persisting positions
> the
> >>>>>>> same
> >>>>>>>>>>>>> as persisting offset, the position should always be
> consistent
> >>>>>>> with
> >>>>>>>>>>>>> the offset, because they are persisted together.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> 102:
> >>>>>>>>>>>>> I am confused about your confusion which tells me that we are
> >>>>>>>>>>>>> talking about two different things.
> >>>>>>>>>>>>> You asked
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> "Do you intent to add this information [i.e. position] to the
> >>>> map
> >>>>>>>>>>>>> passed via commit(final Map<TopicPartition, Long>
> >>>>>>>> changelogOffsets)?"
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> and with what I wrote I meant that we do not need to pass the
> >>>>>>>>>>>>> position into the implementation of the StateStore interface
> >>>> since
> >>>>>>>>>>>>> the position is updated within the implementation of the
> >>>>>>> StateStore
> >>>>>>>>>>>>> interface (e.g. RocksDBStore [1]). My statement describes the
> >>>>>>>>>>>>> behavior now, not the change proposed in this KIP, so it does
> >>>> not
> >>>>>>>>>>>>> contradict what is stated in the KIP.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> 200:
> >>>>>>>>>>>>> This is about Matthias' main concern about rebalance
> metadata.
> >>>>>>>>>>>>> As far as I understand the KIP, Kafka Streams will only use
> the
> >>>>>>>>>>>>> .checkpoint files to compute the task lag for unassigned
> tasks
> >>>>>>> whose
> >>>>>>>>>>>>> state is locally available. For assigned tasks, it will use
> the
> >>>>>>>>>>>>> offsets managed by the open state store.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Best,
> >>>>>>>>>>>>> Bruno
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> [1]
> >>>>>>>>>>>>>
> >>>>>>>>>
> >>>>>>>>
> >>>>>>>
> >>>>
> https://github.com/apache/kafka/blob/fcbfd3412eb746a0c81374eb55ad0f73de6b1e71/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java#L397
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> On 5/1/24 3:00 AM, Matthias J. Sax wrote:
> >>>>>>>>>>>>>> Thanks Bruno.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> 101: I think I understand this better now. But just want to
> >>>> make
> >>>>>>>>>>>>>> sure I do. What do you mean by "they can diverge" and
> >>>> "Recovering
> >>>>>>>>>>>>>> after a failure might load inconsistent offsets and
> >>>> positions."
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> The checkpoint is the offset from the changelog, while the
> >>>>>>> position
> >>>>>>>>>>>>>> is the offset from the upstream source topic, right? -- In
> the
> >>>>>>> end,
> >>>>>>>>>>>>>> the position is about IQ, and if we fail to update it, it
> only
> >>>>>>>>>>>>>> means that there is some gap when we might not be able to
> >>>> query a
> >>>>>>>>>>>>>> standby task, because we think it's not up-to-date enough
> >>>> even if
> >>>>>>>>>>>>>> it is, which would resolve itself soon? Ie, the position
> might
> >>>>>>>>>>>>>> "lag", but it's not "inconsistent". Do we believe that this
> >>>> lag
> >>>>>>>>>>>>>> would be highly problematic?
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> 102: I am confused.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> The position is maintained inside the state store, but is
> >>>>>>>>>>>>>>> persisted in the .position file when the state store
> closes.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> This contradicts the KIP:
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>    these position offsets will be stored in RocksDB, in the
> >>>> same
> >>>>>>>>>>>>>>> column family as the changelog offsets, instead of the
> >>>> .position
> >>>>>>>>> file
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> My main concern is currently about rebalance metadata --
> >>>> opening
> >>>>>>>>>>>>>> RocksDB stores seems to be very expensive, but if we follow
> >>>> the
> >>>>>>>> KIP:
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> We will do this under EOS by updating the .checkpoint file
> >>>>>>>>>>>>>>> whenever a store is close()d.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> It seems, having the offset inside RocksDB does not help us
> at
> >>>>>>> all?
> >>>>>>>>>>>>>> In the end, when we crash, we don't want to lose the state,
> >>>> but
> >>>>>>>>>>>>>> when we update the .checkpoint only on a clean close, the
> >>>>>>>>>>>>>> .checkpoint might be stale (ie, still contains the
> checkpoint
> >>>>>>> when
> >>>>>>>>>>>>>> we opened the store when we got a task assigned).
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> -Matthias
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> On 4/30/24 2:40 AM, Bruno Cadonna wrote:
> >>>>>>>>>>>>>>> Hi all,
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> 100
> >>>>>>>>>>>>>>> I think we already have such a wrapper. It is called
> >>>>>>>>>>>>>>> AbstractReadWriteDecorator.
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> 101
> >>>>>>>>>>>>>>> Currently, the position is checkpointed when an offset
> >>>> checkpoint
> >>>>>>>>>>>>>>> is written. If we let the state store manage the committed
> >>>>>>>>>>>>>>> offsets, we need to let the state store also manage
> the
> >>>>>>>>>>>>>>> position otherwise they might diverge. State store managed
> >>>>>>> offsets
> >>>>>>>>>>>>>>> can get flushed (i.e. checkpointed) to the disk when the
> >>>> state
> >>>>>>>>>>>>>>> store decides to flush its in-memory data structures, but
> the
> >>>>>>>>>>>>>>> position is only checkpointed at commit time. Recovering
> >>>> after a
> >>>>>>>>>>>>>>> failure might load inconsistent offsets and positions.
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> 102
> >>>>>>>>>>>>>>> The position is maintained inside the state store, but is
> >>>>>>>>>>>>>>> persisted in the .position file when the state store
> closes.
> >>>> The
> >>>>>>>>>>>>>>> only public interface that uses the position is IQv2 in a
> >>>>>>>>>>>>>>> read-only mode. So the position is only updated within the
> >>>> state
> >>>>>>>>>>>>>>> store and read from IQv2. No need to add anything to the
> >>>> public
> >>>>>>>>>>>>>>> StateStore interface.
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> 103
> >>>>>>>>>>>>>>> Deprecating managesOffsets() right away might be a good
> idea.
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> 104
> >>>>>>>>>>>>>>> I agree that we should try to support downgrades without
> >>>> wipes.
> >>>>>>> At
> >>>>>>>>>>>>>>> least Nick should state in the KIP why we do not support
> it.
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Best,
> >>>>>>>>>>>>>>> Bruno
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> On 4/23/24 8:13 AM, Matthias J. Sax wrote:
> >>>>>>>>>>>>>>>> Thanks for splitting out this KIP. The discussion shows that it
> >>>>>>>>>>>>>>>> is a complex beast by itself, so it is worth discussing on its own.
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> A couple of questions / comments:
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> 100 `StateStore#commit()`: The JavaDoc says "must not be
> >>>> called
> >>>>>>>>>>>>>>>> by users" -- I would propose to put a guard in place for
> >>>> this,
> >>>>>>> by
> >>>>>>>>>>>>>>>> either throwing an exception (preferable) or adding a
> no-op
> >>>>>>>>>>>>>>>> implementation (at least for our own stores, by wrapping
> >>>> them
> >>>>>>> --
> >>>>>>>>>>>>>>>> we cannot enforce it for custom stores I assume), and
> >>>> document
> >>>>>>>>>>>>>>>> this contract explicitly.
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> 101 adding `.position` to the store: Why do we actually
> need
> >>>>>>>>>>>>>>>> this? The KIP says "To ensure consistency with the
> committed
> >>>>>>> data
> >>>>>>>>>>>>>>>> and changelog offsets" but I am not sure if I can follow?
> >>>> Can
> >>>>>>> you
> >>>>>>>>>>>>>>>> elaborate why leaving the `.position` file as-is won't
> work?
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> If it's possible at all, it will need to be done by
> >>>>>>>>>>>>>>>>> creating temporary StateManagers and StateStores during
> >>>>>>>>>>>>>>>>> rebalance. I think
> >>>>>>>>>>>>>>>>> it is possible, and probably not too expensive, but the
> >>>> devil
> >>>>>>>>>>>>>>>>> will be in
> >>>>>>>>>>>>>>>>> the detail.
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> This sounds like a significant overhead to me. We know
> that
> >>>>>>>>>>>>>>>> opening a single RocksDB takes about 500ms, and thus
> opening
> >>>>>>>>>>>>>>>> RocksDB to get this information might slow down rebalances
> >>>>>>>>>>>>>>>> significantly.
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> 102: It's unclear to me, how `.position` information is
> >>>> added.
> >>>>>>>>>>>>>>>> The KIP only says: "position offsets will be stored in
> >>>> RocksDB,
> >>>>>>>>>>>>>>>> in the same column family as the changelog offsets". Do
> you
> >>>>>>>>>>>>>>>> intend to add this information to the map passed via
> >>>>>>>>>>>>>>>> `commit(final Map<TopicPartition, Long>
> changelogOffsets)`?
> >>>> The
> >>>>>>>>>>>>>>>> KIP should describe this in more detail. Also, if my
> >>>> assumption
> >>>>>>>>>>>>>>>> is correct, we might want to rename the parameter and also
> >>>>>>> have a
> >>>>>>>>>>>>>>>> better JavaDoc description?
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> 103: Should we make it mandatory (long-term) that all
> stores
> >>>>>>>>>>>>>>>> (including custom stores) manage their offsets internally?
> >>>>>>>>>>>>>>>> Maintaining both options and thus both code paths puts a
> >>>> burden
> >>>>>>>>>>>>>>>> on everyone and makes the code messy. I would strongly
> >>>> prefer if
> >>>>>>>>>>>>>>>> we could have a mid-term path to get rid of supporting both.
> >>>> --
> >>>>>>>>>>>>>>>> For this case, we should deprecate the newly added
> >>>>>>>>>>>>>>>> `managesOffsets()` method right away, to point out that we
> >>>>>>> intend
> >>>>>>>>>>>>>>>> to remove it. If it's mandatory to maintain offsets for
> >>>> stores,
> >>>>>>>>>>>>>>>> we won't need this method any longer. In memory stores can
> >>>> just
> >>>>>>>>>>>>>>>> return null from #committedOffset().
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> 104 "downgrading": I think it might be worth adding
> support
> >>>> for
> >>>>>>>>>>>>>>>> downgrading w/o the need to wipe stores? Leveraging
> >>>>>>>>>>>>>>>> `upgrade.from` parameter, we could build a two rolling
> >>>> bounce
> >>>>>>>>>>>>>>>> downgrade: (1) the new code is started with `upgrade.from`
> >>>> set
> >>>>>>> to
> >>>>>>>>>>>>>>>> a lower version, telling the runtime to do the cleanup on
> >>>>>>>>>>>>>>>> `close()` -- (ie, ensure that all data is written into
> >>>>>>>>>>>>>>>> `.checkpoint` and `.position` files, and the newly added CF
> >>>> is
> >>>>>>>>>>>>>>>> deleted). In a second, rolling bounce, the old code would
> be
> >>>>>>> able
> >>>>>>>>>>>>>>>> to open RocksDB. -- I understand that this implies much
> more
> >>>>>>>>>>>>>>>> work, but downgrade seems to be common enough, that it
> >>>> might be
> >>>>>>>>>>>>>>>> worth it? Even if we did not always support this in the
> >>>> past,
> >>>>>>> we
> >>>>>>>>>>>>>>>> have to face the fact that KS is getting more and more
> >>>> adopted
> >>>>>>>>>>>>>>>> and as a more mature product should support this?
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> -Matthias
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>
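To make point 100 above concrete, here is a rough sketch of the "guard" idea: a wrapper handed to user code that rejects commit() while still allowing reads. Everything in it is hypothetical and for illustration only. OffsetStore, InMemoryOffsetStore and UserFacingStore are made-up names, not the real StateStore interface or AbstractReadWriteDecorator, and the String keys stand in for TopicPartition.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, minimal stand-in for a store that manages its own offsets.
// This is NOT the real org.apache.kafka.streams.processor.StateStore.
interface OffsetStore {
    void commit(Map<String, Long> changelogOffsets);
    Map<String, Long> committedOffsets();
}

// Trivial in-memory implementation, used only to exercise the wrapper.
final class InMemoryOffsetStore implements OffsetStore {
    private final Map<String, Long> offsets = new HashMap<>();

    @Override
    public void commit(final Map<String, Long> changelogOffsets) {
        offsets.putAll(changelogOffsets);
    }

    @Override
    public Map<String, Long> committedOffsets() {
        return new HashMap<>(offsets); // defensive copy
    }
}

// Wrapper given to user code: reads are delegated, commit() is rejected.
final class UserFacingStore implements OffsetStore {
    private final OffsetStore inner;

    UserFacingStore(final OffsetStore inner) {
        this.inner = inner;
    }

    @Override
    public void commit(final Map<String, Long> changelogOffsets) {
        throw new UnsupportedOperationException("commit() must not be called by users");
    }

    @Override
    public Map<String, Long> committedOffsets() {
        return inner.committedOffsets();
    }
}
```

The runtime would keep a reference to the inner store and commit through that, so only user-visible handles are guarded. As noted above, this cannot be enforced for custom stores unless they are wrapped too.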
> >>>>>>>>>>>>>>>> On 4/21/24 11:58 PM, Bruno Cadonna wrote:
> >>>>>>>>>>>>>>>>> Hi all,
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> How should we proceed here?
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> 1. with the plain .checkpoint file
> >>>>>>>>>>>>>>>>> 2. with a way to use the state store interface on
> >>>> unassigned
> >>>>>>> but
> >>>>>>>>>>>>>>>>> locally existing task state
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> While I like option 2, I think option 1 is less risky and
> >>>> will
> >>>>>>>>>>>>>>>>> give us the benefits of transactional state stores
> sooner.
> >>>> We
> >>>>>>>>>>>>>>>>> should consider the interface approach afterwards,
> though.
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> Best,
> >>>>>>>>>>>>>>>>> Bruno
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> On 4/17/24 3:15 PM, Bruno Cadonna wrote:
> >>>>>>>>>>>>>>>>>> Hi Nick and Sophie,
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> I think the task ID is not enough to create a state
> store
> >>>>>>> that
> >>>>>>>>>>>>>>>>>> can read the offsets of non-assigned tasks for lag
> >>>>>>> computation
> >>>>>>>>>>>>>>>>>> during rebalancing. The state store also needs the state
> >>>>>>>>>>>>>>>>>> directory so that it knows where to find the information
> >>>> that
> >>>>>>>>>>>>>>>>>> it needs to return from changelogOffsets().
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> In general, I think we should proceed with the plain
> >>>>>>>>>>>>>>>>>> .checkpoint file for now and iterate back to the state
> >>>> store
> >>>>>>>>>>>>>>>>>> solution later since it seems it is not that
> >>>> straightforward.
> >>>>>>>>>>>>>>>>>> Alternatively, Nick could timebox an effort to better
> >>>>>>>>>>>>>>>>>> understand what would be needed for the state store
> >>>> solution.
> >>>>>>>>>>>>>>>>>> Nick, let us know your decision.
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> Regarding your question about the state store instance.
> I
> >>>> am
> >>>>>>>>>>>>>>>>>> not too familiar with that part of the code, but I think
> >>>> the
> >>>>>>>>>>>>>>>>>> state store is built when the processor topology is built and
> >>>>>>>>>>>>>>>>>> the processor topology is built per stream task. So
> there
> >>>> is
> >>>>>>>>>>>>>>>>>> one instance of processor topology and state store per
> >>>> stream
> >>>>>>>>>>>>>>>>>> task. Try to follow the call in [1].
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> Best,
> >>>>>>>>>>>>>>>>>> Bruno
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> [1]
> >>>>>>>>>>>>>>>>>> https://github.com/apache/kafka/blob/f52575b17225828d2ff11996030ab7304667deab/streams/src/main/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreator.java#L153
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> On 4/16/24 8:59 PM, Nick Telford wrote:
> >>>>>>>>>>>>>>>>>>> That does make sense. The one thing I can't figure out
> is
> >>>>>>> how
> >>>>>>>>>>>>>>>>>>> per-Task
> >>>>>>>>>>>>>>>>>>> StateStore instances are constructed.
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> It looks like we construct one StateStore instance for
> >>>> the
> >>>>>>>>>>>>>>>>>>> whole Topology
> >>>>>>>>>>>>>>>>>>> (in InternalTopologyBuilder), and pass that into
> >>>>>>>>>>>>>>>>>>> ProcessorStateManager (via
> >>>>>>>>>>>>>>>>>>> StateManagerUtil) for each Task, which then initializes
> >>>> it.
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> This can't be the case though, otherwise multiple
> >>>> partitions
> >>>>>>>>>>>>>>>>>>> of the same
> >>>>>>>>>>>>>>>>>>> sub-topology (aka Tasks) would share the same
> StateStore
> >>>>>>>>>>>>>>>>>>> instance, which
> >>>>>>>>>>>>>>>>>>> they don't.
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> What am I missing?
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> On Tue, 16 Apr 2024 at 16:22, Sophie Blee-Goldman
> >>>>>>>>>>>>>>>>>>> <sop...@responsive.dev>
> >>>>>>>>>>>>>>>>>>> wrote:
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> I don't think we need to *require* a constructor
> accept
> >>>> the
> >>>>>>>>>>>>>>>>>>>> TaskId, but we
> >>>>>>>>>>>>>>>>>>>> would definitely make sure that the RocksDB state
> store
> >>>>>>>>>>>>>>>>>>>> changes its
> >>>>>>>>>>>>>>>>>>>> constructor to one that accepts the TaskID (which we
> >>>> can do
> >>>>>>>>>>>>>>>>>>>> without
> >>>>>>>>>>>>>>>>>>>> deprecation since it's an internal API), and custom
> state
> >>>>>>>>>>>>>>>>>>>> stores can just
> >>>>>>>>>>>>>>>>>>>> decide for themselves whether they want to opt-in/use
> >>>> the
> >>>>>>>>>>>>>>>>>>>> TaskId param
> >>>>>>>>>>>>>>>>>>>> or not. I mean custom state stores would have to
> opt-in
> >>>>>>>>>>>>>>>>>>>> anyways by
> >>>>>>>>>>>>>>>>>>>> implementing the new StoreSupplier#get(TaskId) API and
> >>>> the
> >>>>>>>> only
> >>>>>>>>>>>>>>>>>>>> reason to do that would be to have created a
> constructor
> >>>>>>> that
> >>>>>>>>>>>>>>>>>>>> accepts
> >>>>>>>>>>>>>>>>>>>> a TaskId
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> Just to be super clear about the proposal, this is
> what
> >>>> I
> >>>>>>> had
> >>>>>>>>>>>>>>>>>>>> in mind.
> >>>>>>>>>>>>>>>>>>>> It's actually fairly simple and wouldn't add much to
> the
> >>>>>>>>>>>>>>>>>>>> scope of the
> >>>>>>>>>>>>>>>>>>>> KIP (I think -- if it turns out to be more complicated
> >>>> than
> >>>>>>>>>>>>>>>>>>>> I'm assuming,
> >>>>>>>>>>>>>>>>>>>> we should definitely do whatever has the smallest LOE
> to
> >>>>>>> get
> >>>>>>>>>>>>>>>>>>>> this done)
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> Anyways, the (only) public API changes would be to add
> >>>> this
> >>>>>>>> new
> >>>>>>>>>>>>>>>>>>>> method to the StoreSupplier API:
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> default T get(final TaskId taskId) {
> >>>>>>>>>>>>>>>>>>>>        return get();
> >>>>>>>>>>>>>>>>>>>> }
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> We can decide whether or not to deprecate the old #get
> >>>> but
> >>>>>>>>>>>>>>>>>>>> it's not
> >>>>>>>>>>>>>>>>>>>> really necessary and might cause a lot of turmoil, so
> >>>> I'd
> >>>>>>>>>>>>>>>>>>>> personally
> >>>>>>>>>>>>>>>>>>>> say we just leave both APIs in place.
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> And that's it for public API changes! Internally, we
> >>>> would
> >>>>>>>>>>>>>>>>>>>> just adapt
> >>>>>>>>>>>>>>>>>>>> each of the rocksdb StoreSupplier classes to implement
> >>>> this
> >>>>>>>> new
> >>>>>>>>>>>>>>>>>>>> API. So for example with the
> >>>>>>>> RocksDBKeyValueBytesStoreSupplier,
> >>>>>>>>>>>>>>>>>>>> we just add
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> @Override
> >>>>>>>>>>>>>>>>>>>> public KeyValueStore<Bytes, byte[]> get(final TaskId taskId) {
> >>>>>>>>>>>>>>>>>>>>        return returnTimestampedStore ?
> >>>>>>>>>>>>>>>>>>>>            new RocksDBTimestampedStore(name, metricsScope(), taskId) :
> >>>>>>>>>>>>>>>>>>>>            new RocksDBStore(name, metricsScope(), taskId);
> >>>>>>>>>>>>>>>>>>>> }
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> And of course add the TaskId parameter to each of the
> >>>>>>> actual
> >>>>>>>>>>>>>>>>>>>> state store constructors returned here.
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> Does that make sense? It's entirely possible I'm
> missing
> >>>>>>>>>>>>>>>>>>>> something
> >>>>>>>>>>>>>>>>>>>> important here, but I think this would be a pretty
> small
> >>>>>>>>>>>>>>>>>>>> addition that
> >>>>>>>>>>>>>>>>>>>> would solve the problem you mentioned earlier while
> also
> >>>>>>>> being
> >>>>>>>>>>>>>>>>>>>> useful to anyone who uses custom state stores.
> >>>>>>>>>>>>>>>>>>>>
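For reference, the compatibility argument in the proposal above can be sketched in isolation. This is a minimal, self-contained illustration, not the real Kafka Streams API: TaskId and StoreSupplier here are simplified stand-ins, and LegacySupplier and TaskAwareSupplier are hypothetical names. The point is only that a default get(TaskId) that delegates to get() leaves existing suppliers working unchanged, while new suppliers can opt in.

```java
// Simplified stand-in for the real TaskId (assumption: subtopology + partition).
final class TaskId {
    final int subtopology;
    final int partition;

    TaskId(final int subtopology, final int partition) {
        this.subtopology = subtopology;
        this.partition = partition;
    }

    @Override
    public String toString() {
        return subtopology + "_" + partition;
    }
}

// Simplified stand-in for StoreSupplier with the proposed default method.
interface StoreSupplier<T> {
    T get();

    // Proposed addition: defaults to the old method for compatibility.
    default T get(final TaskId taskId) {
        return get();
    }
}

// An existing supplier that knows nothing about TaskId keeps compiling.
final class LegacySupplier implements StoreSupplier<String> {
    @Override
    public String get() {
        return "store";
    }
}

// A supplier that opts in and passes the TaskId to what it constructs.
final class TaskAwareSupplier implements StoreSupplier<String> {
    @Override
    public String get() {
        return "store";
    }

    @Override
    public String get(final TaskId taskId) {
        return "store@" + taskId;
    }
}
```

With this shape the runtime could always call get(taskId): legacy suppliers fall through to get(), and task-aware ones receive the id at construction time rather than waiting for init().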
> >>>>>>>>>>>>>>>>>>>> On Mon, Apr 15, 2024 at 10:21 AM Nick Telford
> >>>>>>>>>>>>>>>>>>>> <nick.telf...@gmail.com>
> >>>>>>>>>>>>>>>>>>>> wrote:
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>> Hi Sophie,
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>> Interesting idea! Although what would that mean for
> the
> >>>>>>>>>>>>>>>>>>>>> StateStore
> >>>>>>>>>>>>>>>>>>>>> interface? Obviously we can't require that the
> >>>> constructor
> >>>>>>>>>>>>>>>>>>>>> take the
> >>>>>>>>>>>>>>>>>>>> TaskId.
> >>>>>>>>>>>>>>>>>>>>> Is it enough to add the parameter to the
> StoreSupplier?
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>> Would doing this be in-scope for this KIP, or are we
> >>>>>>>>>>>>>>>>>>>>> over-complicating
> >>>>>>>>>>>>>>>>>>>> it?
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>> Nick
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>> On Fri, 12 Apr 2024 at 21:30, Sophie Blee-Goldman
> >>>>>>>>>>>>>>>>>>>>> <sop...@responsive.dev
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>> wrote:
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>> Somewhat minor point overall, but it actually drives
> >>>> me
> >>>>>>>>>>>>>>>>>>>>>> crazy that you
> >>>>>>>>>>>>>>>>>>>>>> can't get access to the taskId of a StateStore until
> >>>>>>> #init
> >>>>>>>>>>>>>>>>>>>>>> is called.
> >>>>>>>>>>>>>>>>>>>>> This
> >>>>>>>>>>>>>>>>>>>>>> has caused me a huge headache personally (since the
> >>>> same
> >>>>>>> is
> >>>>>>>>>>>>>>>>>>>>>> true for
> >>>>>>>>>>>>>>>>>>>>>> processors and I was trying to do something that's
> >>>>>>> probably
> >>>>>>>>>>>>>>>>>>>>>> too hacky
> >>>>>>>>>>>>>>>>>>>> to
> >>>>>>>>>>>>>>>>>>>>>> actually complain about here lol)
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>> Can we just change the StateStoreSupplier to receive
> >>>> and
> >>>>>>>>>>>>>>>>>>>>>> pass along the
> >>>>>>>>>>>>>>>>>>>>>> taskId when creating a new store? Presumably by
> >>>> adding a
> >>>>>>>>>>>>>>>>>>>>>> new version of
> >>>>>>>>>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>>>>>> #get method that takes in a taskId parameter? We can
> >>>> have
> >>>>>>>>>>>>>>>>>>>>>> it default to
> >>>>>>>>>>>>>>>>>>>>>> invoking the old one for compatibility reasons and
> it
> >>>>>>>>>>>>>>>>>>>>>> should be
> >>>>>>>>>>>>>>>>>>>>> completely
> >>>>>>>>>>>>>>>>>>>>>> safe to tack on.
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>> Would also prefer the same for a ProcessorSupplier,
> >>>> but
> >>>>>>>>> that's
> >>>>>>>>>>>>>>>>>>>> definitely
> >>>>>>>>>>>>>>>>>>>>>> outside the scope of this KIP
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>> On Fri, Apr 12, 2024 at 3:31 AM Nick Telford
> >>>>>>>>>>>>>>>>>>>>>> <nick.telf...@gmail.com>
> >>>>>>>>>>>>>>>>>>>>>> wrote:
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>> On further thought, it's clear that this can't work
> >>>> for
> >>>>>>>>>>>>>>>>>>>>>>> one simple
> >>>>>>>>>>>>>>>>>>>>>> reason:
> >>>>>>>>>>>>>>>>>>>>>>> StateStores don't know their associated TaskId (and
> >>>>>>> hence,
> >>>>>>>>>>>>>>>>>>>>>>> their
> >>>>>>>>>>>>>>>>>>>>>>> StateDirectory) until the init() call. Therefore,
> >>>>>>>>>>>>>>>>>>>>>>> committedOffset()
> >>>>>>>>>>>>>>>>>>>>> can't
> >>>>>>>>>>>>>>>>>>>>>>> be called before init(), unless we also added a
> >>>>>>>>>>>>>>>>>>>>>>> StateStoreContext
> >>>>>>>>>>>>>>>>>>>>>> argument
> >>>>>>>>>>>>>>>>>>>>>>> to committedOffset(), which I think might be trying
> >>>> to
> >>>>>>>>>>>>>>>>>>>>>>> shoehorn too
> >>>>>>>>>>>>>>>>>>>>> much
> >>>>>>>>>>>>>>>>>>>>>>> into committedOffset().
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>> I still don't like the idea of the Streams engine
> >>>>>>>>>>>>>>>>>>>>>>> maintaining the
> >>>>>>>>>>>>>>>>>>>> cache
> >>>>>>>>>>>>>>>>>>>>>> of
> >>>>>>>>>>>>>>>>>>>>>>> changelog offsets independently of stores, mostly
> >>>>>>> because
> >>>>>>>>>>>>>>>>>>>>>>> of the
> >>>>>>>>>>>>>>>>>>>>>>> maintenance burden of the code duplication, but it
> >>>> looks
> >>>>>>>>>>>>>>>>>>>>>>> like we'll
> >>>>>>>>>>>>>>>>>>>>> have
> >>>>>>>>>>>>>>>>>>>>>> to
> >>>>>>>>>>>>>>>>>>>>>>> live with it.
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>> Unless you have any better ideas?
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>> Regards,
> >>>>>>>>>>>>>>>>>>>>>>> Nick
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>> On Wed, 10 Apr 2024 at 14:12, Nick Telford
> >>>>>>>>>>>>>>>>>>>>>>> <nick.telf...@gmail.com>
> >>>>>>>>>>>>>>>>>>>>>> wrote:
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>> Hi Bruno,
> >>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>> Immediately after I sent my response, I looked at
> >>>> the
> >>>>>>>>>>>>>>>>>>>>>>>> codebase and
> >>>>>>>>>>>>>>>>>>>>> came
> >>>>>>>>>>>>>>>>>>>>>>> to
> >>>>>>>>>>>>>>>>>>>>>>>> the same conclusion. If it's possible at all, it
> >>>> will
> >>>>>>>>>>>>>>>>>>>>>>>> need to be
> >>>>>>>>>>>>>>>>>>>> done
> >>>>>>>>>>>>>>>>>>>>>> by
> >>>>>>>>>>>>>>>>>>>>>>>> creating temporary StateManagers and StateStores
> >>>> during
> >>>>>>>>>>>>>>>>>>>>>>>> rebalance.
> >>>>>>>>>>>>>>>>>>>> I
> >>>>>>>>>>>>>>>>>>>>>>> think
> >>>>>>>>>>>>>>>>>>>>>>>> it is possible, and probably not too expensive,
> but
> >>>> the
> >>>>>>>>>>>>>>>>>>>>>>>> devil will
> >>>>>>>>>>>>>>>>>>>> be
> >>>>>>>>>>>>>>>>>>>>>> in
> >>>>>>>>>>>>>>>>>>>>>>>> the detail.
> >>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>> I'll try to find some time to explore the idea to
> >>>> see
> >>>>>>> if
> >>>>>>>>>>>>>>>>>>>>>>>> it's
> >>>>>>>>>>>>>>>>>>>>> possible
> >>>>>>>>>>>>>>>>>>>>>>> and
> >>>>>>>>>>>>>>>>>>>>>>>> report back, because we'll need to determine this
> >>>>>>> before
> >>>>>>>>>>>>>>>>>>>>>>>> we can
> >>>>>>>>>>>>>>>>>>>> vote
> >>>>>>>>>>>>>>>>>>>>> on
> >>>>>>>>>>>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>>>>>>>> KIP.
> >>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>> Regards,
> >>>>>>>>>>>>>>>>>>>>>>>> Nick
> >>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>> On Wed, 10 Apr 2024 at 11:36, Bruno Cadonna
> >>>>>>>>>>>>>>>>>>>>>>>> <cado...@apache.org>
> >>>>>>>>>>>>>>>>>>>>>> wrote:
> >>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>> Hi Nick,
> >>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>> Thanks for reacting to my comments so quickly!
> >>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>> 2.
> >>>>>>>>>>>>>>>>>>>>>>>>> Some thoughts on your proposal.
> >>>>>>>>>>>>>>>>>>>>>>>>> State managers (and state stores) are parts of
> >>>> tasks.
> >>>>>>> If
> >>>>>>>>>>>>>>>>>>>>>>>>> the task
> >>>>>>>>>>>>>>>>>>>> is
> >>>>>>>>>>>>>>>>>>>>>> not
> >>>>>>>>>>>>>>>>>>>>>>>>> assigned locally, we do not create those tasks.
> To
> >>>> get
> >>>>>>>>>>>>>>>>>>>>>>>>> the offsets
> >>>>>>>>>>>>>>>>>>>>>> with
> >>>>>>>>>>>>>>>>>>>>>>>>> your approach, we would need to either create a
> kind
> >>>> of
> >>>>>>>>>>>>>>>>>>>>>>>>> inactive
> >>>>>>>>>>>>>>>>>>>> tasks
> >>>>>>>>>>>>>>>>>>>>>>>>> besides active and standby tasks or store and
> >>>> manage
> >>>>>>>> state
> >>>>>>>>>>>>>>>>>>>> managers
> >>>>>>>>>>>>>>>>>>>>> of
> >>>>>>>>>>>>>>>>>>>>>>>>> non-assigned tasks differently than the state
> >>>> managers
> >>>>>>>>>>>>>>>>>>>>>>>>> of assigned
> >>>>>>>>>>>>>>>>>>>>>>>>> tasks. Additionally, the cleanup thread that
> >>>> removes
> >>>>>>>>>>>>>>>>>>>>>>>>> unassigned
> >>>>>>>>>>>>>>>>>>>> task
> >>>>>>>>>>>>>>>>>>>>>>>>> directories needs to concurrently delete those
> >>>>>>> inactive
> >>>>>>>>>>>>>>>>>>>>>>>>> tasks or
> >>>>>>>>>>>>>>>>>>>>>>>>> task-less state managers of unassigned tasks.
> This
> >>>>>>> seems
> >>>>>>>>>>>>>>>>>>>>>>>>> all quite
> >>>>>>>>>>>>>>>>>>>>>> messy
> >>>>>>>>>>>>>>>>>>>>>>>>> to me.
> >>>>>>>>>>>>>>>>>>>>>>>>> Could we create those state managers (or state
> >>>> stores)
> >>>>>>>>>>>>>>>>>>>>>>>>> for locally
> >>>>>>>>>>>>>>>>>>>>>>>>> existing but unassigned tasks on demand when
> >>>>>>>>>>>>>>>>>>>>>>>>> TaskManager#getTaskOffsetSums() is executed? Or
> >>>> have a
> >>>>>>>>>>>>>>>>>>>>>>>>> different
> >>>>>>>>>>>>>>>>>>>>>>>>> encapsulation for the unused task directories?
> >>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>> Best,
> >>>>>>>>>>>>>>>>>>>>>>>>> Bruno
> >>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>> On 4/10/24 11:31 AM, Nick Telford wrote:
> >>>>>>>>>>>>>>>>>>>>>>>>>> Hi Bruno,
> >>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>> Thanks for the review!
> >>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>> 1, 4, 5.
> >>>>>>>>>>>>>>>>>>>>>>>>>> Done
> >>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>> 3.
> >>>>>>>>>>>>>>>>>>>>>>>>>> You're right. I've removed the offending
> >>>> paragraph. I
> >>>>>>>> had
> >>>>>>>>>>>>>>>>>>>>> originally
> >>>>>>>>>>>>>>>>>>>>>>>>>> adapted this from the guarantees outlined in
> >>>> KIP-892.
> >>>>>>>>>>>>>>>>>>>>>>>>>> But it's
> >>>>>>>>>>>>>>>>>>>>>>>>> difficult to
> >>>>>>>>>>>>>>>>>>>>>>>>>> provide these guarantees without the KIP-892
> >>>>>>>> transaction
> >>>>>>>>>>>>>>>>>>>> buffers.
> >>>>>>>>>>>>>>>>>>>>>>>>> Instead,
> >>>>>>>>>>>>>>>>>>>>>>>>>> we'll add the guarantees back into the JavaDoc
> >>>> when
> >>>>>>>>>>>>>>>>>>>>>>>>>> KIP-892
> >>>>>>>>>>>>>>>>>>>> lands.
> >>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>> 2.
> >>>>>>>>>>>>>>>>>>>>>>>>>> Good point! This is the only part of the KIP
> that
> >>>> was
> >>>>>>>>>>>>>>>>>>>>>> (significantly)
> >>>>>>>>>>>>>>>>>>>>>>>>>> changed when I extracted it from KIP-892. My
> >>>>>>> prototype
> >>>>>>>>>>>>>>>>>>>>>>>>>> currently
> >>>>>>>>>>>>>>>>>>>>>>>>> maintains
> >>>>>>>>>>>>>>>>>>>>>>>>>> this "cache" of changelog offsets in
> .checkpoint,
> >>>> but
> >>>>>>>>>>>>>>>>>>>>>>>>>> doing so
> >>>>>>>>>>>>>>>>>>>>>> becomes
> >>>>>>>>>>>>>>>>>>>>>>>>> very
> >>>>>>>>>>>>>>>>>>>>>>>>>> messy. My intent with this change was to try to
> >>>>>>> better
> >>>>>>>>>>>>>>>>>>>> encapsulate
> >>>>>>>>>>>>>>>>>>>>>>> this
> >>>>>>>>>>>>>>>>>>>>>>>>>> offset "caching", especially for StateStores
> that
> >>>> can
> >>>>>>>>>>>>>>>>>>>>>>>>>> cheaply
> >>>>>>>>>>>>>>>>>>>>>> provide
> >>>>>>>>>>>>>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>>>>>>>>>> offsets stored directly in them without needing
> to
> >>>>>>>>>>>>>>>>>>>>>>>>>> duplicate
> >>>>>>>>>>>>>>>>>>>> them
> >>>>>>>>>>>>>>>>>>>>> in
> >>>>>>>>>>>>>>>>>>>>>>>>> this
> >>>>>>>>>>>>>>>>>>>>>>>>>> cache.
> >>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>> It's clear some more work is needed here to
> better
> >>>>>>>>>>>>>>>>>>>>>>>>>> encapsulate
> >>>>>>>>>>>>>>>>>>>>> this.
> >>>>>>>>>>>>>>>>>>>>>>> My
> >>>>>>>>>>>>>>>>>>>>>>>>>> immediate thought is: what if we construct *but
> >>>> don't
> >>>>>>>>>>>>>>>>>>>> initialize*
> >>>>>>>>>>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>>>>>>>>>> StateManager and StateStores for every Task
> >>>> directory
> >>>>>>>>>>>>>>>>>>>>>>>>>> on-disk?
> >>>>>>>>>>>>>>>>>>>>> That
> >>>>>>>>>>>>>>>>>>>>>>>>> should
> >>>>>>>>>>>>>>>>>>>>>>>>>> still be quite cheap to do, and would enable us
> to
> >>>>>>>>>>>>>>>>>>>>>>>>>> query the
> >>>>>>>>>>>>>>>>>>>>> offsets
> >>>>>>>>>>>>>>>>>>>>>>> for
> >>>>>>>>>>>>>>>>>>>>>>>>>> all on-disk stores, even if they're not open. If
> >>>> the
> >>>>>>>>>>>>>>>>>>>> StateManager
> >>>>>>>>>>>>>>>>>>>>>>> (aka.
> >>>>>>>>>>>>>>>>>>>>>>>>>> ProcessorStateManager/GlobalStateManager) proves
> >>>> too
> >>>>>>>>>>>>>>>>>>>>>>>>>> expensive
> >>>>>>>>>>>>>>>>>>>> to
> >>>>>>>>>>>>>>>>>>>>>> hold
> >>>>>>>>>>>>>>>>>>>>>>>>> open
> >>>>>>>>>>>>>>>>>>>>>>>>>> for closed stores, we could always have a
> >>>>>>>>>>>>>>>>>>>>>>>>>> "StubStateManager" in
> >>>>>>>>>>>>>>>>>>>>> its
> >>>>>>>>>>>>>>>>>>>>>>>>> place,
> >>>>>>>>>>>>>>>>>>>>>>>>>> that enables the querying of offsets, but
> nothing
> >>>>>>> else?
> >>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>> IDK, what do you think?
> >>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>> Regards,
> >>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>> Nick
> >>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>> On Tue, 9 Apr 2024 at 15:00, Bruno Cadonna
> >>>>>>>>>>>>>>>>>>>>>>>>>> <cado...@apache.org>
> >>>>>>>>>>>>>>>>>>>>>>> wrote:
> >>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>> Hi Nick,
> >>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>> Thanks for breaking out the KIP from KIP-892!
> >>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>> Here are a couple of comments/questions:
> >>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>> 1.
> >>>>>>>>>>>>>>>>>>>>>>>>>>> In Kafka Streams, we have a design guideline
> >>>> which
> >>>>>>>>>>>>>>>>>>>>>>>>>>> says to not
> >>>>>>>>>>>>>>>>>>>>> use
> >>>>>>>>>>>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>>>>>>>>>>> "get"-prefix for getters on the public API.
> Could
> >>>>>>> you
> >>>>>>>>>>>>>>>>>>>>>>>>>>> please
> >>>>>>>>>>>>>>>>>>>>> change
> >>>>>>>>>>>>>>>>>>>>>>>>>>> getCommittedOffsets() to committedOffsets()?
> >>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>> 2.
> >>>>>>>>>>>>>>>>>>>>>>>>>>> It is not clear to me how
> >>>>>>>>> TaskManager#getTaskOffsetSums()
> >>>>>>>>>>>>>>>>>>>> should
> >>>>>>>>>>>>>>>>>>>>>> read
> >>>>>>>>>>>>>>>>>>>>>>>>>>> offsets of tasks the stream thread does not own
> >>>> but
> >>>>>>>>>>>>>>>>>>>>>>>>>>> that have a
> >>>>>>>>>>>>>>>>>>>>>> state
> >>>>>>>>>>>>>>>>>>>>>>>>>>> directory on the Streams client by calling
> >>>>>>>>>>>>>>>>>>>>>>>>>>> StateStore#getCommittedOffsets(). If the thread
> >>>> does
> >>>>>>>>>>>>>>>>>>>>>>>>>>> not own a
> >>>>>>>>>>>>>>>>>>>>> task
> >>>>>>>>>>>>>>>>>>>>>>> it
> >>>>>>>>>>>>>>>>>>>>>>>>>>> also does not create any state stores for the
> >>>> task,
> >>>>>>>>>>>>>>>>>>>>>>>>>>> which means
> >>>>>>>>>>>>>>>>>>>>>> there
> >>>>>>>>>>>>>>>>>>>>>>>>> is
> >>>>>>>>>>>>>>>>>>>>>>>>>>> no state store on which to call
> >>>>>>> getCommittedOffsets().
> >>>>>>>>>>>>>>>>>>>>>>>>>>> I would have rather expected that a checkpoint
> >>>> file
> >>>>>>> is
> >>>>>>>>>>>>>>>>>>>>>>>>>>> written
> >>>>>>>>>>>>>>>>>>>>> for
> >>>>>>>>>>>>>>>>>>>>>>> all
> >>>>>>>>>>>>>>>>>>>>>>>>>>> state stores on close -- not only for the
> >>>>>>> RocksDBStore
> >>>>>>>>>>>>>>>>>>>>>>>>>>> -- and
> >>>>>>>>>>>>>>>>>>>>> that
> >>>>>>>>>>>>>>>>>>>>>>> this
> >>>>>>>>>>>>>>>>>>>>>>>>>>> checkpoint file is read in
> >>>>>>>>>>>>>>>>>>>>>>>>>>> TaskManager#getTaskOffsetSums() for
> >>>>>>>>>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>>>>>>>>> tasks
> >>>>>>>>>>>>>>>>>>>>>>>>>>> that have a state directory on the client but
> are
> >>>>>>> not
> >>>>>>>>>>>>>>>>>>>>>>>>>>> currently
> >>>>>>>>>>>>>>>>>>>>>>>>> assigned
> >>>>>>>>>>>>>>>>>>>>>>>>>>> to any stream thread of the Streams client.
> >>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>> 3.
> >>>>>>>>>>>>>>>>>>>>>>>>>>> In the javadocs for commit() you write
> >>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>> "... all writes since the last commit(Map), or
> >>>> since
> >>>>>>>>>>>>>>>>>>>>>> init(StateStore)
> >>>>>>>>>>>>>>>>>>>>>>>>>>> *MUST* be available to readers, even after a
> >>>>>>> restart."
> >>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>> This is only true for a clean close before the
> >>>>>>>>>>>>>>>>>>>>>>>>>>> restart, isn't
> >>>>>>>>>>>>>>>>>>>> it?
> >>>>>>>>>>>>>>>>>>>>>>>>>>> If the task fails with a dirty close, Kafka
> >>>> Streams
> >>>>>>>>>>>>>>>>>>>>>>>>>>> cannot
> >>>>>>>>>>>>>>>>>>>>>> guarantee
> >>>>>>>>>>>>>>>>>>>>>>>>>>> that the in-memory structures of the state
> store
> >>>>>>> (e.g.
> >>>>>>>>>>>>>>>>>>>>>>>>>>> memtable
> >>>>>>>>>>>>>>>>>>>>> in
> >>>>>>>>>>>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>>>>>>>>>>> case of RocksDB) are flushed so that the
> records
> >>>> and
> >>>>>>>> the
> >>>>>>>>>>>>>>>>>>>>> committed
> >>>>>>>>>>>>>>>>>>>>>>>>>>> offsets are persisted.
> >>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>> 4.
> >>>>>>>>>>>>>>>>>>>>>>>>>>> The wrapper that provides the legacy
> >>>> checkpointing
> >>>>>>>>>>>>>>>>>>>>>>>>>>> behavior is
> >>>>>>>>>>>>>>>>>>>>>>> actually
> >>>>>>>>>>>>>>>>>>>>>>>>>>> an implementation detail. I would remove it
> from
> >>>> the
> >>>>>>>>>>>>>>>>>>>>>>>>>>> KIP, but
> >>>>>>>>>>>>>>>>>>>>> still
> >>>>>>>>>>>>>>>>>>>>>>>>>>> state that the legacy checkpointing behavior
> >>>> will be
> >>>>>>>>>>>>>>>>>>>>>>>>>>> supported
> >>>>>>>>>>>>>>>>>>>>> when
> >>>>>>>>>>>>>>>>>>>>>>> the
> >>>>>>>>>>>>>>>>>>>>>>>>>>> state store does not manage the checkpoints.
> >>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>> 5.
> >>>>>>>>>>>>>>>>>>>>>>>>>>> Regarding the metrics, could you please add the tags,
> >>>>>>>>>>>>>>>>>>>>>>>>>>> and the recording level (DEBUG or INFO) as done in
> >>>>>>>>>>>>>>>>>>>>>>>>>>> KIP-607 or KIP-444.
> >>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>> Best,
> >>>>>>>>>>>>>>>>>>>>>>>>>>> Bruno
> >>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>> On 4/7/24 5:35 PM, Nick Telford wrote:
> >>>>>>>>>>>>>>>>>>>>>>>>>>>> Hi everyone,
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>> Based on some offline discussion, I've split out the
> >>>>>>>>>>>>>>>>>>>>>>>>>>>> "Atomic Checkpointing" section from KIP-892:
> >>>>>>>>>>>>>>>>>>>>>>>>>>>> Transactional Semantics for StateStores, into its own
> >>>>>>>>>>>>>>>>>>>>>>>>>>>> KIP
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>> KIP-1035: StateStore managed changelog offsets
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-1035%3A+StateStore+managed+changelog+offsets
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>> While KIP-892 was adopted *with* the changes outlined
> >>>>>>>>>>>>>>>>>>>>>>>>>>>> in KIP-1035, these changes were always the most
> >>>>>>>>>>>>>>>>>>>>>>>>>>>> contentious part, and continued to spur discussion
> >>>>>>>>>>>>>>>>>>>>>>>>>>>> even after KIP-892 was adopted.
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>> All the changes introduced in KIP-1035 have been
> >>>>>>>>>>>>>>>>>>>>>>>>>>>> removed from KIP-892, and a hard dependency on
> >>>>>>>>>>>>>>>>>>>>>>>>>>>> KIP-1035 has been added to KIP-892 in their place.
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>> I'm hopeful that with some more focus on this set of
> >>>>>>>>>>>>>>>>>>>>>>>>>>>> changes, we can deliver something that we're all happy
> >>>>>>>>>>>>>>>>>>>>>>>>>>>> with.
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>>> Regards,
> >>>>>>>>>>>>>>>>>>>>>>>>>>>> Nick
> >>>>>>>>>>>>>>>>>>>>>>>>>>>>
