Hi everyone,

I didn't spot this before, but it looks like the API of
KeyValueStoreTestDriver will need to be updated to change the nomenclature
from "flushed" to "committed":

numFlushedEntryRemoved() -> numCommittedEntryRemoved()
numFlushedEntryStored() -> numCommittedEntryStored()
flushedEntryRemoved(K) -> committedEntryRemoved(K)
flushedEntryStored(K) -> committedEntryStored(K)

The old methods will obviously be marked as @Deprecated.
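
For illustration, here is a minimal sketch of how the old names could delegate to the new ones. DriverSketch and its two record* helpers are hypothetical stand-ins, not the real KeyValueStoreTestDriver internals, and the return types shown are assumptions:

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical stand-in for KeyValueStoreTestDriver. Only the renamed
// accessors are modelled; the fields and record* helpers are assumptions.
class DriverSketch<K, V> {
    private final Map<K, V> committedEntries = new HashMap<>();
    private final Set<K> committedRemovals = new HashSet<>();

    // Hypothetical helper: records that a put for this key was committed.
    void recordCommittedPut(K key, V value) {
        committedEntries.put(key, value);
        committedRemovals.remove(key);
    }

    // Hypothetical helper: records that a removal of this key was committed.
    void recordCommittedRemove(K key) {
        committedEntries.remove(key);
        committedRemovals.add(key);
    }

    // New "committed" nomenclature.
    public int numCommittedEntryStored() { return committedEntries.size(); }
    public int numCommittedEntryRemoved() { return committedRemovals.size(); }
    public V committedEntryStored(K key) { return committedEntries.get(key); }
    public boolean committedEntryRemoved(K key) { return committedRemovals.contains(key); }

    // Old "flushed" nomenclature, kept as deprecated delegates.
    @Deprecated
    public int numFlushedEntryStored() { return numCommittedEntryStored(); }

    @Deprecated
    public int numFlushedEntryRemoved() { return numCommittedEntryRemoved(); }

    @Deprecated
    public V flushedEntryStored(K key) { return committedEntryStored(key); }

    @Deprecated
    public boolean flushedEntryRemoved(K key) { return committedEntryRemoved(key); }
}
```

Delegating keeps both names returning the same values, so existing tests would only see a deprecation warning until they migrate.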

Any objections before I add this to the KIP?

Regards,
Nick


On Wed, 29 May 2024 at 11:20, Nick Telford <nick.telf...@gmail.com> wrote:

> I've updated the KIP with the following:
>
>    - Deprecation of StateStore#managesOffsets
>    - Change StateStore#commit to throw UnsupportedOperationException when
>    called from a Processor (via AbstractReadWriteDecorator)
>    - Updated consumer rebalance lag computation strategy
>    <https://cwiki.apache.org/confluence/display/KAFKA/KIP-1035%3A+StateStore+managed+changelog+offsets#KIP1035:StateStoremanagedchangelogoffsets-ConsumerRebalanceMetadata>
>    based on our Meet discussion
>       - I've added a bit more detail here than we discussed, in
>       particular around how we handle the offsets for tasks assigned to our
>       local instance, and how we handle offsets when Tasks are closed/revoked.
>    - Improved downgrade behaviour
>       - Note: users that don't downgrade with upgrade.from will still get
>       the wipe-and-restore behaviour by default.
>
> I believe this covers all the outstanding changes that were requested.
> Please let me know if I've missed anything or you think further changes are
> needed.
>
> Regards,
> Nick
>
> On Wed, 29 May 2024 at 09:28, Nick Telford <nick.telf...@gmail.com> wrote:
>
>> Hi everyone,
>>
>> Sorry I haven't got around to updating the KIP yet. Now that I've wrapped
>> up KIP-989, I'm going to be working on 1035 starting today.
>>
>> I'll update the KIP first, and then call a vote.
>>
>> Regards,
>> Nick
>>
>> On Wed, 29 May 2024 at 07:25, Bruno Cadonna <cado...@apache.org> wrote:
>>
>>> Totally agree on moving forward and starting the VOTE!
>>>
>>> However, the KIP should be updated with the new info before starting the
>>> VOTE.
>>>
>>> Best,
>>> Bruno
>>>
>>> On 5/29/24 2:36 AM, Matthias J. Sax wrote:
>>> > Sounds like a good plan. -- I think we are still wrapping up 3.8
>>> > release, but would also like to move forward with this one.
>>> >
>>> > Should we start a VOTE?
>>> >
>>> > For merging PRs we need to wait after code freeze, and 3.8 branch was
>>> > cut. But we could start reviewing PRs before this already.
>>> >
>>> >
>>> > -Matthias
>>> >
>>> > On 5/17/24 3:05 AM, Nick Telford wrote:
>>> >> Hi everyone,
>>> >>
>>> >> As discussed on the Zoom call, we're going to handle rebalance
>>> >> meta-data by:
>>> >>
>>> >> - On start-up, Streams will open each store and read its changelog
>>> >> offsets
>>> >> into an in-memory cache. This cache will be shared among all
>>> >> StreamThreads.
>>> >> - On rebalance, the cache will be consulted for Task offsets for any
>>> Task
>>> >> that is not active on any instance-local StreamThreads. If the Task is
>>> >> active on *any* instance-local StreamThread, we will report the Task
>>> >> lag as
>>> >> "up to date" (i.e. -1), because we know that the local state is
>>> currently
>>> >> up-to-date.
>>> >>
>>> >> We will avoid caching offsets across restarts in the legacy
>>> ".checkpoint"
>>> >> file, so that we can eliminate the logic for handling this class. If
>>> >> performance of opening/closing many state stores is poor, we can
>>> >> parallelise it by forking off a thread for each Task directory when
>>> >> reading
>>> >> the offsets.
>>> >>
>>> >> I'll update the KIP later today to reflect this design, but I will
>>> try to
>>> >> keep it high-level, so that the exact implementation can vary.
>>> >>
>>> >> Regards,
>>> >>
>>> >> Nick
>>> >>
>>> >> On Thu, 16 May 2024 at 03:12, Sophie Blee-Goldman <
>>> sop...@responsive.dev>
>>> >> wrote:
>>> >>
>>> >>> 103: I like the idea of immediately deprecating #managesOffsets and
>>> >>> aiming
>>> >>> to make offset management mandatory in the long run. I assume we
>>> >>> would also
>>> >>> log a warning for any custom stores that return "false" from this
>>> >>> method to
>>> >>> encourage custom store implementations to start doing so? My only
>>> >>> question/concern is that if we want folks to start managing their own
>>> >>> offsets then we should make this transition easy for them, perhaps by
>>> >>> exposing some public utility APIs for things that are currently
>>> >>> handled by
>>> >>> Kafka Streams such as reading/writing checkpoint files. Maybe it
>>> >>> would be
>>> >>> useful to include a small example in the KIP of what it would
>>> >>> actually mean
>>> >>> to "manage your own offsets" -- I know (all too well) that plugging
>>> in
>>> >>> custom storage implementations is not easy and most people who do
>>> >>> this are
>>> >>> probably fairly advanced users, but offset management will be a
>>> >>> totally new
>>> >>> ballgame to most people and this kind of feels like throwing
>>> them
>>> >>> off the deep end. We should at least provide a lifejacket via some
>>> >>> kind of
>>> >>> utility API and/or example.
>>> >>>
>>> >>> 200. There's been a lot of back and forth on the rebalance
>>> metadata/task
>>> >>> lag computation question, so forgive me if I missed any part of
>>> this,
>>> >>> but I
>>> >>> think we've landed at the right idea here. To summarize: the "tl;dr"
>>> >>> explanation is that we'll write the checkpoint file only on close
>>> and
>>> >>> will
>>> >>> account for hard-crash scenarios by opening up the stores on startup
>>> and
>>> >>> writing a checkpoint file for any missing tasks. Does that sound
>>> about
>>> >>> right?
>>> >>>
>>> >>> A few clarifications:
>>> >>> I think we're all more or less on the same page here but just to be
>>> >>> absolutely clear, the task lags for each task directory found on
>>> disk
>>> >>> will
>>> >>> be reported by only one of the StreamThreads, and each StreamThread
>>> will
>>> >>> report lags only for tasks that it already owns or are not assigned
>>> >>> to any
>>> >>> other StreamThread in the client. In other words, we only need to
>>> get
>>> >>> the
>>> >>> task lag for completely unassigned/unlocked tasks, which means if
>>> >>> there is
>>> >>> a checkpoint file at all then it must be up-to-date, because there
>>> is no
>>> >>> other StreamThread actively writing to that state store (if so then
>>> only
>>> >>> that StreamThread would report lag for that particular task).
>>> >>>
>>> >>> This still leaves the "no checkpoint at all" case which as previously
>>> >>> mentioned can occur after a hard crash. Luckily we only have to
>>> worry
>>> >>> about this once, after starting up again following said hard crash.
>>> >>> We can
>>> >>> simply open up each of the state stores before ever joining the
>>> >>> group, get
>>> >>> the offsets from rocksdb, and write them to a new checkpoint file.
>>> After
>>> >>> that, we can depend on the checkpoints written at close and won't
>>> >>> have to
>>> >>> open up any stores that aren't already assigned for the reasons laid
>>> >>> out in
>>> >>> the paragraph above.
>>> >>>
>>> >>> As for the specific mechanism and which thread-does-what, since
>>> there
>>> >>> were
>>> >>> some questions, this is how I'm imagining the process:
>>> >>>
>>> >>>     1.   The general idea is that we simply go through each task
>>> >>> directory
>>> >>>     with state but no checkpoint file and open the StateStore, call
>>> >>>     #committedOffset, and then write it to the checkpoint file. We
>>> >>> can then
>>> >>>     close these stores and let things proceed as normal.
>>> >>>     2.  This only has to happen once, during startup, but we have two
>>> >>>     options:
>>> >>>        1. Do this from KafkaStreams#start, ie before we even create
>>> the
>>> >>>        StreamThreads
>>> >>>        2.  Do this from StreamThread#start, following a similar
>>> >>> lock-based
>>> >>>        approach to the one used in #computeTaskLags, where each
>>> >>> StreamThread
>>> >>> just
>>> >>>        makes a pass over the task directories on disk and attempts
>>> to
>>> >>> lock
>>> >>> them
>>> >>>        one by one. If they obtain the lock, check whether there is
>>> state
>>> >>> but no
>>> >>>        checkpoint, and write the checkpoint if needed. If it can't
>>> grab
>>> >>> the lock,
>>> >>>        then we know one of the other StreamThreads must be handling
>>> the
>>> >>> checkpoint
>>> >>>        file for that task directory, and we can move on.
>>> >>>
>>> >>> Don't really feel too strongly about which approach is best; doing
>>> >>> it in
>>> >>> KafkaStreams#start is certainly the simplest, while doing it in the
>>> >>> StreamThread's startup is more efficient. If we're worried about
>>> >>> adding too
>>> >>> much weight to KafkaStreams#start then the 2nd option is probably
>>> best,
>>> >>> though slightly more complicated.
>>> >>>
>>> >>> Thoughts?
>>> >>>
>>> >>> On Tue, May 14, 2024 at 10:02 AM Nick Telford <
>>> nick.telf...@gmail.com>
>>> >>> wrote:
>>> >>>
>>> >>>> Hi everyone,
>>> >>>>
>>> >>>> Sorry for the delay in replying. I've finally now got some time to
>>> work
>>> >>> on
>>> >>>> this.
>>> >>>>
>>> >>>> Addressing Matthias's comments:
>>> >>>>
>>> >>>> 100.
>>> >>>> Good point. As Bruno mentioned, there's already
>>> >>> AbstractReadWriteDecorator
>>> >>>> which we could leverage to provide that protection. I'll add
>>> details on
>>> >>>> this to the KIP.
>>> >>>>
>>> >>>> 101,102.
>>> >>>> It looks like these points have already been addressed by Bruno.
>>> Let me
>>> >>>> know if anything here is still unclear or you feel needs to be
>>> detailed
>>> >>>> more in the KIP.
>>> >>>>
>>> >>>> 103.
>>> >>>> I'm in favour of anything that gets the old code removed sooner, but
>>> >>>> wouldn't deprecating an API that we expect (some) users to implement
>>> >>> cause
>>> >>>> problems?
>>> >>>> I'm thinking about implementers of custom StateStores, as they may
>>> be
>>> >>>> confused by managesOffsets() being deprecated, especially since they
>>> >>> would
>>> >>>> have to mark their implementation as @Deprecated in order to avoid
>>> >>> compile
>>> >>>> warnings.
>>> >>>> If deprecating an API *while it's still expected to be implemented*
>>> is
>>> >>>> something that's generally done in the project, then I'm happy to
>>> do so
>>> >>>> here.
>>> >>>>
>>> >>>> 104.
>>> >>>> I think this is technically possible, but at the cost of
>>> considerable
>>> >>>> additional code to maintain. Would we ever have a pathway to remove
>>> >>>> this
>>> >>>> downgrade code in the future?
>>> >>>>
>>> >>>>
>>> >>>> Regarding rebalance metadata:
>>> >>>> Opening all stores on start-up to read and cache their offsets is an
>>> >>>> interesting idea, especially if we can avoid re-opening the stores
>>> once
>>> >>> the
>>> >>>> Tasks have been assigned. Scalability shouldn't be too much of a
>>> >>>> problem,
>>> >>>> because typically users have a fairly short state.cleanup.delay, so
>>> the
>>> >>>> number of on-disk Task directories should rarely exceed the number
>>> of
>>> >>> Tasks
>>> >>>> previously assigned to that instance.
>>> >>>> An advantage of this approach is that it would also simplify
>>> StateStore
>>> >>>> implementations, as they would only need to guarantee that committed
>>> >>>> offsets are available when the store is open.
>>> >>>>
>>> >>>> I'll investigate this approach this week for feasibility and report
>>> >>>> back.
>>> >>>>
>>> >>>> I think that covers all the outstanding feedback, unless I missed
>>> >>> anything?
>>> >>>>
>>> >>>> Regards,
>>> >>>> Nick
>>> >>>>
>>> >>>> On Mon, 6 May 2024 at 14:06, Bruno Cadonna <cado...@apache.org>
>>> wrote:
>>> >>>>
>>> >>>>> Hi Matthias,
>>> >>>>>
>>> >>>>> I see what you mean.
>>> >>>>>
>>> >>>>> To sum up:
>>> >>>>>
>>> >>>>> With this KIP the .checkpoint file is written when the store
>>> closes.
>>> >>>>> That is when:
>>> >>>>> 1. a task moves away from Kafka Streams client
>>> >>>>> 2. Kafka Streams client shuts down
>>> >>>>>
>>> >>>>> A Kafka Streams client needs the information in the .checkpoint
>>> file
>>> >>>>> 1. on startup because it does not have any open stores yet.
>>> >>>>> 2. during rebalances for non-empty state directories of tasks that
>>> are
>>> >>>>> not assigned to the Kafka Streams client.
>>> >>>>>
>>> >>>>> With hard crashes, i.e., when the Streams client is not able to
>>> close
>>> >>>>> its state stores and write the .checkpoint file, the .checkpoint
>>> file
>>> >>>>> might be quite stale. That influences the next rebalance after
>>> >>>>> failover
>>> >>>>> negatively.
>>> >>>>>
>>> >>>>>
>>> >>>>> My conclusion is that Kafka Streams either needs to open the state
>>> >>>>> stores at start up or we write the checkpoint file more often.
>>> >>>>>
>>> >>>>> Writing the .checkpoint file during processing more often without
>>> >>>>> controlling the flush to disk would work. However, Kafka Streams
>>> would
>>> >>>>> checkpoint offsets that are not yet persisted on disk by the state
>>> >>>>> store. That is with a hard crash the offsets in the .checkpoint
>>> file
>>> >>>>> might be larger than the offsets checkpointed in the state store.
>>> That
>>> >>>>> might not be a problem if Kafka Streams uses the .checkpoint file
>>> only
>>> >>>>> to compute the task lag. The downside is that it makes the
>>> managing of
>>> >>>>> checkpoints more complex because now we have to maintain two
>>> >>>>> checkpoints: one for restoration and one for computing the task
>>> lag.
>>> >>>>> I think we should explore the option where Kafka Streams opens the
>>> >>> state
>>> >>>>> stores at start up to get the offsets.
>>> >>>>>
>>> >>>>> I also checked when Kafka Streams needs the checkpointed offsets to
>>> >>>>> compute the task lag during a rebalance. Turns out Kafka Streams
>>> needs
>>> >>>>> them before sending the join request. Now, I am wondering if
>>> opening
>>> >>> the
>>> >>>>> state stores of unassigned tasks whose state directory exists
>>> locally
>>> >>> is
>>> >>>>> actually such a big issue due to the expected higher latency since
>>> it
>>> >>>>> happens actually before the Kafka Streams client joins the
>>> rebalance.
>>> >>>>>
>>> >>>>> Best,
>>> >>>>> Bruno
>>> >>>>>
>>> >>>>>
>>> >>>>>
>>> >>>>>
>>> >>>>>
>>> >>>>>
>>> >>>>>
>>> >>>>> On 5/4/24 12:05 AM, Matthias J. Sax wrote:
>>> >>>>>> Those are good questions... I could think of a few approaches, but I
>>> >>> admit
>>> >>>>>> it might all be a little bit tricky to code up...
>>> >>>>>>
>>> >>>>>> However if we don't solve this problem, I think this KIP does not
>>> >>>> really
>>> >>>>>> solve the core issue we are facing? In the end, if we rely on the
>>> >>>>>> `.checkpoint` file to compute a task assignment, but the
>>> >>> `.checkpoint`
>>> >>>>>> file can be arbitrarily stale after a crash because we only write it
>>> >>> on a
>>> >>>>>> clean close, there would be still a huge gap that this KIP does
>>> not
>>> >>>>> close?
>>> >>>>>>
>>> >>>>>> For the case in which we keep the checkpoint file, this KIP would
>>> >>> still
>>> >>>>>> help for "soft errors" in which KS can recover, and roll back the
>>> >>>> store.
>>> >>>>>> A significant win for sure. -- But hard crashes would still be a
>>> >>>>>> problem? We might assign tasks to "wrong" instance, ie, which are
>>> not
>>> >>>>>> most up to date, as the checkpoint information could be very
>>> >>> outdated?
>>> >>>>>> Would we end up with a half-baked solution? Would this be good
>>> enough
>>> >>>> to
>>> >>>>>> justify the introduced complexity? In the end, for soft failures it's
>>> >>> still
>>> >>>>>> a win. Just want to make sure we understand the limitations and
>>> make
>>> >>> an
>>> >>>>>> educated decision.
>>> >>>>>>
>>> >>>>>> Or do I miss something?
>>> >>>>>>
>>> >>>>>>
>>> >>>>>> -Matthias
>>> >>>>>>
>>> >>>>>> On 5/3/24 10:20 AM, Bruno Cadonna wrote:
>>> >>>>>>> Hi Matthias,
>>> >>>>>>>
>>> >>>>>>>
>>> >>>>>>> 200:
>>> >>>>>>> I like the idea in general. However, it is not clear to me how
>>> the
>>> >>>>>>> behavior should be with multiple stream threads in the same Kafka
>>> >>>>>>> Streams client. What stream thread opens which store? How can a
>>> >>> stream
>>> >>>>>>> thread pass an open store to another stream thread that got the
>>> >>>>>>> corresponding task assigned? How does a stream thread know that a
>>> >>> task
>>> >>>>>>> was not assigned to any of the stream threads of the Kafka
>>> Streams
>>> >>>>>>> client? I have the feeling we should just keep the .checkpoint
>>> file
>>> >>> on
>>> >>>>>>> close for now to unblock this KIP and try to find a solution to
>>> get
>>> >>>>>>> totally rid of it later.
>>> >>>>>>>
>>> >>>>>>>
>>> >>>>>>> Best,
>>> >>>>>>> Bruno
>>> >>>>>>>
>>> >>>>>>>
>>> >>>>>>>
>>> >>>>>>> On 5/3/24 6:29 PM, Matthias J. Sax wrote:
>>> >>>>>>>> 101: Yes, but what I am saying is, that we don't need to flush
>>> the
>>> >>>>>>>> .position file to disk periodically, but only maintain it in
>>> main
>>> >>>>>>>> memory, and only write it to disk on close() to preserve it
>>> across
>>> >>>>>>>> restarts. This way, it would never be ahead, but might only lag?
>>> >>> But
>>> >>>>>>>> with my better understanding about (102) it might be moot
>>> anyway...
>>> >>>>>>>>
>>> >>>>>>>>
>>> >>>>>>>> 102: Thanks for clarifying. Looked into the code now. Makes
>>> sense.
>>> >>>>>>>> Might be something to be worth calling out explicitly in the KIP
>>> >>>>>>>> writeup. -- Now that I realize that the position is tracked
>>> inside
>>> >>>>>>>> the store (not outside as the changelog offsets) it makes much
>>> more
>>> >>>>>>>> sense to pull position into RocksDB itself. In the end, it's
>>> >>> actually
>>> >>>>>>>> a "store implementation" detail how it tracks the position (and
>>> >>> kinda
>>> >>>>>>>> leaky abstraction currently, that we re-use the checkpoint file
>>> >>>>>>>> mechanism to track it and flush to disk).
>>> >>>>>>>>
>>> >>>>>>>>
>>> >>>>>>>> 200: I was thinking about this a little bit more, and maybe it's
>>> >>> not
>>> >>>>>>>> too bad? When KS starts up, we could open all stores we find on
>>> >>> local
>>> >>>>>>>> disk pro-actively, and keep them all open until the first
>>> rebalance
>>> >>>>>>>> finishes: For tasks we get assigned, we hand in the already
>>> opened
>>> >>>>>>>> store (this would amortize the cost to open the store before the
>>> >>>>>>>> rebalance) and for non-assigned tasks, we know the offset
>>> >>> information
>>> >>>>>>>> won't change and we could just cache it in-memory for later
>>> reuse
>>> >>>>>>>> (ie, next rebalance) and close the store to free up resources?
>>> --
>>> >>>>>>>> Assuming that we would get a large percentage of opened stores
>>> >>>>>>>> assigned as tasks anyway, this could work?
>>> >>>>>>>>
>>> >>>>>>>>
>>> >>>>>>>> -Matthias
>>> >>>>>>>>
>>> >>>>>>>> On 5/3/24 1:29 AM, Bruno Cadonna wrote:
>>> >>>>>>>>> Hi Matthias,
>>> >>>>>>>>>
>>> >>>>>>>>>
>>> >>>>>>>>> 101:
>>> >>>>>>>>> Let's assume a RocksDB store, but I think the following might
>>> be
>>> >>>>>>>>> true also for other store implementations. With this KIP, if
>>> Kafka
>>> >>>>>>>>> Streams commits the offsets, the committed offsets will be
>>> stored
>>> >>> in
>>> >>>>>>>>> an in-memory data structure (i.e. the memtable) and stay there
>>> >>> until
>>> >>>>>>>>> RocksDB decides that it is time to persist its in-memory data
>>> >>>>>>>>> structure. If Kafka Streams writes its position to the
>>> .position
>>> >>>>>>>>> file during a commit and a crash happens before RocksDB persists
>>> >>> the
>>> >>>>>>>>> memtable then the position in the .position file is ahead of
>>> the
>>> >>>>>>>>> persisted offset. If IQ is done between the crash and the state
>>> >>>>>>>>> store fully restoring from the changelog, the position might tell IQ
>>> >>> that
>>> >>>>>>>>> the state store is more up-to-date than it actually is.
>>> >>>>>>>>> In contrast, if Kafka Streams handles persisting positions the
>>> >>> same
>>> >>>>>>>>> as persisting offset, the position should always be consistent
>>> >>> with
>>> >>>>>>>>> the offset, because they are persisted together.
>>> >>>>>>>>>
>>> >>>>>>>>>
>>> >>>>>>>>> 102:
>>> >>>>>>>>> I am confused about your confusion which tells me that we are
>>> >>>>>>>>> talking about two different things.
>>> >>>>>>>>> You asked
>>> >>>>>>>>>
>>> >>>>>>>>> "Do you intend to add this information [i.e. position] to the
>>> map
>>> >>>>>>>>> passed via commit(final Map<TopicPartition, Long>
>>> >>>> changelogOffsets)?"
>>> >>>>>>>>>
>>> >>>>>>>>> and with what I wrote I meant that we do not need to pass the
>>> >>>>>>>>> position into the implementation of the StateStore interface
>>> since
>>> >>>>>>>>> the position is updated within the implementation of the
>>> >>> StateStore
>>> >>>>>>>>> interface (e.g. RocksDBStore [1]). My statement describes the
>>> >>>>>>>>> behavior now, not the change proposed in this KIP, so it does
>>> not
>>> >>>>>>>>> contradict what is stated in the KIP.
>>> >>>>>>>>>
>>> >>>>>>>>>
>>> >>>>>>>>> 200:
>>> >>>>>>>>> This is about Matthias' main concern about rebalance metadata.
>>> >>>>>>>>> As far as I understand the KIP, Kafka Streams will only use the
>>> >>>>>>>>> .checkpoint files to compute the task lag for unassigned tasks
>>> >>> whose
>>> >>>>>>>>> state is locally available. For assigned tasks, it will use the
>>> >>>>>>>>> offsets managed by the open state store.
>>> >>>>>>>>>
>>> >>>>>>>>> Best,
>>> >>>>>>>>> Bruno
>>> >>>>>>>>>
>>> >>>>>>>>> [1]
>>> >>>>>>>>>
>>> >>>>>
>>> >>>>
>>> >>>
>>> https://github.com/apache/kafka/blob/fcbfd3412eb746a0c81374eb55ad0f73de6b1e71/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java#L397
>>> >>>>>>>>>
>>> >>>>>>>>> On 5/1/24 3:00 AM, Matthias J. Sax wrote:
>>> >>>>>>>>>> Thanks Bruno.
>>> >>>>>>>>>>
>>> >>>>>>>>>>
>>> >>>>>>>>>>
>>> >>>>>>>>>> 101: I think I understand this better now. But just want to
>>> make
>>> >>>>>>>>>> sure I do. What do you mean by "they can diverge" and
>>> "Recovering
>>> >>>>>>>>>> after a failure might load inconsistent offsets and
>>> positions."
>>> >>>>>>>>>>
>>> >>>>>>>>>> The checkpoint is the offset from the changelog, while the
>>> >>> position
>>> >>>>>>>>>> is the offset from the upstream source topic, right? -- In the
>>> >>> end,
>>> >>>>>>>>>> the position is about IQ, and if we fail to update it, it only
>>> >>>>>>>>>> means that there is some gap when we might not be able to
>>> query a
>>> >>>>>>>>>> standby task, because we think it's not up-to-date enough
>>> even if
>>> >>>>>>>>>> it is, which would resolve itself soon? Ie, the position might
>>> >>>>>>>>>> "lag", but it's not "inconsistent". Do we believe that this
>>> lag
>>> >>>>>>>>>> would be highly problematic?
>>> >>>>>>>>>>
>>> >>>>>>>>>>
>>> >>>>>>>>>>
>>> >>>>>>>>>> 102: I am confused.
>>> >>>>>>>>>>
>>> >>>>>>>>>>> The position is maintained inside the state store, but is
>>> >>>>>>>>>>> persisted in the .position file when the state store closes.
>>> >>>>>>>>>>
>>> >>>>>>>>>> This contradicts the KIP:
>>> >>>>>>>>>>
>>> >>>>>>>>>>>   these position offsets will be stored in RocksDB, in the
>>> same
>>> >>>>>>>>>>> column family as the changelog offsets, instead of the
>>> .position
>>> >>>>> file
>>> >>>>>>>>>>
>>> >>>>>>>>>>
>>> >>>>>>>>>>
>>> >>>>>>>>>> My main concern is currently about rebalance metadata --
>>> opening
>>> >>>>>>>>>> RocksDB stores seems to be very expensive, but if we follow
>>> the
>>> >>>> KIP:
>>> >>>>>>>>>>
>>> >>>>>>>>>>> We will do this under EOS by updating the .checkpoint file
>>> >>>>>>>>>>> whenever a store is close()d.
>>> >>>>>>>>>>
>>> >>>>>>>>>> It seems, having the offset inside RocksDB does not help us at
>>> >>> all?
>>> >>>>>>>>>> In the end, when we crash, we don't want to lose the state,
>>> but
>>> >>>>>>>>>> when we update the .checkpoint only on a clean close, the
>>> >>>>>>>>>> .checkpoint might be stale (ie, still contains the checkpoint
>>> >>> when
>>> >>>>>>>>>> we opened the store when we got a task assigned).
>>> >>>>>>>>>>
>>> >>>>>>>>>>
>>> >>>>>>>>>>
>>> >>>>>>>>>> -Matthias
>>> >>>>>>>>>>
>>> >>>>>>>>>> On 4/30/24 2:40 AM, Bruno Cadonna wrote:
>>> >>>>>>>>>>> Hi all,
>>> >>>>>>>>>>>
>>> >>>>>>>>>>> 100
>>> >>>>>>>>>>> I think we already have such a wrapper. It is called
>>> >>>>>>>>>>> AbstractReadWriteDecorator.
>>> >>>>>>>>>>>
>>> >>>>>>>>>>>
>>> >>>>>>>>>>> 101
>>> >>>>>>>>>>> Currently, the position is checkpointed when an offset
>>> checkpoint
>>> >>>>>>>>>>> is written. If we let the state store manage the committed
>>> >>>>>>>>>>> offsets, we also need to let the state store manage the
>>> >>>>>>>>>>> position otherwise they might diverge. State store managed
>>> >>> offsets
>>> >>>>>>>>>>> can get flushed (i.e. checkpointed) to the disk when the
>>> state
>>> >>>>>>>>>>> store decides to flush its in-memory data structures, but the
>>> >>>>>>>>>>> position is only checkpointed at commit time. Recovering
>>> after a
>>> >>>>>>>>>>> failure might load inconsistent offsets and positions.
>>> >>>>>>>>>>>
>>> >>>>>>>>>>>
>>> >>>>>>>>>>> 102
>>> >>>>>>>>>>> The position is maintained inside the state store, but is
>>> >>>>>>>>>>> persisted in the .position file when the state store closes.
>>> The
>>> >>>>>>>>>>> only public interface that uses the position is IQv2 in a
>>> >>>>>>>>>>> read-only mode. So the position is only updated within the
>>> state
>>> >>>>>>>>>>> store and read from IQv2. No need to add anything to the
>>> public
>>> >>>>>>>>>>> StateStore interface.
>>> >>>>>>>>>>>
>>> >>>>>>>>>>>
>>> >>>>>>>>>>> 103
>>> >>>>>>>>>>> Deprecating managesOffsets() right away might be a good idea.
>>> >>>>>>>>>>>
>>> >>>>>>>>>>>
>>> >>>>>>>>>>> 104
>>> >>>>>>>>>>> I agree that we should try to support downgrades without
>>> wipes.
>>> >>> At
>>> >>>>>>>>>>> least Nick should state in the KIP why we do not support it.
>>> >>>>>>>>>>>
>>> >>>>>>>>>>>
>>> >>>>>>>>>>> Best,
>>> >>>>>>>>>>> Bruno
>>> >>>>>>>>>>>
>>> >>>>>>>>>>>
>>> >>>>>>>>>>>
>>> >>>>>>>>>>>
>>> >>>>>>>>>>> On 4/23/24 8:13 AM, Matthias J. Sax wrote:
>>> >>>>>>>>>>>> Thanks for splitting out this KIP. The discussion shows,
>>> that
>>> >>> it
>>> >>>>>>>>>>>> is a complex beast by itself, so worth discussing on its
>>> own.
>>> >>>>>>>>>>>>
>>> >>>>>>>>>>>>
>>> >>>>>>>>>>>> A couple of questions / comments:
>>> >>>>>>>>>>>>
>>> >>>>>>>>>>>>
>>> >>>>>>>>>>>> 100 `StateStore#commit()`: The JavaDoc says "must not be
>>> called
>>> >>>>>>>>>>>> by users" -- I would propose to put a guard in place for
>>> this,
>>> >>> by
>>> >>>>>>>>>>>> either throwing an exception (preferable) or adding a no-op
>>> >>>>>>>>>>>> implementation (at least for our own stores, by wrapping
>>> them
>>> >>> --
>>> >>>>>>>>>>>> we cannot enforce it for custom stores I assume), and
>>> document
>>> >>>>>>>>>>>> this contract explicitly.
>>> >>>>>>>>>>>>
>>> >>>>>>>>>>>>
>>> >>>>>>>>>>>> 101 adding `.position` to the store: Why do we actually need
>>> >>>>>>>>>>>> this? The KIP says "To ensure consistency with the committed
>>> >>> data
>>> >>>>>>>>>>>> and changelog offsets" but I am not sure if I can follow?
>>> Can
>>> >>> you
>>> >>>>>>>>>>>> elaborate why leaving the `.position` file as-is won't work?
>>> >>>>>>>>>>>>
>>> >>>>>>>>>>>>> If it's possible at all, it will need to be done by
>>> >>>>>>>>>>>>> creating temporary StateManagers and StateStores during
>>> >>>>>>>>>>>>> rebalance. I think
>>> >>>>>>>>>>>>> it is possible, and probably not too expensive, but the
>>> devil
>>> >>>>>>>>>>>>> will be in
>>> >>>>>>>>>>>>> the detail.
>>> >>>>>>>>>>>>
>>> >>>>>>>>>>>> This sounds like a significant overhead to me. We know that
>>> >>>>>>>>>>>> opening a single RocksDB takes about 500ms, and thus opening
>>> >>>>>>>>>>>> RocksDB to get this information might slow down rebalances
>>> >>>>>>>>>>>> significantly.
>>> >>>>>>>>>>>>
>>> >>>>>>>>>>>>
>>> >>>>>>>>>>>> 102: It's unclear to me, how `.position` information is
>>> added.
>>> >>>>>>>>>>>> The KIP only says: "position offsets will be stored in
>>> RocksDB,
>>> >>>>>>>>>>>> in the same column family as the changelog offsets". Do you
>>> >>>>>>>>>>>> intend to add this information to the map passed via
>>> >>>>>>>>>>>> `commit(final Map<TopicPartition, Long> changelogOffsets)`?
>>> The
>>> >>>>>>>>>>>> KIP should describe this in more detail. Also, if my
>>> assumption
>>> >>>>>>>>>>>> is correct, we might want to rename the parameter and also
>>> >>> have a
>>> >>>>>>>>>>>> better JavaDoc description?
>>> >>>>>>>>>>>>
>>> >>>>>>>>>>>>
>>> >>>>>>>>>>>> 103: Should we make it mandatory (long-term) that all stores
>>> >>>>>>>>>>>> (including custom stores) manage their offsets internally?
>>> >>>>>>>>>>>> Maintaining both options and thus both code paths puts a
>>> burden
>>> >>>>>>>>>>>> on everyone and makes the code messy. I would strongly
>>> prefer if
>>> >>>>>>>>>>>> we could have a mid-term path to get rid of supporting both.
>>> --
>>> >>>>>>>>>>>> For this case, we should deprecate the newly added
>>> >>>>>>>>>>>> `managesOffsets()` method right away, to point out that we
>>> >>> intend
>>> >>>>>>>>>>>> to remove it. If it's mandatory to maintain offsets for
>>> stores,
>>> >>>>>>>>>>>> we won't need this method any longer. In memory stores can
>>> just
>>> >>>>>>>>>>>> return null from #committedOffset().
>>> >>>>>>>>>>>>
>>> >>>>>>>>>>>>
>>> >>>>>>>>>>>> 104 "downgrading": I think it might be worth to add support
>>> for
>>> >>>>>>>>>>>> downgrading w/o the need to wipe stores? Leveraging
>>> >>>>>>>>>>>> `upgrade.from` parameter, we could build a two rolling
>>> bounce
>>> >>>>>>>>>>>> downgrade: (1) the new code is started with `upgrade.from`
>>> set
>>> >>> to
>>> >>>>>>>>>>>> a lower version, telling the runtime to do the cleanup on
>>> >>>>>>>>>>>> `close()` -- (ie, ensure that all data is written into
>>> >>>>>>>>>>>> `.checkpoint` and `.position` file, and the newly added CL
>>> is
>>> >>>>>>>>>>>> deleted). In a second, rolling bounce, the old code would be
>>> >>> able
>>> >>>>>>>>>>>> to open RocksDB. -- I understand that this implies much more
>>> >>>>>>>>>>>> work, but downgrade seems to be common enough, that it
>>> might be
>>> >>>>>>>>>>>> worth it? Even if we did not always support this in the
>>> past,
>>> >>> we
>>> >>>>>>>>>>>> have to face the fact that KS is getting more and more
>>> adopted
>>> >>>>>>>>>>>> and as a more mature product should support this?
>>> >>>>>>>>>>>>
>>> >>>>>>>>>>>>
>>> >>>>>>>>>>>>
>>> >>>>>>>>>>>>
>>> >>>>>>>>>>>> -Matthias
>>> >>>>>>>>>>>>
>>> >>>>>>>>>>>>
>>> >>>>>>>>>>>>
>>> >>>>>>>>>>>>
>>> >>>>>>>>>>>>
>>> >>>>>>>>>>>>
>>> >>>>>>>>>>>>
>>> >>>>>>>>>>>> On 4/21/24 11:58 PM, Bruno Cadonna wrote:
>>> >>>>>>>>>>>>> Hi all,
>>> >>>>>>>>>>>>>
>>> >>>>>>>>>>>>> How should we proceed here?
>>> >>>>>>>>>>>>>
>>> >>>>>>>>>>>>> 1. with the plain .checkpoint file
>>> >>>>>>>>>>>>> 2. with a way to use the state store interface on
>>> unassigned
>>> >>> but
>>> >>>>>>>>>>>>> locally existing task state
>>> >>>>>>>>>>>>>
>>> >>>>>>>>>>>>> While I like option 2, I think option 1 is less risky and
>>> will
>>> >>>>>>>>>>>>> give us the benefits of transactional state stores sooner.
>>> We
>>> >>>>>>>>>>>>> should consider the interface approach afterwards, though.
>>> >>>>>>>>>>>>>
>>> >>>>>>>>>>>>>
>>> >>>>>>>>>>>>> Best,
>>> >>>>>>>>>>>>> Bruno
>>> >>>>>>>>>>>>>
>>> >>>>>>>>>>>>>
>>> >>>>>>>>>>>>>
>>> >>>>>>>>>>>>> On 4/17/24 3:15 PM, Bruno Cadonna wrote:
>>> >>>>>>>>>>>>>> Hi Nick and Sophie,
>>> >>>>>>>>>>>>>>
>>> >>>>>>>>>>>>>> I think the task ID is not enough to create a state store
>>> >>> that
>>> >>>>>>>>>>>>>> can read the offsets of non-assigned tasks for lag
>>> >>> computation
>>> >>>>>>>>>>>>>> during rebalancing. The state store also needs the state
>>> >>>>>>>>>>>>>> directory so that it knows where to find the information
>>> that
>>> >>>>>>>>>>>>>> it needs to return from changelogOffsets().
>>> >>>>>>>>>>>>>>
>>> >>>>>>>>>>>>>> In general, I think we should proceed with the plain
>>> >>>>>>>>>>>>>> .checkpoint file for now and iterate back to the state
>>> store
>>> >>>>>>>>>>>>>> solution later since it seems it is not that
>>> straightforward.
>>> >>>>>>>>>>>>>> Alternatively, Nick could timebox an effort to better
>>> >>>>>>>>>>>>>> understand what would be needed for the state store
>>> solution.
>>> >>>>>>>>>>>>>> Nick, let us know your decision.
>>> >>>>>>>>>>>>>>
>>> >>>>>>>>>>>>>> Regarding your question about the state store instance. I
>>> am
>>> >>>>>>>>>>>>>> not too familiar with that part of the code, but I think
>>> the
>>> >>>>>>>>>>>>>> state store is built when the processor topology is built
>>> and
>>> >>>>>>>>>>>>>> the processor topology is built per stream task. So there
>>> is
>>> >>>>>>>>>>>>>> one instance of processor topology and state store per
>>> stream
>>> >>>>>>>>>>>>>> task. Try to follow the call in [1].
>>> >>>>>>>>>>>>>>
>>> >>>>>>>>>>>>>> Best,
>>> >>>>>>>>>>>>>> Bruno
>>> >>>>>>>>>>>>>>
>>> >>>>>>>>>>>>>> [1]
>>> >>>>>>>>>>>>>>
>>> >>>>>
>>> >>>>
>>> >>>
>>> https://github.com/apache/kafka/blob/f52575b17225828d2ff11996030ab7304667deab/streams/src/main/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreator.java#L153
>>> >>>>>>>>>>>>>>
>>> >>>>>>>>>>>>>>
>>> >>>>>>>>>>>>>>
>>> >>>>>>>>>>>>>> On 4/16/24 8:59 PM, Nick Telford wrote:
>>> >>>>>>>>>>>>>>> That does make sense. The one thing I can't figure out is
>>> >>> how
>>> >>>>>>>>>>>>>>> per-Task
>>> >>>>>>>>>>>>>>> StateStore instances are constructed.
>>> >>>>>>>>>>>>>>>
>>> >>>>>>>>>>>>>>> It looks like we construct one StateStore instance for
>>> the
>>> >>>>>>>>>>>>>>> whole Topology
>>> >>>>>>>>>>>>>>> (in InternalTopologyBuilder), and pass that into
>>> >>>>>>>>>>>>>>> ProcessorStateManager (via
>>> >>>>>>>>>>>>>>> StateManagerUtil) for each Task, which then initializes
>>> it.
>>> >>>>>>>>>>>>>>>
>>> >>>>>>>>>>>>>>> This can't be the case though, otherwise multiple
>>> partitions
>>> >>>>>>>>>>>>>>> of the same
>>> >>>>>>>>>>>>>>> sub-topology (aka Tasks) would share the same StateStore
>>> >>>>>>>>>>>>>>> instance, which
>>> >>>>>>>>>>>>>>> they don't.
>>> >>>>>>>>>>>>>>>
>>> >>>>>>>>>>>>>>> What am I missing?
>>> >>>>>>>>>>>>>>>
>>> >>>>>>>>>>>>>>> On Tue, 16 Apr 2024 at 16:22, Sophie Blee-Goldman
>>> >>>>>>>>>>>>>>> <sop...@responsive.dev>
>>> >>>>>>>>>>>>>>> wrote:
>>> >>>>>>>>>>>>>>>
>>> >>>>>>>>>>>>>>>> I don't think we need to *require* a constructor accept
>>> the
>>> >>>>>>>>>>>>>>>> TaskId, but we
>>> >>>>>>>>>>>>>>>> would definitely make sure that the RocksDB state store
>>> >>>>>>>>>>>>>>>> changes its
>>> >>>>>>>>>>>>>>>> constructor to one that accepts the TaskID (which we
>>> can do
>>> >>>>>>>>>>>>>>>> without
>>> >>>>>>>>>>>>>>>> deprecation since it's an internal API), and custom state
>>> >>>>>>>>>>>>>>>> stores can just
>>> >>>>>>>>>>>>>>>> decide for themselves whether they want to opt-in/use
>>> the
>>> >>>>>>>>>>>>>>>> TaskId param
>>> >>>>>>>>>>>>>>>> or not. I mean custom state stores would have to opt-in
>>> >>>>>>>>>>>>>>>> anyways by
>>> >>>>>>>>>>>>>>>> implementing the new StoreSupplier#get(TaskId) API and
>>> the
>>> >>>> only
>>> >>>>>>>>>>>>>>>> reason to do that would be to have created a constructor
>>> >>> that
>>> >>>>>>>>>>>>>>>> accepts
>>> >>>>>>>>>>>>>>>> a TaskId
>>> >>>>>>>>>>>>>>>>
>>> >>>>>>>>>>>>>>>> Just to be super clear about the proposal, this is what
>>> I
>>> >>> had
>>> >>>>>>>>>>>>>>>> in mind.
>>> >>>>>>>>>>>>>>>> It's actually fairly simple and wouldn't add much to the
>>> >>>>>>>>>>>>>>>> scope of the
>>> >>>>>>>>>>>>>>>> KIP (I think -- if it turns out to be more complicated
>>> than
>>> >>>>>>>>>>>>>>>> I'm assuming,
>>> >>>>>>>>>>>>>>>> we should definitely do whatever has the smallest LOE to
>>> >>> get
>>> >>>>>>>>>>>>>>>> this done)
>>> >>>>>>>>>>>>>>>>
>>> >>>>>>>>>>>>>>>> Anyways, the (only) public API changes would be to add
>>> this
>>> >>>> new
>>> >>>>>>>>>>>>>>>> method to the StoreSupplier API:
>>> >>>>>>>>>>>>>>>>
>>> >>>>>>>>>>>>>>>> default T get(final TaskId taskId) {
>>> >>>>>>>>>>>>>>>>       return get();
>>> >>>>>>>>>>>>>>>> }
>>> >>>>>>>>>>>>>>>>
>>> >>>>>>>>>>>>>>>> We can decide whether or not to deprecate the old #get
>>> but
>>> >>>>>>>>>>>>>>>> it's not
>>> >>>>>>>>>>>>>>>> really necessary and might cause a lot of turmoil, so
>>> I'd
>>> >>>>>>>>>>>>>>>> personally
>>> >>>>>>>>>>>>>>>> say we just leave both APIs in place.
>>> >>>>>>>>>>>>>>>>
>>> >>>>>>>>>>>>>>>> And that's it for public API changes! Internally, we
>>> would
>>> >>>>>>>>>>>>>>>> just adapt
>>> >>>>>>>>>>>>>>>> each of the rocksdb StoreSupplier classes to implement
>>> this
>>> >>>> new
>>> >>>>>>>>>>>>>>>> API. So for example with the
>>> >>>> RocksDBKeyValueBytesStoreSupplier,
>>> >>>>>>>>>>>>>>>> we just add
>>> >>>>>>>>>>>>>>>>
>>> >>>>>>>>>>>>>>>> @Override
>>> >>>>>>>>>>>>>>>> public KeyValueStore<Bytes, byte[]> get(final TaskId
>>> >>> taskId)
>>> >>>> {
>>> >>>>>>>>>>>>>>>>       return returnTimestampedStore ?
>>> >>>>>>>>>>>>>>>>           new RocksDBTimestampedStore(name,
>>> metricsScope(),
>>> >>>>>>>>>>>>>>>> taskId) :
>>> >>>>>>>>>>>>>>>>           new RocksDBStore(name, metricsScope(),
>>> taskId);
>>> >>>>>>>>>>>>>>>> }
>>> >>>>>>>>>>>>>>>>
>>> >>>>>>>>>>>>>>>> And of course add the TaskId parameter to each of the
>>> >>> actual
>>> >>>>>>>>>>>>>>>> state store constructors returned here.
>>> >>>>>>>>>>>>>>>>
>>> >>>>>>>>>>>>>>>> Does that make sense? It's entirely possible I'm missing
>>> >>>>>>>>>>>>>>>> something
>>> >>>>>>>>>>>>>>>> important here, but I think this would be a pretty small
>>> >>>>>>>>>>>>>>>> addition that
>>> >>>>>>>>>>>>>>>> would solve the problem you mentioned earlier while also
>>> >>>> being
>>> >>>>>>>>>>>>>>>> useful to anyone who uses custom state stores.
>>> >>>>>>>>>>>>>>>>
>>> >>>>>>>>>>>>>>>> On Mon, Apr 15, 2024 at 10:21 AM Nick Telford
>>> >>>>>>>>>>>>>>>> <nick.telf...@gmail.com>
>>> >>>>>>>>>>>>>>>> wrote:
>>> >>>>>>>>>>>>>>>>
>>> >>>>>>>>>>>>>>>>> Hi Sophie,
>>> >>>>>>>>>>>>>>>>>
>>> >>>>>>>>>>>>>>>>> Interesting idea! Although what would that mean for the
>>> >>>>>>>>>>>>>>>>> StateStore
>>> >>>>>>>>>>>>>>>>> interface? Obviously we can't require that the
>>> constructor
>>> >>>>>>>>>>>>>>>>> take the
>>> >>>>>>>>>>>>>>>> TaskId.
>>> >>>>>>>>>>>>>>>>> Is it enough to add the parameter to the StoreSupplier?
>>> >>>>>>>>>>>>>>>>>
>>> >>>>>>>>>>>>>>>>> Would doing this be in-scope for this KIP, or are we
>>> >>>>>>>>>>>>>>>>> over-complicating
>>> >>>>>>>>>>>>>>>> it?
>>> >>>>>>>>>>>>>>>>>
>>> >>>>>>>>>>>>>>>>> Nick
>>> >>>>>>>>>>>>>>>>>
>>> >>>>>>>>>>>>>>>>> On Fri, 12 Apr 2024 at 21:30, Sophie Blee-Goldman
>>> >>>>>>>>>>>>>>>>> <sop...@responsive.dev
>>> >>>>>>>>>>>>>>>>>
>>> >>>>>>>>>>>>>>>>> wrote:
>>> >>>>>>>>>>>>>>>>>
>>> >>>>>>>>>>>>>>>>>> Somewhat minor point overall, but it actually drives
>>> me
>>> >>>>>>>>>>>>>>>>>> crazy that you
>>> >>>>>>>>>>>>>>>>>> can't get access to the taskId of a StateStore until
>>> >>> #init
>>> >>>>>>>>>>>>>>>>>> is called.
>>> >>>>>>>>>>>>>>>>> This
>>> >>>>>>>>>>>>>>>>>> has caused me a huge headache personally (since the
>>> same
>>> >>> is
>>> >>>>>>>>>>>>>>>>>> true for
>>> >>>>>>>>>>>>>>>>>> processors and I was trying to do something that's
>>> >>> probably
>>> >>>>>>>>>>>>>>>>>> too hacky
>>> >>>>>>>>>>>>>>>> to
>>> >>>>>>>>>>>>>>>>>> actually complain about here lol)
>>> >>>>>>>>>>>>>>>>>>
>>> >>>>>>>>>>>>>>>>>> Can we just change the StateStoreSupplier to receive
>>> and
>>> >>>>>>>>>>>>>>>>>> pass along the
>>> >>>>>>>>>>>>>>>>>> taskId when creating a new store? Presumably by
>>> adding a
>>> >>>>>>>>>>>>>>>>>> new version of
>>> >>>>>>>>>>>>>>>>> the
>>> >>>>>>>>>>>>>>>>>> #get method that takes in a taskId parameter? We can
>>> have
>>> >>>>>>>>>>>>>>>>>> it default to
>>> >>>>>>>>>>>>>>>>>> invoking the old one for compatibility reasons and it
>>> >>>>>>>>>>>>>>>>>> should be
>>> >>>>>>>>>>>>>>>>> completely
>>> >>>>>>>>>>>>>>>>>> safe to tack on.
>>> >>>>>>>>>>>>>>>>>>
>>> >>>>>>>>>>>>>>>>>> Would also prefer the same for a ProcessorSupplier,
>>> but
>>> >>>>> that's
>>> >>>>>>>>>>>>>>>> definitely
>>> >>>>>>>>>>>>>>>>>> outside the scope of this KIP
>>> >>>>>>>>>>>>>>>>>>
>>> >>>>>>>>>>>>>>>>>> On Fri, Apr 12, 2024 at 3:31 AM Nick Telford
>>> >>>>>>>>>>>>>>>>>> <nick.telf...@gmail.com>
>>> >>>>>>>>>>>>>>>>>> wrote:
>>> >>>>>>>>>>>>>>>>>>
>>> >>>>>>>>>>>>>>>>>>> On further thought, it's clear that this can't work
>>> for
>>> >>>>>>>>>>>>>>>>>>> one simple
>>> >>>>>>>>>>>>>>>>>> reason:
>>> >>>>>>>>>>>>>>>>>>> StateStores don't know their associated TaskId (and
>>> >>> hence,
>>> >>>>>>>>>>>>>>>>>>> their
>>> >>>>>>>>>>>>>>>>>>> StateDirectory) until the init() call. Therefore,
>>> >>>>>>>>>>>>>>>>>>> committedOffset()
>>> >>>>>>>>>>>>>>>>> can't
>>> >>>>>>>>>>>>>>>>>>> be called before init(), unless we also added a
>>> >>>>>>>>>>>>>>>>>>> StateStoreContext
>>> >>>>>>>>>>>>>>>>>> argument
>>> >>>>>>>>>>>>>>>>>>> to committedOffset(), which I think might be trying
>>> to
>>> >>>>>>>>>>>>>>>>>>> shoehorn too
>>> >>>>>>>>>>>>>>>>> much
>>> >>>>>>>>>>>>>>>>>>> into committedOffset().
>>> >>>>>>>>>>>>>>>>>>>
>>> >>>>>>>>>>>>>>>>>>> I still don't like the idea of the Streams engine
>>> >>>>>>>>>>>>>>>>>>> maintaining the
>>> >>>>>>>>>>>>>>>> cache
>>> >>>>>>>>>>>>>>>>>> of
>>> >>>>>>>>>>>>>>>>>>> changelog offsets independently of stores, mostly
>>> >>> because
>>> >>>>>>>>>>>>>>>>>>> of the
>>> >>>>>>>>>>>>>>>>>>> maintenance burden of the code duplication, but it
>>> looks
>>> >>>>>>>>>>>>>>>>>>> like we'll
>>> >>>>>>>>>>>>>>>>> have
>>> >>>>>>>>>>>>>>>>>> to
>>> >>>>>>>>>>>>>>>>>>> live with it.
>>> >>>>>>>>>>>>>>>>>>>
>>> >>>>>>>>>>>>>>>>>>> Unless you have any better ideas?
>>> >>>>>>>>>>>>>>>>>>>
>>> >>>>>>>>>>>>>>>>>>> Regards,
>>> >>>>>>>>>>>>>>>>>>> Nick
>>> >>>>>>>>>>>>>>>>>>>
>>> >>>>>>>>>>>>>>>>>>> On Wed, 10 Apr 2024 at 14:12, Nick Telford
>>> >>>>>>>>>>>>>>>>>>> <nick.telf...@gmail.com>
>>> >>>>>>>>>>>>>>>>>> wrote:
>>> >>>>>>>>>>>>>>>>>>>
>>> >>>>>>>>>>>>>>>>>>>> Hi Bruno,
>>> >>>>>>>>>>>>>>>>>>>>
>>> >>>>>>>>>>>>>>>>>>>> Immediately after I sent my response, I looked at
>>> the
>>> >>>>>>>>>>>>>>>>>>>> codebase and
>>> >>>>>>>>>>>>>>>>> came
>>> >>>>>>>>>>>>>>>>>>> to
>>> >>>>>>>>>>>>>>>>>>>> the same conclusion. If it's possible at all, it
>>> will
>>> >>>>>>>>>>>>>>>>>>>> need to be
>>> >>>>>>>>>>>>>>>> done
>>> >>>>>>>>>>>>>>>>>> by
>>> >>>>>>>>>>>>>>>>>>>> creating temporary StateManagers and StateStores
>>> during
>>> >>>>>>>>>>>>>>>>>>>> rebalance.
>>> >>>>>>>>>>>>>>>> I
>>> >>>>>>>>>>>>>>>>>>> think
>>> >>>>>>>>>>>>>>>>>>>> it is possible, and probably not too expensive, but
>>> the
>>> >>>>>>>>>>>>>>>>>>>> devil will
>>> >>>>>>>>>>>>>>>> be
>>> >>>>>>>>>>>>>>>>>> in
>>> >>>>>>>>>>>>>>>>>>>> the detail.
>>> >>>>>>>>>>>>>>>>>>>>
>>> >>>>>>>>>>>>>>>>>>>> I'll try to find some time to explore the idea to
>>> see
>>> >>> if
>>> >>>>>>>>>>>>>>>>>>>> it's
>>> >>>>>>>>>>>>>>>>> possible
>>> >>>>>>>>>>>>>>>>>>> and
>>> >>>>>>>>>>>>>>>>>>>> report back, because we'll need to determine this
>>> >>> before
>>> >>>>>>>>>>>>>>>>>>>> we can
>>> >>>>>>>>>>>>>>>> vote
>>> >>>>>>>>>>>>>>>>> on
>>> >>>>>>>>>>>>>>>>>>> the
>>> >>>>>>>>>>>>>>>>>>>> KIP.
>>> >>>>>>>>>>>>>>>>>>>>
>>> >>>>>>>>>>>>>>>>>>>> Regards,
>>> >>>>>>>>>>>>>>>>>>>> Nick
>>> >>>>>>>>>>>>>>>>>>>>
>>> >>>>>>>>>>>>>>>>>>>> On Wed, 10 Apr 2024 at 11:36, Bruno Cadonna
>>> >>>>>>>>>>>>>>>>>>>> <cado...@apache.org>
>>> >>>>>>>>>>>>>>>>>> wrote:
>>> >>>>>>>>>>>>>>>>>>>>
>>> >>>>>>>>>>>>>>>>>>>>> Hi Nick,
>>> >>>>>>>>>>>>>>>>>>>>>
>>> >>>>>>>>>>>>>>>>>>>>> Thanks for reacting to my comments so quickly!
>>> >>>>>>>>>>>>>>>>>>>>>
>>> >>>>>>>>>>>>>>>>>>>>>
>>> >>>>>>>>>>>>>>>>>>>>> 2.
>>> >>>>>>>>>>>>>>>>>>>>> Some thoughts on your proposal.
>>> >>>>>>>>>>>>>>>>>>>>> State managers (and state stores) are parts of
>>> tasks.
>>> >>> If
>>> >>>>>>>>>>>>>>>>>>>>> the task
>>> >>>>>>>>>>>>>>>> is
>>> >>>>>>>>>>>>>>>>>> not
>>> >>>>>>>>>>>>>>>>>>>>> assigned locally, we do not create those tasks. To
>>> get
>>> >>>>>>>>>>>>>>>>>>>>> the offsets
>>> >>>>>>>>>>>>>>>>>> with
>>> >>>>>>>>>>>>>>>>>>>>> your approach, we would need to either create kind
>>> of
>>> >>>>>>>>>>>>>>>>>>>>> inactive
>>> >>>>>>>>>>>>>>>> tasks
>>> >>>>>>>>>>>>>>>>>>>>> besides active and standby tasks or store and
>>> manage
>>> >>>> state
>>> >>>>>>>>>>>>>>>> managers
>>> >>>>>>>>>>>>>>>>> of
>>> >>>>>>>>>>>>>>>>>>>>> non-assigned tasks differently than the state
>>> managers
>>> >>>>>>>>>>>>>>>>>>>>> of assigned
>>> >>>>>>>>>>>>>>>>>>>>> tasks. Additionally, the cleanup thread that
>>> removes
>>> >>>>>>>>>>>>>>>>>>>>> unassigned
>>> >>>>>>>>>>>>>>>> task
>>> >>>>>>>>>>>>>>>>>>>>> directories needs to concurrently delete those
>>> >>> inactive
>>> >>>>>>>>>>>>>>>>>>>>> tasks or
>>> >>>>>>>>>>>>>>>>>>>>> task-less state managers of unassigned tasks. This
>>> >>> seems
>>> >>>>>>>>>>>>>>>>>>>>> all quite
>>> >>>>>>>>>>>>>>>>>> messy
>>> >>>>>>>>>>>>>>>>>>>>> to me.
>>> >>>>>>>>>>>>>>>>>>>>> Could we create those state managers (or state
>>> stores)
>>> >>>>>>>>>>>>>>>>>>>>> for locally
>>> >>>>>>>>>>>>>>>>>>>>> existing but unassigned tasks on demand when
>>> >>>>>>>>>>>>>>>>>>>>> TaskManager#getTaskOffsetSums() is executed? Or
>>> have a
>>> >>>>>>>>>>>>>>>>>>>>> different
>>> >>>>>>>>>>>>>>>>>>>>> encapsulation for the unused task directories?
>>> >>>>>>>>>>>>>>>>>>>>>
>>> >>>>>>>>>>>>>>>>>>>>>
>>> >>>>>>>>>>>>>>>>>>>>> Best,
>>> >>>>>>>>>>>>>>>>>>>>> Bruno
>>> >>>>>>>>>>>>>>>>>>>>>
>>> >>>>>>>>>>>>>>>>>>>>>
>>> >>>>>>>>>>>>>>>>>>>>>
>>> >>>>>>>>>>>>>>>>>>>>> On 4/10/24 11:31 AM, Nick Telford wrote:
>>> >>>>>>>>>>>>>>>>>>>>>> Hi Bruno,
>>> >>>>>>>>>>>>>>>>>>>>>>
>>> >>>>>>>>>>>>>>>>>>>>>> Thanks for the review!
>>> >>>>>>>>>>>>>>>>>>>>>>
>>> >>>>>>>>>>>>>>>>>>>>>> 1, 4, 5.
>>> >>>>>>>>>>>>>>>>>>>>>> Done
>>> >>>>>>>>>>>>>>>>>>>>>>
>>> >>>>>>>>>>>>>>>>>>>>>> 3.
>>> >>>>>>>>>>>>>>>>>>>>>> You're right. I've removed the offending
>>> paragraph. I
>>> >>>> had
>>> >>>>>>>>>>>>>>>>> originally
>>> >>>>>>>>>>>>>>>>>>>>>> adapted this from the guarantees outlined in
>>> KIP-892.
>>> >>>>>>>>>>>>>>>>>>>>>> But it's
>>> >>>>>>>>>>>>>>>>>>>>> difficult to
>>> >>>>>>>>>>>>>>>>>>>>>> provide these guarantees without the KIP-892
>>> >>>> transaction
>>> >>>>>>>>>>>>>>>> buffers.
>>> >>>>>>>>>>>>>>>>>>>>> Instead,
>>> >>>>>>>>>>>>>>>>>>>>>> we'll add the guarantees back into the JavaDoc
>>> when
>>> >>>>>>>>>>>>>>>>>>>>>> KIP-892
>>> >>>>>>>>>>>>>>>> lands.
>>> >>>>>>>>>>>>>>>>>>>>>>
>>> >>>>>>>>>>>>>>>>>>>>>> 2.
>>> >>>>>>>>>>>>>>>>>>>>>> Good point! This is the only part of the KIP that
>>> was
>>> >>>>>>>>>>>>>>>>>> (significantly)
>>> >>>>>>>>>>>>>>>>>>>>>> changed when I extracted it from KIP-892. My
>>> >>> prototype
>>> >>>>>>>>>>>>>>>>>>>>>> currently
>>> >>>>>>>>>>>>>>>>>>>>> maintains
>>> >>>>>>>>>>>>>>>>>>>>>> this "cache" of changelog offsets in .checkpoint,
>>> but
>>> >>>>>>>>>>>>>>>>>>>>>> doing so
>>> >>>>>>>>>>>>>>>>>> becomes
>>> >>>>>>>>>>>>>>>>>>>>> very
>>> >>>>>>>>>>>>>>>>>>>>>> messy. My intent with this change was to try to
>>> >>> better
>>> >>>>>>>>>>>>>>>> encapsulate
>>> >>>>>>>>>>>>>>>>>>> this
>>> >>>>>>>>>>>>>>>>>>>>>> offset "caching", especially for StateStores that
>>> can
>>> >>>>>>>>>>>>>>>>>>>>>> cheaply
>>> >>>>>>>>>>>>>>>>>> provide
>>> >>>>>>>>>>>>>>>>>>>>> the
>>> >>>>>>>>>>>>>>>>>>>>>> offsets stored directly in them without needing to
>>> >>>>>>>>>>>>>>>>>>>>>> duplicate
>>> >>>>>>>>>>>>>>>> them
>>> >>>>>>>>>>>>>>>>> in
>>> >>>>>>>>>>>>>>>>>>>>> this
>>> >>>>>>>>>>>>>>>>>>>>>> cache.
>>> >>>>>>>>>>>>>>>>>>>>>>
>>> >>>>>>>>>>>>>>>>>>>>>> It's clear some more work is needed here to better
>>> >>>>>>>>>>>>>>>>>>>>>> encapsulate
>>> >>>>>>>>>>>>>>>>> this.
>>> >>>>>>>>>>>>>>>>>>> My
>>> >>>>>>>>>>>>>>>>>>>>>> immediate thought is: what if we construct *but
>>> don't
>>> >>>>>>>>>>>>>>>> initialize*
>>> >>>>>>>>>>>>>>>>>> the
>>> >>>>>>>>>>>>>>>>>>>>>> StateManager and StateStores for every Task
>>> directory
>>> >>>>>>>>>>>>>>>>>>>>>> on-disk?
>>> >>>>>>>>>>>>>>>>> That
>>> >>>>>>>>>>>>>>>>>>>>> should
>>> >>>>>>>>>>>>>>>>>>>>>> still be quite cheap to do, and would enable us to
>>> >>>>>>>>>>>>>>>>>>>>>> query the
>>> >>>>>>>>>>>>>>>>> offsets
>>> >>>>>>>>>>>>>>>>>>> for
>>> >>>>>>>>>>>>>>>>>>>>>> all on-disk stores, even if they're not open. If
>>> the
>>> >>>>>>>>>>>>>>>> StateManager
>>> >>>>>>>>>>>>>>>>>>> (aka.
>>> >>>>>>>>>>>>>>>>>>>>>> ProcessorStateManager/GlobalStateManager) proves
>>> too
>>> >>>>>>>>>>>>>>>>>>>>>> expensive
>>> >>>>>>>>>>>>>>>> to
>>> >>>>>>>>>>>>>>>>>> hold
>>> >>>>>>>>>>>>>>>>>>>>> open
>>> >>>>>>>>>>>>>>>>>>>>>> for closed stores, we could always have a
>>> >>>>>>>>>>>>>>>>>>>>>> "StubStateManager" in
>>> >>>>>>>>>>>>>>>>> its
>>> >>>>>>>>>>>>>>>>>>>>> place,
>>> >>>>>>>>>>>>>>>>>>>>>> that enables the querying of offsets, but nothing
>>> >>> else?
>>> >>>>>>>>>>>>>>>>>>>>>>
>>> >>>>>>>>>>>>>>>>>>>>>> IDK, what do you think?
>>> >>>>>>>>>>>>>>>>>>>>>>
>>> >>>>>>>>>>>>>>>>>>>>>> Regards,
>>> >>>>>>>>>>>>>>>>>>>>>>
>>> >>>>>>>>>>>>>>>>>>>>>> Nick
>>> >>>>>>>>>>>>>>>>>>>>>>
>>> >>>>>>>>>>>>>>>>>>>>>> On Tue, 9 Apr 2024 at 15:00, Bruno Cadonna
>>> >>>>>>>>>>>>>>>>>>>>>> <cado...@apache.org>
>>> >>>>>>>>>>>>>>>>>>> wrote:
>>> >>>>>>>>>>>>>>>>>>>>>>
>>> >>>>>>>>>>>>>>>>>>>>>>> Hi Nick,
>>> >>>>>>>>>>>>>>>>>>>>>>>
>>> >>>>>>>>>>>>>>>>>>>>>>> Thanks for breaking out the KIP from KIP-892!
>>> >>>>>>>>>>>>>>>>>>>>>>>
>>> >>>>>>>>>>>>>>>>>>>>>>> Here are a couple of comments/questions:
>>> >>>>>>>>>>>>>>>>>>>>>>>
>>> >>>>>>>>>>>>>>>>>>>>>>> 1.
>>> >>>>>>>>>>>>>>>>>>>>>>> In Kafka Streams, we have a design guideline
>>> which
>>> >>>>>>>>>>>>>>>>>>>>>>> says to not
>>> >>>>>>>>>>>>>>>>> use
>>> >>>>>>>>>>>>>>>>>>> the
>>> >>>>>>>>>>>>>>>>>>>>>>> "get"-prefix for getters on the public API. Could
>>> >>> you
>>> >>>>>>>>>>>>>>>>>>>>>>> please
>>> >>>>>>>>>>>>>>>>> change
>>> >>>>>>>>>>>>>>>>>>>>>>> getCommittedOffsets() to committedOffsets()?
>>> >>>>>>>>>>>>>>>>>>>>>>>
>>> >>>>>>>>>>>>>>>>>>>>>>>
>>> >>>>>>>>>>>>>>>>>>>>>>> 2.
>>> >>>>>>>>>>>>>>>>>>>>>>> It is not clear to me how
>>> >>>>> TaskManager#getTaskOffsetSums()
>>> >>>>>>>>>>>>>>>> should
>>> >>>>>>>>>>>>>>>>>> read
>>> >>>>>>>>>>>>>>>>>>>>>>> offsets of tasks the stream thread does not own
>>> but
>>> >>>>>>>>>>>>>>>>>>>>>>> that have a
>>> >>>>>>>>>>>>>>>>>> state
>>> >>>>>>>>>>>>>>>>>>>>>>> directory on the Streams client by calling
>>> >>>>>>>>>>>>>>>>>>>>>>> StateStore#getCommittedOffsets(). If the thread
>>> does
>>> >>>>>>>>>>>>>>>>>>>>>>> not own a
>>> >>>>>>>>>>>>>>>>> task
>>> >>>>>>>>>>>>>>>>>>> it
>>> >>>>>>>>>>>>>>>>>>>>>>> also does not create any state stores for the
>>> task,
>>> >>>>>>>>>>>>>>>>>>>>>>> which means
>>> >>>>>>>>>>>>>>>>>> there
>>> >>>>>>>>>>>>>>>>>>>>> is
>>> >>>>>>>>>>>>>>>>>>>>>>> no state store on which to call
>>> >>> getCommittedOffsets().
>>> >>>>>>>>>>>>>>>>>>>>>>> I would have rather expected that a checkpoint
>>> file
>>> >>> is
>>> >>>>>>>>>>>>>>>>>>>>>>> written
>>> >>>>>>>>>>>>>>>>> for
>>> >>>>>>>>>>>>>>>>>>> all
>>> >>>>>>>>>>>>>>>>>>>>>>> state stores on close -- not only for the
>>> >>> RocksDBStore
>>> >>>>>>>>>>>>>>>>>>>>>>> -- and
>>> >>>>>>>>>>>>>>>>> that
>>> >>>>>>>>>>>>>>>>>>> this
>>> >>>>>>>>>>>>>>>>>>>>>>> checkpoint file is read in
>>> >>>>>>>>>>>>>>>>>>>>>>> TaskManager#getTaskOffsetSums() for
>>> >>>>>>>>>>>>>>>>> the
>>> >>>>>>>>>>>>>>>>>>>>> tasks
>>> >>>>>>>>>>>>>>>>>>>>>>> that have a state directory on the client but are
>>> >>> not
>>> >>>>>>>>>>>>>>>>>>>>>>> currently
>>> >>>>>>>>>>>>>>>>>>>>> assigned
>>> >>>>>>>>>>>>>>>>>>>>>>> to any stream thread of the Streams client.
>>> >>>>>>>>>>>>>>>>>>>>>>>
>>> >>>>>>>>>>>>>>>>>>>>>>>
>>> >>>>>>>>>>>>>>>>>>>>>>> 3.
>>> >>>>>>>>>>>>>>>>>>>>>>> In the javadocs for commit() you write
>>> >>>>>>>>>>>>>>>>>>>>>>>
>>> >>>>>>>>>>>>>>>>>>>>>>> "... all writes since the last commit(Map), or
>>> since
>>> >>>>>>>>>>>>>>>>>> init(StateStore)
>>> >>>>>>>>>>>>>>>>>>>>>>> *MUST* be available to readers, even after a
>>> >>> restart."
>>> >>>>>>>>>>>>>>>>>>>>>>>
>>> >>>>>>>>>>>>>>>>>>>>>>> This is only true for a clean close before the
>>> >>>>>>>>>>>>>>>>>>>>>>> restart, isn't
>>> >>>>>>>>>>>>>>>> it?
>>> >>>>>>>>>>>>>>>>>>>>>>> If the task fails with a dirty close, Kafka
>>> Streams
>>> >>>>>>>>>>>>>>>>>>>>>>> cannot
>>> >>>>>>>>>>>>>>>>>> guarantee
>>> >>>>>>>>>>>>>>>>>>>>>>> that the in-memory structures of the state store
>>> >>> (e.g.
>>> >>>>>>>>>>>>>>>>>>>>>>> memtable
>>> >>>>>>>>>>>>>>>>> in
>>> >>>>>>>>>>>>>>>>>>> the
>>> >>>>>>>>>>>>>>>>>>>>>>> case of RocksDB) are flushed so that the records
>>> and
>>> >>>> the
>>> >>>>>>>>>>>>>>>>> committed
>>> >>>>>>>>>>>>>>>>>>>>>>> offsets are persisted.
>>> >>>>>>>>>>>>>>>>>>>>>>>
>>> >>>>>>>>>>>>>>>>>>>>>>>
>>> >>>>>>>>>>>>>>>>>>>>>>> 4.
>>> >>>>>>>>>>>>>>>>>>>>>>> The wrapper that provides the legacy
>>> checkpointing
>>> >>>>>>>>>>>>>>>>>>>>>>> behavior is
>>> >>>>>>>>>>>>>>>>>>> actually
>>> >>>>>>>>>>>>>>>>>>>>>>> an implementation detail. I would remove it from
>>> the
>>> >>>>>>>>>>>>>>>>>>>>>>> KIP, but
>>> >>>>>>>>>>>>>>>>> still
>>> >>>>>>>>>>>>>>>>>>>>>>> state that the legacy checkpointing behavior
>>> will be
>>> >>>>>>>>>>>>>>>>>>>>>>> supported
>>> >>>>>>>>>>>>>>>>> when
>>> >>>>>>>>>>>>>>>>>>> the
>>> >>>>>>>>>>>>>>>>>>>>>>> state store does not manage the checkpoints.
>>> >>>>>>>>>>>>>>>>>>>>>>>
>>> >>>>>>>>>>>>>>>>>>>>>>>
>>> >>>>>>>>>>>>>>>>>>>>>>> 5.
>>> >>>>>>>>>>>>>>>>>>>>>>> Regarding the metrics, could you please add the
>>> >>> tags,
>>> >>>>>>>>>>>>>>>>>>>>>>> and the
>>> >>>>>>>>>>>>>>>>>>> recording
>>> >>>>>>>>>>>>>>>>>>>>>>> level (DEBUG or INFO) as done in KIP-607 or
>>> KIP-444.
>>> >>>>>>>>>>>>>>>>>>>>>>>
>>> >>>>>>>>>>>>>>>>>>>>>>>
>>> >>>>>>>>>>>>>>>>>>>>>>> Best,
>>> >>>>>>>>>>>>>>>>>>>>>>> Bruno
>>> >>>>>>>>>>>>>>>>>>>>>>>
>>> >>>>>>>>>>>>>>>>>>>>>>> On 4/7/24 5:35 PM, Nick Telford wrote:
>>> >>>>>>>>>>>>>>>>>>>>>>>> Hi everyone,
>>> >>>>>>>>>>>>>>>>>>>>>>>>
>>> >>>>>>>>>>>>>>>>>>>>>>>> Based on some offline discussion, I've split out
>>> >>> the
>>> >>>>>>>>>>>>>>>>>>>>>>>> "Atomic
>>> >>>>>>>>>>>>>>>>>>>>>>> Checkpointing"
>>> >>>>>>>>>>>>>>>>>>>>>>>> section from KIP-892: Transactional Semantics
>>> for
>>> >>>>>>>>>>>>>>>>>>>>>>>> StateStores,
>>> >>>>>>>>>>>>>>>>>> into
>>> >>>>>>>>>>>>>>>>>>>>> its
>>> >>>>>>>>>>>>>>>>>>>>>>> own
>>> >>>>>>>>>>>>>>>>>>>>>>>> KIP
>>> >>>>>>>>>>>>>>>>>>>>>>>>
>>> >>>>>>>>>>>>>>>>>>>>>>>> KIP-1035: StateStore managed changelog offsets
>>> >>>>>>>>>>>>>>>>>>>>>>>>
>>> >>>>>>>>>>>>>>>>>>>>>>>
>>> >>>>>>>>>>>>>>>>>>>>>
>>> >>>>>>>>>>>>>>>>>>>
>>> >>>>>>>>>>>>>>>>>>
>>> >>>>>>>>>>>>>>>>>
>>> >>>>>>>>>>>>>>>>
>>> >>>>>
>>> >>>>
>>> >>>
>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-1035%3A+StateStore+managed+changelog+offsets
>>> >>>>>>>>>>>>>>>>>>>>>>>>
>>> >>>>>>>>>>>>>>>>>>>>>>>> While KIP-892 was adopted *with* the changes
>>> >>> outlined
>>> >>>>> in
>>> >>>>>>>>>>>>>>>>> KIP-1035,
>>> >>>>>>>>>>>>>>>>>>>>> these
>>> >>>>>>>>>>>>>>>>>>>>>>>> changes were always the most contentious part,
>>> and
>>> >>>>>>>>>>>>>>>>>>>>>>>> continued
>>> >>>>>>>>>>>>>>>> to
>>> >>>>>>>>>>>>>>>>>> spur
>>> >>>>>>>>>>>>>>>>>>>>>>>> discussion even after KIP-892 was adopted.
>>> >>>>>>>>>>>>>>>>>>>>>>>>
>>> >>>>>>>>>>>>>>>>>>>>>>>> All the changes introduced in KIP-1035 have been
>>> >>>>>>>>>>>>>>>>>>>>>>>> removed from
>>> >>>>>>>>>>>>>>>>>>> KIP-892,
>>> >>>>>>>>>>>>>>>>>>>>>>> and
>>> >>>>>>>>>>>>>>>>>>>>>>>> a hard dependency on KIP-1035 has been added to
>>> >>>>>>>>>>>>>>>>>>>>>>>> KIP-892 in
>>> >>>>>>>>>>>>>>>> their
>>> >>>>>>>>>>>>>>>>>>>>> place.
>>> >>>>>>>>>>>>>>>>>>>>>>>>
>>> >>>>>>>>>>>>>>>>>>>>>>>> I'm hopeful that with some more focus on this
>>> set
>>> >>> of
>>> >>>>>>>>>>>>>>>>>>>>>>>> changes,
>>> >>>>>>>>>>>>>>>> we
>>> >>>>>>>>>>>>>>>>>> can
>>> >>>>>>>>>>>>>>>>>>>>>>>> deliver something that we're all happy with.
>>> >>>>>>>>>>>>>>>>>>>>>>>>
>>> >>>>>>>>>>>>>>>>>>>>>>>> Regards,
>>> >>>>>>>>>>>>>>>>>>>>>>>> Nick
>>> >>>>>>>>>>>>>>>>>>>>>>>>
>>> >>>>>>>>>>>>>>>>>>>>>>>
>>> >>>>>>>>>>>>>>>>>>>>>>
>>> >>>>>>>>>>>>>>>>>>>>>
>>> >>>>>>>>>>>>>>>>>>>>
>>> >>>>>>>>>>>>>>>>>>>
>>> >>>>>>>>>>>>>>>>>>
>>> >>>>>>>>>>>>>>>>>
>>> >>>>>>>>>>>>>>>>
>>> >>>>>>>>>>>>>>>
>>> >>>>>
>>> >>>>
>>> >>>
>>> >>
>>>
>>
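
For reference, the StoreSupplier#get(TaskId) idea quoted above (Sophie's
message of 16 Apr 2024) relies on a default interface method that delegates
to the existing no-arg get(), so suppliers that ignore the new overload keep
working unchanged. The following is only a minimal sketch of that pattern:
TaskId, StoreSupplier, LegacySupplier and TaskAwareSupplier here are
simplified, hypothetical stand-ins written for illustration, not the real
Kafka Streams classes, and the stores are plain strings.

```java
public class StoreSupplierSketch {

    // Hypothetical stand-in for a task identifier (sub-topology id + partition).
    static final class TaskId {
        final int subtopology;
        final int partition;

        TaskId(final int subtopology, final int partition) {
            this.subtopology = subtopology;
            this.partition = partition;
        }

        @Override
        public String toString() {
            return subtopology + "_" + partition;
        }
    }

    // Simplified stand-in for a store supplier; only the two get() variants are modelled.
    interface StoreSupplier<T> {
        T get();

        // The addition discussed in the thread: falls back to get() unless overridden.
        default T get(final TaskId taskId) {
            return get();
        }
    }

    // A supplier written before the new overload existed; it needs no changes.
    static class LegacySupplier implements StoreSupplier<String> {
        @Override
        public String get() {
            return "legacy-store";
        }
    }

    // A supplier that opts in and passes the task id on to the store it creates.
    static class TaskAwareSupplier implements StoreSupplier<String> {
        @Override
        public String get() {
            return "store-without-task";
        }

        @Override
        public String get(final TaskId taskId) {
            return "store-for-task-" + taskId;
        }
    }

    public static void main(final String[] args) {
        final TaskId taskId = new TaskId(0, 3);
        System.out.println(new LegacySupplier().get(taskId));    // prints "legacy-store"
        System.out.println(new TaskAwareSupplier().get(taskId)); // prints "store-for-task-0_3"
    }
}
```

In this sketch the caller always invokes get(taskId); whether a given store
actually sees the task id depends only on whether its supplier overrides the
new method, which is the opt-in behaviour described in the quoted message.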
