Nick,
Thanks a lot for updating the KIP. I made a pass over it. Overall LGTM.
A few nits and some more minor questions:
200: nit (Javadocs for `StateStore.managesOffsets()`):
This is highly
recommended, if possible, to ensure that custom StateStores provide the
consistency guarantees that Kafka Streams
expects when operating under the {@code exactly-once} {@code
processing.mode}.
Given that we make it mandatory, we should rephrase this: "highly
recommended" does not seem to be strong enough wording.
201: Javadocs for `StateStore.commit(final Map<TopicPartition, Long>
changelogOffsets)`:
Implementations <em>SHOULD</em> ensure that {@code changelogOffsets} are
committed to disk atomically with the
records they represent, if possible.
Not sure if I can follow? Why "should ensure", but not "must ensure"?
202: New metrics:
`commit-rate` -> Description says "The number of calls to..." -- Should
be "The number of calls per second to..."?
`commit-latency-[]` -> Description says "The [] time taken to" -- Should
be "The [] time in nanoseconds taken to..."? (or milliseconds in case we
report in millis?)
203: Section "Consumer Rebalance Metadata"
We will then cache these offsets in-memory and close() these stores.
I think we should not pro-actively close the store, but keep them open,
until we get tasks assigned. For assigned tasks, we don't need to
re-open the store, which provides a nice optimization. For other stores,
we could close them at this point as there is no need to keep them open.
-- However, this might all be internal implementation details and maybe
we don't need to specify this on the KIP at all (might be best to just
not say anything about this part)?
203: "managesOffsets deprecation"
to allow for its removal in the next major release of Kafka Streams
We don't release Kafka Streams, but Kafka :) -- Also, it's not
necessarily the next major release, as we have a one year / 3 releases
guarantee to keep deprecated APIs and we don't know when the next major
release will happen. Let's just rephrase this in a more generic
way: "for its removal in a future [major] release" or something like this.
204: "Downgrade":
by default the on-disk state for any Task containing a RocksDBStore will
be wiped and restored from their changelogs.
This seems not to be correct? For this case, won't Kafka Streams just
crash? And manual store deletion would be required?
205: How do we intend to implement offset management for segmented
stores? Are we going to add this new CL to _all_ segments? From a
structure POV it seems best to add to all segments, but it seems
sufficient to keep the information up-to-date only in the latest
segments (which would imply that we need to copy the information from the
current latest segment to a newly created segment explicitly) and only
_read_ the information from the latest segment as older segments might
contain stale metadata?
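To make the "latest segment only" option concrete, here is a minimal sketch. It assumes a toy in-memory model of a segmented store: `SegmentedOffsetsSketch`, its method names, and the use of plain strings for changelog partitions are all hypothetical and are not taken from the KIP or from the Kafka Streams code base.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical model of a segmented store; not Kafka Streams code.
final class SegmentedOffsetsSketch {
    // segment id -> offset metadata stored in that segment (partition name -> offset)
    private final TreeMap<Long, Map<String, Long>> segments = new TreeMap<>();

    // Creates a segment. A new latest segment starts with a copy of the
    // previous latest segment's offsets, so reads never see an empty map.
    void createSegment(final long segmentId) {
        final Map<String, Long> offsets = new HashMap<>();
        if (!segments.isEmpty() && segmentId > segments.lastKey()) {
            offsets.putAll(segments.lastEntry().getValue());
        }
        segments.put(segmentId, offsets);
    }

    // Commits only touch the latest segment; older segments may become stale.
    void commit(final Map<String, Long> changelogOffsets) {
        if (segments.isEmpty()) {
            throw new IllegalStateException("no segment to commit to");
        }
        segments.lastEntry().getValue().putAll(changelogOffsets);
    }

    // Reads always go to the latest segment.
    Map<String, Long> committedOffsets() {
        return segments.isEmpty() ? Map.of() : Map.copyOf(segments.lastEntry().getValue());
    }

    // Exposed only to show that older segments keep their stale metadata.
    Map<String, Long> offsetsInSegment(final long segmentId) {
        return Map.copyOf(segments.get(segmentId));
    }
}
```

The explicit copy in `createSegment` is the step the question calls out; without it, a freshly created latest segment would report no offsets at all.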
206: `KeyValueStoreTestDriver`:
This is a test class, right? So we don't need to cover it in the KIP,
and can rename w/o a deprecation phase, as it's all internal code.
-Matthias
On 5/30/24 8:57 AM, Nick Telford wrote:
Hi everyone,
I didn't spot this before, but it looks like the API of
KeyValueStoreTestDriver will need to be updated to change the
nomenclature
from "flushed" to "committed":
numFlushedEntryRemoved() -> numCommittedEntryRemoved()
numFlushedEntryStored() -> numCommittedEntryStored()
flushedEntryRemoved(K) -> committedEntryRemoved(K)
flushedEntryStored(K) -> committedEntryStored(K)
The old methods will obviously be marked as @Deprecated.
Any objections before I add this to the KIP?
Regards,
Nick
On Wed, 29 May 2024 at 11:20, Nick Telford <nick.telf...@gmail.com>
wrote:
I've updated the KIP with the following:
- Deprecation of StateStore#managesOffsets
- Change StateStore#commit to throw UnsupportedOperationException
when
called from a Processor (via AbstractReadWriteDecorator)
- Updated consumer rebalance lag computation strategy
<https://cwiki.apache.org/confluence/display/KAFKA/KIP-1035%3A+StateStore+managed+changelog+offsets#KIP1035:StateStoremanagedchangelogoffsets-ConsumerRebalanceMetadata>
based on our Meet discussion
- I've added a bit more detail here than we discussed, in
particular around how we handle the offsets for tasks assigned
to our local
instance, and how we handle offsets when Tasks are
closed/revoked.
- Improved downgrade behaviour
- Note: users that don't downgrade with upgrade.from will still
get
the wipe-and-restore behaviour by-default.
I believe this covers all the outstanding changes that were requested.
Please let me know if I've missed anything or you think further changes
are
needed.
Regards,
Nick
On Wed, 29 May 2024 at 09:28, Nick Telford <nick.telf...@gmail.com>
wrote:
Hi everyone,
Sorry I haven't got around to updating the KIP yet. Now that I've
wrapped
up KIP-989, I'm going to be working on 1035 starting today.
I'll update the KIP first, and then call a vote.
Regards,
Nick
On Wed, 29 May 2024 at 07:25, Bruno Cadonna <cado...@apache.org>
wrote:
Totally agree on moving forward and starting the VOTE!
However, the KIP should be updated with the new info before starting
the
VOTE.
Best,
Bruno
On 5/29/24 2:36 AM, Matthias J. Sax wrote:
Sounds like a good plan. -- I think we are still wrapping up 3.8
release, but would also like to move forward with this one.
Should we start a VOTE?
For merging PRs we need to wait until after code freeze, once the 3.8 branch
has been cut. But we could start reviewing PRs before this already.
-Matthias
On 5/17/24 3:05 AM, Nick Telford wrote:
Hi everyone,
As discussed on the Zoom call, we're going to handle rebalance
meta-data by:
- On start-up, Streams will open each store and read its changelog
offsets
into an in-memory cache. This cache will be shared among all
StreamThreads.
- On rebalance, the cache will be consulted for Task offsets for any
Task
that is not active on any instance-local StreamThreads. If the Task
is
active on *any* instance-local StreamThread, we will report the Task
lag as
"up to date" (i.e. -1), because we know that the local state is
currently
up-to-date.
We will avoid caching offsets across restarts in the legacy
".checkpoint"
file, so that we can eliminate the logic for handling this class. If
performance of opening/closing many state stores is poor, we can
parallelise it by forking off a thread for each Task directory when
reading
the offsets.
I'll update the KIP later today to reflect this design, but I will
try to
keep it high-level, so that the exact implementation can vary.
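As a rough illustration of the design above, here is a minimal sketch of the shared cache and the rebalance lookup. Everything in it is an assumption made for illustration: the class `TaskOffsetCacheSketch`, its method names, and the use of strings as task ids are hypothetical, and only the "-1 means up to date" convention is taken from this message.

```java
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch of an instance-wide offset cache shared by all StreamThreads.
final class TaskOffsetCacheSketch {
    // Sentinel for "local state is up to date", as described above.
    static final long UP_TO_DATE = -1L;

    // task id -> changelog offset read from the store at start-up
    private final Map<String, Long> cachedOffsets = new ConcurrentHashMap<>();

    // Called once per task directory at start-up, after the store's offsets were read.
    void recordOnStartup(final String taskId, final long committedOffset) {
        cachedOffsets.put(taskId, committedOffset);
    }

    // Value to report for a task during a rebalance.
    // Returns null when there is no local state for the task.
    Long offsetForRebalance(final String taskId, final Set<String> locallyActiveTasks) {
        if (locallyActiveTasks.contains(taskId)) {
            return UP_TO_DATE; // active on some instance-local StreamThread
        }
        return cachedOffsets.get(taskId);
    }
}
```

A `ConcurrentHashMap` is used here only because the cache is read by several threads; parallel population at start-up (one worker per Task directory) would work against the same map.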
Regards,
Nick
On Thu, 16 May 2024 at 03:12, Sophie Blee-Goldman <
sop...@responsive.dev>
wrote:
103: I like the idea of immediately deprecating #managesOffsets and
aiming
to make offset management mandatory in the long run. I assume we
would also
log a warning for any custom stores that return "false" from this
method to
encourage custom store implementations to start doing so? My only
question/concern is that if we want folks to start managing their
own
offsets then we should make this transition easy for them, perhaps
by
exposing some public utility APIs for things that are currently
handled by
Kafka Streams such as reading/writing checkpoint files. Maybe it
would be
useful to include a small example in the KIP of what it would
actually mean
to "manage your own offsets" -- I know (all too well) that plugging
in
custom storage implementations is not easy and most people who do
this are
probably fairly advanced users, but offset management will be a
totally new
ballgame to most people and this kind of feels like throwing
them
off the deep end. We should at least provide a lifejacket via some
kind of
utility API and/or example
200. There's been a lot of back and forth on the rebalance
metadata/task
lag computation question, so forgive me if I missed any part of
this,
but I
think we've landed at the right idea here. To summarize: the
"tl;dr"
explanation is that we'll write the checkpoint file only on close
and
will
account for hard-crash scenarios by opening up the stores on
startup
and
writing a checkpoint file for any missing tasks. Does that sound
about
right?
A few clarifications:
I think we're all more or less on the same page here but just to be
absolutely clear, the task lags for each task directory found on
disk
will
be reported by only one of the StreamThreads, and each StreamThread
will
report lags only for tasks that it already owns or are not assigned
to any
other StreamThread in the client. In other words, we only need to
get
the
task lag for completely unassigned/unlocked tasks, which means if
there is
a checkpoint file at all then it must be up-to-date, because there
is no
other StreamThread actively writing to that state store (if so then
only
that StreamThread would report lag for that particular task).
This still leaves the "no checkpoint at all" case which as
previously
mentioned can occur after a hard-crash. Luckily we only have to
worry
about this once, after starting up again following said hard crash.
We can
simply open up each of the state stores before ever joining the
group, get
the offsets from rocksdb, and write them to a new checkpoint file.
After
that, we can depend on the checkpoints written at close and won't
have to
open up any stores that aren't already assigned for the reasons
laid
out in
the paragraph above.
As for the specific mechanism and which thread-does-what, since
there
were
some questions, this is how I'm imagining the process:
1. The general idea is that we simply go through each task
directory
with state but no checkpoint file and open the StateStore,
call
#committedOffset, and then write it to the checkpoint file. We
can then
close these stores and let things proceed as normal.
2. This only has to happen once, during startup, but we have
two
options:
1. Do this from KafkaStreams#start, ie before we even
create
the
StreamThreads
2. Do this from StreamThread#start, following a similar
lock-based
approach to the one used in #computeTaskLags, where each
StreamThread
just
makes a pass over the task directories on disk and attempts
to
lock
them
one by one. If they obtain the lock, check whether there is
state
but no
checkpoint, and write the checkpoint if needed. If it can't
grab
the lock,
then we know one of the other StreamThreads must be
handling
the
checkpoint
file for that task directory, and we can move on.
Don't really feel too strongly about which approach is best, doing
it in
KafkaStreams#start is certainly the most simple while doing it in
the
StreamThread's startup is more efficient. If we're worried about
adding too
much weight to KafkaStreams#start then the 2nd option is probably
best,
though slightly more complicated.
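For what it's worth, the second option could look roughly like the sketch below. It is only a model: `StartupCheckpointSketch`, `TaskDir`, and the in-memory `checkpoint` field are hypothetical stand-ins for the task directory lock, the store's committed offset, and the checkpoint file.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical model of the lock-based start-up pass; not Kafka Streams code.
final class StartupCheckpointSketch {
    // Stand-in for one on-disk task directory.
    static final class TaskDir {
        final ReentrantLock lock = new ReentrantLock();
        final boolean hasState;
        final long storeCommittedOffset; // what the opened store would report
        volatile Long checkpoint;        // null means "no checkpoint file"

        TaskDir(final boolean hasState, final long storeCommittedOffset, final Long checkpoint) {
            this.hasState = hasState;
            this.storeCommittedOffset = storeCommittedOffset;
            this.checkpoint = checkpoint;
        }
    }

    // One StreamThread's pass over all task directories.
    // Returns the number of checkpoints this thread wrote.
    static int writeMissingCheckpoints(final Map<String, TaskDir> taskDirs) {
        int written = 0;
        for (final TaskDir dir : taskDirs.values()) {
            if (!dir.lock.tryLock()) {
                continue; // another thread is handling this directory
            }
            try {
                if (dir.hasState && dir.checkpoint == null) {
                    dir.checkpoint = dir.storeCommittedOffset;
                    written++;
                }
            } finally {
                dir.lock.unlock();
            }
        }
        return written;
    }
}
```

Because each directory is guarded by `tryLock`, running the same pass from every StreamThread is safe: a directory is either skipped or already checkpointed by the time a second thread gets the lock.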
Thoughts?
On Tue, May 14, 2024 at 10:02 AM Nick Telford <
nick.telf...@gmail.com>
wrote:
Hi everyone,
Sorry for the delay in replying. I've finally now got some time to
work
on
this.
Addressing Matthias's comments:
100.
Good point. As Bruno mentioned, there's already
AbstractReadWriteDecorator
which we could leverage to provide that protection. I'll add
details on
this to the KIP.
101,102.
It looks like these points have already been addressed by Bruno.
Let me
know if anything here is still unclear or you feel needs to be
detailed
more in the KIP.
103.
I'm in favour of anything that gets the old code removed sooner,
but
wouldn't deprecating an API that we expect (some) users to
implement
cause
problems?
I'm thinking about implementers of custom StateStores, as they may
be
confused by managesOffsets() being deprecated, especially since
they
would
have to mark their implementation as @Deprecated in order to avoid
compile
warnings.
If deprecating an API *while it's still expected to be
implemented*
is
something that's generally done in the project, then I'm happy to
do so
here.
104.
I think this is technically possible, but at the cost of
considerable
additional code to maintain. Would we ever have a pathway to
remove
this
downgrade code in the future?
Regarding rebalance metadata:
Opening all stores on start-up to read and cache their offsets is
an
interesting idea, especially if we can avoid re-opening the stores
once
the
Tasks have been assigned. Scalability shouldn't be too much of a
problem,
because typically users have a fairly short state.cleanup.delay,
so
the
number of on-disk Task directories should rarely exceed the number
of
Tasks
previously assigned to that instance.
An advantage of this approach is that it would also simplify
StateStore
implementations, as they would only need to guarantee that
committed
offsets are available when the store is open.
I'll investigate this approach this week for feasibility and
report
back.
I think that covers all the outstanding feedback, unless I missed
anything?
Regards,
Nick
On Mon, 6 May 2024 at 14:06, Bruno Cadonna <cado...@apache.org>
wrote:
Hi Matthias,
I see what you mean.
To sum up:
With this KIP the .checkpoint file is written when the store
closes.
That is when:
1. a task moves away from Kafka Streams client
2. Kafka Streams client shuts down
A Kafka Streams client needs the information in the .checkpoint
file
1. on startup because it does not have any open stores yet.
2. during rebalances for non-empty state directories of tasks
that
are
not assigned to the Kafka Streams client.
With hard crashes, i.e., when the Streams client is not able to
close
its state stores and write the .checkpoint file, the .checkpoint
file
might be quite stale. That influences the next rebalance after
failover
negatively.
My conclusion is that Kafka Streams either needs to open the
state
stores at start up or we write the checkpoint file more often.
Writing the .checkpoint file during processing more often without
controlling the flush to disk would work. However, Kafka Streams
would
checkpoint offsets that are not yet persisted on disk by the
state
store. That is with a hard crash the offsets in the .checkpoint
file
might be larger than the offsets checkpointed in the state store.
That
might not be a problem if Kafka Streams uses the .checkpoint file
only
to compute the task lag. The downside is that it makes the
managing of
checkpoints more complex because now we have to maintain two
checkpoints: one for restoration and one for computing the task
lag.
I think we should explore the option where Kafka Streams opens
the
state
stores at start up to get the offsets.
I also checked when Kafka Streams needs the checkpointed offsets
to
compute the task lag during a rebalance. Turns out Kafka Streams
needs
them before sending the join request. Now, I am wondering if
opening
the
state stores of unassigned tasks whose state directory exists
locally
is
actually such a big issue due to the expected higher latency
since
it
happens actually before the Kafka Streams client joins the
rebalance.
Best,
Bruno
On 5/4/24 12:05 AM, Matthias J. Sax wrote:
Those are good questions... I could think of a few approaches, but
I
admit
it might all be a little bit tricky to code up...
However if we don't solve this problem, I think this KIP does
not
really
solve the core issue we are facing? In the end, if we rely on
the
`.checkpoint` file to compute a task assignment, but the
`.checkpoint`
file can be arbitrarily stale after a crash because we only write
it
on a
clean close, there would be still a huge gap that this KIP does
not
close?
For the case in which we keep the checkpoint file, this KIP
would
still
help for "soft errors" in which KS can recover, and roll back
the
store.
A significant win for sure. -- But hard crashes would still be a
problem? We might assign tasks to the "wrong" instance, ie, which
are
not
most up to date, as the checkpoint information could be very
outdated?
Would we end up with a half-baked solution? Would this be good
enough
to
justify the introduced complexity? In the end, for soft failures
it's
still
a win. Just want to make sure we understand the limitations and
make
an
educated decision.
Or do I miss something?
-Matthias
On 5/3/24 10:20 AM, Bruno Cadonna wrote:
Hi Matthias,
200:
I like the idea in general. However, it is not clear to me how
the
behavior should be with multiple stream threads in the same
Kafka
Streams client. What stream thread opens which store? How can a
stream
thread pass an open store to another stream thread that got the
corresponding task assigned? How does a stream thread know
that a
task
was not assigned to any of the stream threads of the Kafka
Streams
client? I have the feeling we should just keep the .checkpoint
file
on
close for now to unblock this KIP and try to find a solution to
get
totally rid of it later.
Best,
Bruno
On 5/3/24 6:29 PM, Matthias J. Sax wrote:
101: Yes, but what I am saying is, that we don't need to flush
the
.position file to disk periodically, but only maintain it in
main
memory, and only write it to disk on close() to preserve it
across
restarts. This way, it would never be ahead, but might only
lag?
But
with my better understanding about (102) it might be moot
anyway...
102: Thanks for clarifying. Looked into the code now. Makes
sense.
Might be something to be worth calling out explicitly in the
KIP
writeup. -- Now that I realize that the position is tracked
inside
the store (not outside as the changelog offsets) it makes much
more
sense to pull position into RocksDB itself. In the end, it's
actually
a "store implementation" detail how it tracks the position
(and
kinda
leaky abstraction currently, that we re-use the checkpoint
file
mechanism to track it and flush to disk).
200: I was thinking about this a little bit more, and maybe
it's
not
too bad? When KS starts up, we could open all stores we find
on
local
disk pro-actively, and keep them all open until the first
rebalance
finishes: For tasks we get assigned, we hand in the already
opened
store (this would amortize the cost to open the store before
the
rebalance) and for non-assigned tasks, we know the offset
information
won't change and we could just cache it in-memory for later
reuse
(ie, next rebalance) and close the store to free up resources?
--
Assuming that we would get a large percentage of opened stores
assigned as tasks anyway, this could work?
-Matthias
On 5/3/24 1:29 AM, Bruno Cadonna wrote:
Hi Matthias,
101:
Let's assume a RocksDB store, but I think the following might
be
true also for other store implementations. With this KIP, if
Kafka
Streams commits the offsets, the committed offsets will be
stored
in
an in-memory data structure (i.e. the memtable) and stay
there
until
RocksDB decides that it is time to persist its in-memory data
structure. If Kafka Streams writes its position to the
.position
file during a commit and a crash happens before RocksDB
persists
the
memtable then the position in the .position file is ahead of
the
persisted offset. If IQ is done between the crash and the
state
store having fully restored the changelog, the position might tell
IQ
that
the state store is more up-to-date than it actually is.
In contrast, if Kafka Streams handles persisting positions
the
same
as persisting offset, the position should always be
consistent
with
the offset, because they are persisted together.
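To illustrate the point, here is a minimal sketch of the "persisted together" behaviour. It is a toy model under stated assumptions: `AtomicPositionSketch`, `Snapshot`, and the `flush()`/`crash()` methods are hypothetical, with the `buffered` field standing in for the memtable and `persisted` for what survives a crash.

```java
// Hypothetical model: changelog offset and position travel through the same
// write path, so a crash can only lose both or neither.
final class AtomicPositionSketch {
    // What is written together in one atomic unit.
    static final class Snapshot {
        final long changelogOffset;
        final long position;

        Snapshot(final long changelogOffset, final long position) {
            this.changelogOffset = changelogOffset;
            this.position = position;
        }
    }

    private Snapshot persisted = new Snapshot(0L, 0L);
    private Snapshot buffered = persisted; // stands in for the memtable

    // Commit stages offset and position together in memory.
    void commit(final long changelogOffset, final long position) {
        buffered = new Snapshot(changelogOffset, position);
    }

    // The store decides when to persist its in-memory data.
    void flush() {
        persisted = buffered;
    }

    // A hard crash drops everything that was not flushed.
    void crash() {
        buffered = persisted;
    }

    // What recovery sees after a restart.
    Snapshot recover() {
        return persisted;
    }
}
```

With a separate .position file written at commit time, the second commit in a "commit, flush, commit, crash" sequence would leave the position ahead of the persisted offset; in this model both fall back to the last flushed pair.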
102:
I am confused about your confusion which tells me that we are
talking about two different things.
You asked
"Do you intend to add this information [i.e. position] to the
map
passed via commit(final Map<TopicPartition, Long>
changelogOffsets)?"
and with what I wrote I meant that we do not need to pass the
position into the implementation of the StateStore interface
since
the position is updated within the implementation of the
StateStore
interface (e.g. RocksDBStore [1]). My statement describes the
behavior now, not the change proposed in this KIP, so it does
not
contradict what is stated in the KIP.
200:
This is about Matthias' main concern about rebalance
metadata.
As far as I understand the KIP, Kafka Streams will only use
the
.checkpoint files to compute the task lag for unassigned
tasks
whose
state is locally available. For assigned tasks, it will use
the
offsets managed by the open state store.
Best,
Bruno
[1]
https://github.com/apache/kafka/blob/fcbfd3412eb746a0c81374eb55ad0f73de6b1e71/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java#L397
On 5/1/24 3:00 AM, Matthias J. Sax wrote:
Thanks Bruno.
101: I think I understand this better now. But just want to
make
sure I do. What do you mean by "they can diverge" and
"Recovering
after a failure might load inconsistent offsets and
positions."
The checkpoint is the offset from the changelog, while the
position
is the offset from the upstream source topic, right? -- In
the
end,
the position is about IQ, and if we fail to update it, it
only
means that there is some gap when we might not be able to
query a
standby task, because we think it's not up-to-date enough
even if
it is, which would resolve itself soon? Ie, the position
might
"lag", but it's not "inconsistent". Do we believe that this
lag
would be highly problematic?
102: I am confused.
The position is maintained inside the state store, but is
persisted in the .position file when the state store
closes.
This contradicts the KIP:
these position offsets will be stored in RocksDB, in the
same
column family as the changelog offsets, instead of the
.position
file
My main concern is currently about rebalance metadata --
opening
RocksDB stores seems to be very expensive, but if we follow
the
KIP:
We will do this under EOS by updating the .checkpoint file
whenever a store is close()d.
It seems, having the offset inside RocksDB does not help us
at
all?
In the end, when we crash, we don't want to lose the state,
but
when we update the .checkpoint only on a clean close, the
.checkpoint might be stale (ie, still contains the
checkpoint
when
we opened the store when we got a task assigned).
-Matthias
On 4/30/24 2:40 AM, Bruno Cadonna wrote:
Hi all,
100
I think we already have such a wrapper. It is called
AbstractReadWriteDecorator.
101
Currently, the position is checkpointed when an offset
checkpoint
is written. If we let the state store manage the committed
offsets, we need to let the state store also manage
the
position otherwise they might diverge. State store managed
offsets
can get flushed (i.e. checkpointed) to the disk when the
state
store decides to flush its in-memory data structures, but
the
position is only checkpointed at commit time. Recovering
after a
failure might load inconsistent offsets and positions.
102
The position is maintained inside the state store, but is
persisted in the .position file when the state store
closes.
The
only public interface that uses the position is IQv2 in a
read-only mode. So the position is only updated within the
state
store and read from IQv2. No need to add anything to the
public
StateStore interface.
103
Deprecating managesOffsets() right away might be a good
idea.
104
I agree that we should try to support downgrades without
wipes.
At
least Nick should state in the KIP why we do not support
it.
Best,
Bruno
On 4/23/24 8:13 AM, Matthias J. Sax wrote:
Thanks for splitting out this KIP. The discussion shows that it
is a complex beast by itself, so it is worth discussing on its own.
A couple of questions / comments:
100 `StateStore#commit()`: The JavaDoc says "must not be
called
by users" -- I would propose to put a guard in place for
this,
by
either throwing an exception (preferable) or adding a
no-op
implementation (at least for our own stores, by wrapping
them
--
we cannot enforce it for custom stores I assume), and
document
this contract explicitly.
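A minimal sketch of the "throw an exception" variant of such a guard follows. The interface and class names (`Store`, `RecordingStore`, `UserFacingStore`) are hypothetical and simplified; they are not the actual StateStore interface or the existing wrapper classes in Kafka Streams.

```java
import java.util.Map;

// Hypothetical sketch of a guard that blocks commit() for user code.
final class CommitGuardSketch {
    // Simplified stand-in for a state store.
    interface Store {
        void put(String key, String value);
        void commit(Map<String, Long> changelogOffsets);
    }

    // The store as the runtime sees it; commit() is allowed here.
    static final class RecordingStore implements Store {
        int commits = 0;

        @Override
        public void put(final String key, final String value) {
            // storage details are irrelevant for this sketch
        }

        @Override
        public void commit(final Map<String, Long> changelogOffsets) {
            commits++;
        }
    }

    // The wrapper handed to user code; reads and writes pass through, commit() does not.
    static final class UserFacingStore implements Store {
        private final Store inner;

        UserFacingStore(final Store inner) {
            this.inner = inner;
        }

        @Override
        public void put(final String key, final String value) {
            inner.put(key, value);
        }

        @Override
        public void commit(final Map<String, Long> changelogOffsets) {
            throw new UnsupportedOperationException("commit() must not be called by users");
        }
    }
}
```

This only protects stores that the runtime wraps itself; a custom store handed out unwrapped would still need to honour the documented contract.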
101 adding `.position` to the store: Why do we actually
need
this? The KIP says "To ensure consistency with the
committed
data
and changelog offsets" but I am not sure if I can follow?
Can
you
elaborate why leaving the `.position` file as-is won't
work?
If it's possible at all, it will need to be done by
creating temporary StateManagers and StateStores during
rebalance. I think
it is possible, and probably not too expensive, but the
devil
will be in
the detail.
This sounds like a significant overhead to me. We know
that
opening a single RocksDB takes about 500ms, and thus
opening
RocksDB to get this information might slow down rebalances
significantly.
102: It's unclear to me, how `.position` information is
added.
The KIP only says: "position offsets will be stored in
RocksDB,
in the same column family as the changelog offsets". Do
you
intend to add this information to the map passed via
`commit(final Map<TopicPartition, Long>
changelogOffsets)`?
The
KIP should describe this in more detail. Also, if my
assumption
is correct, we might want to rename the parameter and also
have a
better JavaDoc description?
103: Should we make it mandatory (long-term) that all
stores
(including custom stores) manage their offsets internally?
Maintaining both options and thus both code paths puts a
burden
on everyone and makes the code messy. I would strongly
prefer if
we could have a mid-term path to get rid of supporting both.
--
For this case, we should deprecate the newly added
`managesOffsets()` method right away, to point out that we
intend
to remove it. If it's mandatory to maintain offsets for
stores,
we won't need this method any longer. In memory stores can
just
return null from #committedOffset().
104 "downgrading": I think it might be worth adding
support
for
downgrading w/o the need to wipe stores? Leveraging
`upgrade.from` parameter, we could build a two rolling
bounce
downgrade: (1) the new code is started with `upgrade.from`
set
to
a lower version, telling the runtime to do the cleanup on
`close()` -- (ie, ensure that all data is written into
`.checkpoint` and `.position` file, and the newly added CL
is
deleted). In a second, rolling bounce, the old code would
be
able
to open RocksDB. -- I understand that this implies much
more
work, but downgrade seems to be common enough, that it
might be
worth it? Even if we did not always support this in the
past,
we
have to face the fact that KS is getting more and more
adopted
and as a more mature product should support this?
-Matthias
On 4/21/24 11:58 PM, Bruno Cadonna wrote:
Hi all,
How should we proceed here?
1. with the plain .checkpoint file
2. with a way to use the state store interface on
unassigned
but
locally existing task state
While I like option 2, I think option 1 is less risky and
will
give us the benefits of transactional state stores
sooner.
We
should consider the interface approach afterwards,
though.
Best,
Bruno
On 4/17/24 3:15 PM, Bruno Cadonna wrote:
Hi Nick and Sophie,
I think the task ID is not enough to create a state
store
that
can read the offsets of non-assigned tasks for lag
computation
during rebalancing. The state store also needs the state
directory so that it knows where to find the information
that
it needs to return from changelogOffsets().
In general, I think we should proceed with the plain
.checkpoint file for now and iterate back to the state
store
solution later since it seems it is not that
straightforward.
Alternatively, Nick could timebox an effort to better
understand what would be needed for the state store
solution.
Nick, let us know your decision.
Regarding your question about the state store instance.
I
am
not too familiar with that part of the code, but I think
the
state store is built when the processor topology is built and
the processor topology is built per stream task. So
there
is
one instance of processor topology and state store per
stream
task. Try to follow the call in [1].
Best,
Bruno
[1]
https://github.com/apache/kafka/blob/f52575b17225828d2ff11996030ab7304667deab/streams/src/main/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreator.java#L153
On 4/16/24 8:59 PM, Nick Telford wrote:
That does make sense. The one thing I can't figure out
is
how
per-Task
StateStore instances are constructed.
It looks like we construct one StateStore instance for
the
whole Topology
(in InternalTopologyBuilder), and pass that into
ProcessorStateManager (via
StateManagerUtil) for each Task, which then initializes
it.
This can't be the case though, otherwise multiple
partitions
of the same
sub-topology (aka Tasks) would share the same
StateStore
instance, which
they don't.
What am I missing?
On Tue, 16 Apr 2024 at 16:22, Sophie Blee-Goldman
<sop...@responsive.dev>
wrote:
I don't think we need to *require* a constructor
accept
the
TaskId, but we
would definitely make sure that the RocksDB state
store
changes its
constructor to one that accepts the TaskID (which we
can do
without
deprecation since it's an internal API), and custom
state
stores can just
decide for themselves whether they want to opt-in/use
the
TaskId param
or not. I mean custom state stores would have to
opt-in
anyways by
implementing the new StoreSupplier#get(TaskId) API and
the
only
reason to do that would be to have created a
constructor
that
accepts
a TaskId
Just to be super clear about the proposal, this is
what
I
had
in mind.
It's actually fairly simple and wouldn't add much to
the
scope of the
KIP (I think -- if it turns out to be more complicated
than
I'm assuming,
we should definitely do whatever has the smallest LOE
to
get
this done).
Anyways, the (only) public API changes would be to add
this
new
method to the StoreSupplier API:
    default T get(final TaskId taskId) {
        return get();
    }
We can decide whether or not to deprecate the old #get
but
it's not
really necessary and might cause a lot of turmoil, so
I'd
personally
say we just leave both APIs in place.
And that's it for public API changes! Internally, we
would
just adapt
each of the rocksdb StoreSupplier classes to implement
this
new
API. So for example with the
RocksDBKeyValueBytesStoreSupplier,
we just add
    @Override
    public KeyValueStore<Bytes, byte[]> get(final TaskId taskId) {
        return returnTimestampedStore ?
            new RocksDBTimestampedStore(name, metricsScope(), taskId) :
            new RocksDBStore(name, metricsScope(), taskId);
    }
And of course add the TaskId parameter to each of the
actual
state store constructors returned here.
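The compatibility argument can be checked with a small self-contained sketch. It assumes simplified stand-ins: `SupplierLike` mirrors the shape of the proposed default method, a plain `String` replaces `TaskId`, and the two supplier classes are hypothetical.

```java
// Hypothetical sketch of the default-method approach; not the real StoreSupplier API.
final class StoreSupplierSketch {
    interface SupplierLike<T> {
        T get();

        // New overload: defaults to the old method, so existing suppliers keep working.
        default T get(final String taskId) {
            return get();
        }
    }

    // An existing custom supplier that knows nothing about task ids.
    static final class LegacySupplier implements SupplierLike<String> {
        @Override
        public String get() {
            return "store";
        }
    }

    // A supplier that opts in and passes the task id to the store it creates.
    static final class TaskAwareSupplier implements SupplierLike<String> {
        @Override
        public String get() {
            return "store";
        }

        @Override
        public String get(final String taskId) {
            return "store@" + taskId;
        }
    }
}
```

Callers can then always invoke `get(taskId)`: legacy suppliers fall through to `get()`, and only suppliers that override the new overload see the task id.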
Does that make sense? It's entirely possible I'm
missing
something
important here, but I think this would be a pretty
small
addition that
would solve the problem you mentioned earlier while
also
being
useful to anyone who uses custom state stores.
On Mon, Apr 15, 2024 at 10:21 AM Nick Telford
<nick.telf...@gmail.com>
wrote:
Hi Sophie,
Interesting idea! Although what would that mean for
the
StateStore
interface? Obviously we can't require that the
constructor
take the
TaskId.
Is it enough to add the parameter to the
StoreSupplier?
Would doing this be in-scope for this KIP, or are we
over-complicating
it?
Nick
On Fri, 12 Apr 2024 at 21:30, Sophie Blee-Goldman
<sop...@responsive.dev>
wrote:
Somewhat minor point overall, but it actually drives
me
crazy that you
can't get access to the taskId of a StateStore until
#init
is called.
This
has caused me a huge headache personally (since the
same
is
true for
processors and I was trying to do something that's
probably
too hacky
to
actually complain about here lol)
Can we just change the StateStoreSupplier to receive
and
pass along the
taskId when creating a new store? Presumably by
adding a
new version of
the
#get method that takes in a taskId parameter? We can
have
it default to
invoking the old one for compatibility reasons and
it
should be
completely
safe to tack on.
Would also prefer the same for a ProcessorSupplier,
but
that's
definitely
outside the scope of this KIP
On Fri, Apr 12, 2024 at 3:31 AM Nick Telford <nick.telf...@gmail.com> wrote:
On further thought, it's clear that this can't work for one simple
reason: StateStores don't know their associated TaskId (and hence, their
StateDirectory) until the init() call. Therefore, committedOffset()
can't be called before init(), unless we also added a StateStoreContext
argument to committedOffset(), which I think might be trying to shoehorn
too much into committedOffset().
I still don't like the idea of the Streams engine maintaining the cache
of changelog offsets independently of stores, mostly because of the
maintenance burden of the code duplication, but it looks like we'll have
to live with it.
Unless you have any better ideas?
Regards,
Nick
On Wed, 10 Apr 2024 at 14:12, Nick Telford <nick.telf...@gmail.com> wrote:
Hi Bruno,
Immediately after I sent my response, I looked at the codebase and came
to the same conclusion. If it's possible at all, it will need to be done
by creating temporary StateManagers and StateStores during rebalance. I
think it is possible, and probably not too expensive, but the devil will
be in the detail.
I'll try to find some time to explore the idea to see if it's possible
and report back, because we'll need to determine this before we can vote
on the KIP.
Regards,
Nick
On Wed, 10 Apr 2024 at 11:36, Bruno Cadonna <cado...@apache.org> wrote:
Hi Nick,
Thanks for reacting to my comments so quickly!
2.
Some thoughts on your proposal.
State managers (and state stores) are parts of tasks. If the task is not
assigned locally, we do not create those tasks. To get the offsets with
your approach, we would need to either create some kind of inactive
tasks besides active and standby tasks, or store and manage state
managers of non-assigned tasks differently than the state managers of
assigned tasks. Additionally, the cleanup thread that removes unassigned
task directories needs to concurrently delete those inactive tasks or
task-less state managers of unassigned tasks. This all seems quite messy
to me.
Could we create those state managers (or state stores) for locally
existing but unassigned tasks on demand when
TaskManager#getTaskOffsetSums() is executed? Or have a different
encapsulation for the unused task directories?
Best,
Bruno
On 4/10/24 11:31 AM, Nick Telford wrote:
Hi Bruno,
Thanks for the review!
1, 4, 5.
Done
3.
You're right. I've removed the offending paragraph. I had originally
adapted this from the guarantees outlined in KIP-892. But it's difficult
to provide these guarantees without the KIP-892 transaction buffers.
Instead, we'll add the guarantees back into the JavaDoc when KIP-892
lands.
2.
Good point! This is the only part of the KIP that was (significantly)
changed when I extracted it from KIP-892. My prototype currently
maintains this "cache" of changelog offsets in .checkpoint, but doing so
becomes very messy. My intent with this change was to try to better
encapsulate this offset "caching", especially for StateStores that can
cheaply provide the offsets stored directly in them without needing to
duplicate them in this cache.
It's clear some more work is needed here to better encapsulate this. My
immediate thought is: what if we construct *but don't initialize* the
StateManager and StateStores for every Task directory on-disk? That
should still be quite cheap to do, and would enable us to query the
offsets for all on-disk stores, even if they're not open. If the
StateManager (aka. ProcessorStateManager/GlobalStateManager) proves too
expensive to hold open for closed stores, we could always have a
"StubStateManager" in its place, that enables the querying of offsets,
but nothing else?
IDK, what do you think?
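To illustrate the "StubStateManager" idea, here is a rough sketch of an object that can answer offset queries and nothing else. The `OffsetView` interface, the constructor, and the use of plain strings as changelog-partition keys are all assumptions made for this example; none of them come from the KIP or from the Kafka Streams codebase.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical read-only view: the only capability needed for closed stores.
interface OffsetView {
    // Changelog partition (as a "topic-partition" string, an assumption)
    // mapped to the last committed offset.
    Map<String, Long> committedOffsets();
}

// Hypothetical stub: holds a snapshot of offsets read from disk and exposes
// no lifecycle methods (no init, restore, flush or close), so it cannot be
// mistaken for a fully initialized state manager.
final class StubStateManager implements OffsetView {
    private final Map<String, Long> offsets;

    StubStateManager(final Map<String, Long> offsetsOnDisk) {
        // Defensive copy so later changes by the caller are not observed.
        this.offsets = Collections.unmodifiableMap(new HashMap<>(offsetsOnDisk));
    }

    @Override
    public Map<String, Long> committedOffsets() {
        return offsets;
    }
}

final class StubStateManagerDemo {
    public static void main(final String[] args) {
        final Map<String, Long> onDisk = new HashMap<>();
        onDisk.put("app-store-changelog-0", 42L);
        final OffsetView view = new StubStateManager(onDisk);
        System.out.println(view.committedOffsets());
    }
}
```

Keeping the stub behind a narrow interface means callers that only need offsets during rebalance would not care whether they hold a stub or a real manager.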
Regards,
Nick
On Tue, 9 Apr 2024 at 15:00, Bruno Cadonna <cado...@apache.org> wrote:
Hi Nick,
Thanks for breaking out the KIP from KIP-892!
Here are a couple of comments/questions:
1.
In Kafka Streams, we have a design guideline which says to not use the
"get"-prefix for getters on the public API. Could you please change
getCommittedOffsets() to committedOffsets()?
2.
It is not clear to me how TaskManager#getTaskOffsetSums() should read
offsets of tasks the stream thread does not own but that have a state
directory on the Streams client by calling
StateStore#getCommittedOffsets(). If the thread does not own a task, it
also does not create any state stores for the task, which means there is
no state store on which to call getCommittedOffsets().
I would have rather expected that a checkpoint file is written for all
state stores on close -- not only for the RocksDBStore -- and that this
checkpoint file is read in TaskManager#getTaskOffsetSums() for the tasks
that have a state directory on the client but are not currently assigned
to any stream thread of the Streams client.
3.
In the javadocs for commit() you write
"... all writes since the last commit(Map), or since init(StateStore)
*MUST* be available to readers, even after a restart."
This is only true for a clean close before the restart, isn't it? If the
task fails with a dirty close, Kafka Streams cannot guarantee that the
in-memory structures of the state store (e.g. memtable in the case of
RocksDB) are flushed so that the records and the committed offsets are
persisted.
4.
The wrapper that provides the legacy checkpointing behavior is actually
an implementation detail. I would remove it from the KIP, but still
state that the legacy checkpointing behavior will be supported when the
state store does not manage the checkpoints.
5.
Regarding the metrics, could you please add the tags, and the recording
level (DEBUG or INFO) as done in KIP-607 or KIP-444?
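For point 2, the checkpoint-based alternative could look roughly like the sketch below: offsets for tasks that have a state directory but are not assigned are taken from already-parsed checkpoint data and summed per task. The class name, the method name, and the representation of task directories and changelog partitions as strings are assumptions for illustration. Parsing the checkpoint file itself is left out, as are details such as overflow handling.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

final class TaskOffsetSums {
    private TaskOffsetSums() {
    }

    // checkpointsByTaskDir: task directory name -> (changelog partition ->
    // checkpointed offset), as read from each task's checkpoint file.
    // assignedTasks: tasks currently owned by some stream thread; their
    // offsets would come from the live state stores instead, so they are
    // skipped here.
    static Map<String, Long> forUnassignedTasks(
            final Map<String, Map<String, Long>> checkpointsByTaskDir,
            final Set<String> assignedTasks) {
        final Map<String, Long> sums = new HashMap<>();
        for (final Map.Entry<String, Map<String, Long>> entry : checkpointsByTaskDir.entrySet()) {
            if (assignedTasks.contains(entry.getKey())) {
                continue;
            }
            long sum = 0L;
            for (final long offset : entry.getValue().values()) {
                sum += offset;
            }
            sums.put(entry.getKey(), sum);
        }
        return sums;
    }
}
```

With this shape, no state store needs to be created for an unassigned task. The only requirement is that every store type writes its offsets to the checkpoint file on close.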
Best,
Bruno
On 4/7/24 5:35 PM, Nick Telford wrote:
Hi everyone,
Based on some offline discussion, I've split out the "Atomic
Checkpointing" section from KIP-892: Transactional Semantics for
StateStores, into its own KIP:
KIP-1035: StateStore managed changelog offsets
https://cwiki.apache.org/confluence/display/KAFKA/KIP-1035%3A+StateStore+managed+changelog+offsets
While KIP-892 was adopted *with* the changes outlined in KIP-1035, these
changes were always the most contentious part, and continued to spur
discussion even after KIP-892 was adopted.
All the changes introduced in KIP-1035 have been removed from KIP-892,
and a hard dependency on KIP-1035 has been added to KIP-892 in their
place.
I'm hopeful that with some more focus on this set of changes, we can
deliver something that we're all happy with.
Regards,
Nick