Hi Matthias,
101:
Let's assume a RocksDB store, but I think the following might be true
also for other store implementations. With this KIP, if Kafka Streams
commits the offsets, the committed offsets will be stored in an
in-memory data structure (i.e. the memtable) and stay there until
RocksDB decides that it is time to persist its in-memory data structure.
If Kafka Streams writes its position to the .position file during a
commit and a crash happens before RocksDB persists the memtable, then the
position in the .position file is ahead of the persisted offset. If IQ
is done between the crash and the point at which the state store has
been fully restored from the changelog, the position might tell IQ that
the state store is more up-to-date than it actually is.
In contrast, if Kafka Streams handles persisting positions the same way
as persisting offsets, the position should always be consistent with the
offset, because they are persisted together.
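
To make the scenario in 101 concrete, here is a minimal sketch. It is a toy model, not Kafka Streams or RocksDB code: the classes Disk and Store and the method crashAfterUnflushedCommit are hypothetical names, and "flush" only stands in for RocksDB persisting its memtable.

```java
// Hypothetical toy model, not Kafka Streams code. The offset lives in an
// in-memory structure that is flushed lazily; the position is persisted
// either eagerly at commit (like a .position file) or together with the offset.
class PositionConsistencySketch {

    /** Durable state that survives the simulated crash. */
    static final class Disk {
        long offset = -1L;    // changelog offset persisted by the store
        long position = -1L;  // position that IQ would read after a restart
    }

    static final class Store {
        private final Disk disk;
        private final boolean positionInStore; // true = position flushed with the offset
        private long memOffset = -1L;
        private long memPosition = -1L;

        Store(Disk disk, boolean positionInStore) {
            this.disk = disk;
            this.positionInStore = positionInStore;
        }

        /** Commit only updates memory, except for a separately persisted position. */
        void commit(long offset, long position) {
            memOffset = offset;
            memPosition = position;
            if (!positionInStore) {
                disk.position = position; // written eagerly, independent of the flush
            }
        }

        /** The store decides on its own when this happens. */
        void flush() {
            disk.offset = memOffset;
            if (positionInStore) {
                disk.position = memPosition;
            }
        }
    }

    /** Commit and flush at 10, commit at 20, then "crash" before the next flush. */
    static Disk crashAfterUnflushedCommit(boolean positionInStore) {
        Disk disk = new Disk();
        Store store = new Store(disk, positionInStore);
        store.commit(10L, 10L);
        store.flush();
        store.commit(20L, 20L);
        return disk; // the in-memory state of the store is lost here
    }

    public static void main(String[] args) {
        Disk separate = crashAfterUnflushedCommit(false);
        Disk together = crashAfterUnflushedCommit(true);
        System.out.println("separate: offset=" + separate.offset + " position=" + separate.position);
        System.out.println("together: offset=" + together.offset + " position=" + together.position);
    }
}
```

In this model the separately persisted position ends up ahead of the persisted offset, while the jointly persisted pair stays consistent.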
102:
I am confused about your confusion, which tells me that we are talking
about two different things.
You asked
"Do you intend to add this information [i.e. position] to the map passed
via commit(final Map<TopicPartition, Long> changelogOffsets)?"
and with what I wrote I meant that we do not need to pass the position
into the implementation of the StateStore interface since the position
is updated within the implementation of the StateStore interface (e.g.
RocksDBStore [1]). My statement describes the behavior now, not the
change proposed in this KIP, so it does not contradict what is stated in
the KIP.
200:
This is about Matthias' main concern about rebalance metadata.
As far as I understand the KIP, Kafka Streams will only use the
.checkpoint files to compute the task lag for unassigned tasks whose
state is locally available. For assigned tasks, it will use the offsets
managed by the open state store.
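
As a rough illustration of my understanding in 200, assuming hypothetical stand-in types (OffsetSource and LocalTaskState are not Kafka Streams classes, and changelog partitions are modeled as plain strings):

```java
import java.util.Map;

// Hypothetical sketch: pick the offset source depending on whether the task
// is currently assigned (open store) or only has local state on disk.
class TaskOffsetSourceSketch {

    /** Stand-in for an open state store that manages its own changelog offsets. */
    interface OffsetSource {
        Map<String, Long> committedOffsets();
    }

    static final class LocalTaskState {
        final OffsetSource openStore;              // null when the task is not assigned
        final Map<String, Long> checkpointOffsets; // offsets last written to .checkpoint

        LocalTaskState(OffsetSource openStore, Map<String, Long> checkpointOffsets) {
            this.openStore = openStore;
            this.checkpointOffsets = checkpointOffsets;
        }
    }

    /** Sums the changelog offsets of one task from the appropriate source. */
    static long taskOffsetSum(LocalTaskState task) {
        Map<String, Long> offsets = task.openStore != null
            ? task.openStore.committedOffsets()  // assigned: ask the open store
            : task.checkpointOffsets;            // unassigned: fall back to .checkpoint
        long sum = 0L;
        for (long offset : offsets.values()) {
            sum += offset;
        }
        return sum;
    }

    public static void main(String[] args) {
        LocalTaskState assigned = new LocalTaskState(
            () -> Map.of("store-changelog-0", 120L),
            Map.of("store-changelog-0", 100L)); // possibly stale checkpoint, ignored
        LocalTaskState unassigned = new LocalTaskState(
            null,
            Map.of("store-changelog-1", 80L));
        System.out.println(taskOffsetSum(assigned));
        System.out.println(taskOffsetSum(unassigned));
    }
}
```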
Best,
Bruno
[1]
https://github.com/apache/kafka/blob/fcbfd3412eb746a0c81374eb55ad0f73de6b1e71/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java#L397
On 5/1/24 3:00 AM, Matthias J. Sax wrote:
Thanks Bruno.
101: I think I understand this better now. But just want to make sure I
do. What do you mean by "they can diverge" and "Recovering after a
failure might load inconsistent offsets and positions."?
The checkpoint is the offset from the changelog, while the position is
the offset from the upstream source topic, right? -- In the end, the
position is about IQ, and if we fail to update it, it only means that
there is some gap when we might not be able to query a standby task,
because we think it's not up-to-date enough even if it is, which would
resolve itself soon? Ie, the position might "lag", but it's not
"inconsistent". Do we believe that this lag would be highly problematic?
102: I am confused.
"The position is maintained inside the state store, but is persisted in
the .position file when the state store closes."
This contradicts the KIP:
"these position offsets will be stored in RocksDB, in the same column
family as the changelog offsets, instead of the .position file"
My main concern is currently about rebalance metadata -- opening RocksDB
stores seems to be very expensive, but if we follow the KIP:
"We will do this under EOS by updating the .checkpoint file whenever a
store is close()d."
It seems that having the offset inside RocksDB does not help us at all?
In the end, when we crash, we don't want to lose the state, but when we
update the .checkpoint only on a clean close, the .checkpoint might be
stale (ie, still contain the checkpoint from when we opened the store
after we got the task assigned).
-Matthias
On 4/30/24 2:40 AM, Bruno Cadonna wrote:
Hi all,
100
I think we already have such a wrapper. It is called
AbstractReadWriteDecorator.
101
Currently, the position is checkpointed when an offset checkpoint is
written. If we let the state store manage the committed offsets, we
also need to let the state store manage the position; otherwise
they might diverge. State store managed offsets can get flushed (i.e.
checkpointed) to the disk when the state store decides to flush its
in-memory data structures, but the position is only checkpointed at
commit time. Recovering after a failure might load inconsistent
offsets and positions.
102
The position is maintained inside the state store, but is persisted in
the .position file when the state store closes. The only public
interface that uses the position is IQv2 in a read-only mode. So the
position is only updated within the state store and read from IQv2. No
need to add anything to the public StateStore interface.
103
Deprecating managesOffsets() right away might be a good idea.
104
I agree that we should try to support downgrades without wipes. At
least Nick should state in the KIP why we do not support it.
Best,
Bruno
On 4/23/24 8:13 AM, Matthias J. Sax wrote:
Thanks for splitting out this KIP. The discussion shows that it is a
complex beast by itself, so it is worth discussing on its own.
A couple of questions / comments:
100 `StateStore#commit()`: The JavaDoc says "must not be called by
users" -- I would propose to put a guard in place for this, by either
throwing an exception (preferable) or adding a no-op implementation
(at least for our own stores, by wrapping them -- we cannot enforce
it for custom stores I assume), and document this contract explicitly.
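
The guard described in 100 could look roughly like the sketch below. It is only an illustration under stated assumptions: OffsetManagingStore, SimpleStore and UserFacingStore are hypothetical names, not the actual Kafka Streams interfaces or wrappers, and changelog partitions are modeled as strings.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of a wrapper that rejects commit() when called by users.
class CommitGuardSketch {

    /** Stand-in for the part of the store interface relevant here. */
    interface OffsetManagingStore {
        void put(String key, String value);
        void commit(Map<String, Long> changelogOffsets);
    }

    /** A trivial store used by the runtime directly. */
    static final class SimpleStore implements OffsetManagingStore {
        private final Map<String, String> data = new HashMap<>();
        private final Map<String, Long> offsets = new HashMap<>();

        @Override
        public void put(String key, String value) {
            data.put(key, value);
        }

        @Override
        public void commit(Map<String, Long> changelogOffsets) {
            offsets.putAll(changelogOffsets);
        }

        String get(String key) {
            return data.get(key);
        }
    }

    /** Wrapper handed to user code: reads and writes pass through, commit() throws. */
    static final class UserFacingStore implements OffsetManagingStore {
        private final OffsetManagingStore inner;

        UserFacingStore(OffsetManagingStore inner) {
            this.inner = inner;
        }

        @Override
        public void put(String key, String value) {
            inner.put(key, value);
        }

        @Override
        public void commit(Map<String, Long> changelogOffsets) {
            throw new UnsupportedOperationException("commit() must not be called by users");
        }
    }

    public static void main(String[] args) {
        SimpleStore inner = new SimpleStore();
        OffsetManagingStore guarded = new UserFacingStore(inner);
        guarded.put("k", "v");
        try {
            guarded.commit(Map.of("store-changelog-0", 1L));
        } catch (UnsupportedOperationException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

Throwing makes a violation of the contract visible immediately, whereas a no-op would silently hide it; a custom store handed out unwrapped is still not covered.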
101 adding `.position` to the store: Why do we actually need this?
The KIP says "To ensure consistency with the committed data and
changelog offsets" but I am not sure if I can follow? Can you
elaborate why leaving the `.position` file as-is won't work?
"If it's possible at all, it will need to be done by creating temporary
StateManagers and StateStores during rebalance. I think it is possible,
and probably not too expensive, but the devil will be in the detail."
This sounds like a significant overhead to me. We know that opening a
single RocksDB takes about 500ms, and thus opening RocksDB to get
this information might slow down rebalances significantly.
102: It's unclear to me how `.position` information is added. The
KIP only says: "position offsets will be stored in RocksDB, in the
same column family as the changelog offsets". Do you intend to add
this information to the map passed via `commit(final
Map<TopicPartition, Long> changelogOffsets)`? The KIP should describe
this in more detail. Also, if my assumption is correct, we might want
to rename the parameter and also have a better JavaDoc description?
103: Should we make it mandatory (long-term) that all stores
(including custom stores) manage their offsets internally?
Maintaining both options and thus both code paths puts a burden on
everyone and makes the code messy. I would strongly prefer if we could
have a mid-term path to get rid of supporting both. -- For this case,
we should deprecate the newly added `managesOffsets()` method right
away, to point out that we intend to remove it. If it's mandatory to
maintain offsets for stores, we won't need this method any longer. In
memory stores can just return null from #committedOffset().
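
A minimal sketch of what 103 could mean for the interface, assuming a hypothetical, heavily simplified Store interface (changelog partitions are strings here, and the real method signatures in the KIP may differ):

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: managesOffsets() is deprecated from the start, and an
// in-memory store satisfies the offset contract by returning null.
class ManagedOffsetsSketch {

    interface Store {
        /**
         * @deprecated intended for removal once all stores manage their offsets.
         */
        @Deprecated
        default boolean managesOffsets() {
            return false; // legacy behavior: the runtime checkpoints for the store
        }

        void commit(Map<String, Long> changelogOffsets);

        /** Returns the committed offset, or null if none is known. */
        Long committedOffset(String changelogPartition);
    }

    /** Persistent store: remembers the offsets it was asked to commit. */
    static final class PersistentStore implements Store {
        private final Map<String, Long> offsets = new HashMap<>();

        @Override
        @Deprecated
        public boolean managesOffsets() {
            return true;
        }

        @Override
        public void commit(Map<String, Long> changelogOffsets) {
            offsets.putAll(changelogOffsets);
        }

        @Override
        public Long committedOffset(String changelogPartition) {
            return offsets.get(changelogPartition);
        }
    }

    /** In-memory store: nothing survives a restart, so there is no committed offset. */
    static final class InMemoryStore implements Store {
        @Override
        @Deprecated
        public boolean managesOffsets() {
            return true;
        }

        @Override
        public void commit(Map<String, Long> changelogOffsets) {
            // nothing to persist
        }

        @Override
        public Long committedOffset(String changelogPartition) {
            return null;
        }
    }

    public static void main(String[] args) {
        Store persistent = new PersistentStore();
        persistent.commit(Map.of("store-changelog-0", 42L));
        System.out.println(persistent.committedOffset("store-changelog-0"));
        System.out.println(new InMemoryStore().committedOffset("store-changelog-0"));
    }
}
```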
104 "downgrading": I think it might be worth adding support for
downgrading w/o the need to wipe stores? Leveraging the `upgrade.from`
parameter, we could build a two-rolling-bounce downgrade: (1) the new
code is started with `upgrade.from` set to a lower version, telling
the runtime to do the cleanup on `close()` -- (ie, ensure that all
data is written into the `.checkpoint` and `.position` files, and the
newly added CL is deleted). (2) In a second rolling bounce, the old code
would be able to open RocksDB. -- I understand that this implies much
more work, but downgrade seems to be common enough that it might be
worth it? Even if we did not always support this in the past, we have
to face the fact that KS is getting more and more adopted and as a
more mature product should support this?
-Matthias
On 4/21/24 11:58 PM, Bruno Cadonna wrote:
Hi all,
How should we proceed here?
1. with the plain .checkpoint file
2. with a way to use the state store interface on unassigned but
locally existing task state
While I like option 2, I think option 1 is less risky and will give
us the benefits of transactional state stores sooner. We should
consider the interface approach afterwards, though.
Best,
Bruno
On 4/17/24 3:15 PM, Bruno Cadonna wrote:
Hi Nick and Sophie,
I think the task ID is not enough to create a state store that can
read the offsets of non-assigned tasks for lag computation during
rebalancing. The state store also needs the state directory so that
it knows where to find the information that it needs to return from
changelogOffsets().
In general, I think we should proceed with the plain .checkpoint
file for now and iterate back to the state store solution later
since it seems it is not that straightforward. Alternatively, Nick
could timebox an effort to better understand what would be needed
for the state store solution. Nick, let us know your decision.
Regarding your question about the state store instance: I am not
too familiar with that part of the code, but I think the state
store is built when the processor topology is built, and the
processor topology is built per stream task. So there is one
instance of processor topology and state store per stream task. Try
to follow the call in [1].
Best,
Bruno
[1]
https://github.com/apache/kafka/blob/f52575b17225828d2ff11996030ab7304667deab/streams/src/main/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreator.java#L153
On 4/16/24 8:59 PM, Nick Telford wrote:
That does make sense. The one thing I can't figure out is how per-Task
StateStore instances are constructed.
It looks like we construct one StateStore instance for the whole Topology
(in InternalTopologyBuilder), and pass that into ProcessorStateManager (via
StateManagerUtil) for each Task, which then initializes it.
This can't be the case though, otherwise multiple partitions of the same
sub-topology (aka Tasks) would share the same StateStore instance, which
they don't.
What am I missing?
On Tue, 16 Apr 2024 at 16:22, Sophie Blee-Goldman <sop...@responsive.dev> wrote:
I don't think we need to *require* that a constructor accept the TaskId,
but we would definitely make sure that the RocksDB state store changes its
constructor to one that accepts the TaskId (which we can do without
deprecation since it's an internal API), and custom state stores can just
decide for themselves whether they want to opt-in/use the TaskId param
or not. I mean custom state stores would have to opt-in anyways by
implementing the new StoreSupplier#get(TaskId) API and the only
reason to do that would be to have created a constructor that accepts
a TaskId.
Just to be super clear about the proposal, this is what I had in mind.
It's actually fairly simple and wouldn't add much to the scope of the
KIP (I think -- if it turns out to be more complicated than I'm assuming,
we should definitely do whatever has the smallest LOE to get this done).
Anyways, the (only) public API changes would be to add this new
method to the StoreSupplier API:
    default T get(final TaskId taskId) {
        return get();
    }
We can decide whether or not to deprecate the old #get but it's not
really necessary and might cause a lot of turmoil, so I'd personally
say we just leave both APIs in place.
And that's it for public API changes! Internally, we would just adapt
each of the RocksDB StoreSupplier classes to implement this new
API. So for example with the RocksDBKeyValueBytesStoreSupplier,
we just add
    @Override
    public KeyValueStore<Bytes, byte[]> get(final TaskId taskId) {
        return returnTimestampedStore ?
            new RocksDBTimestampedStore(name, metricsScope(), taskId) :
            new RocksDBStore(name, metricsScope(), taskId);
    }
And of course add the TaskId parameter to each of the actual
state store constructors returned here.
Does that make sense? It's entirely possible I'm missing something
important here, but I think this would be a pretty small addition that
would solve the problem you mentioned earlier while also being
useful to anyone who uses custom state stores.
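
For readers who want to try the compatibility argument without the Kafka classes, here is a self-contained sketch of the same idea. TaskId, Store and StoreSupplier below are simplified stand-ins defined in the block, and LegacySupplier and TaskAwareSupplier are hypothetical; only the shape of the default method mirrors the snippet above.

```java
// Hypothetical stand-ins for the Kafka Streams types, to show how a default
// get(TaskId) keeps existing suppliers working while new ones can opt in.
class StoreSupplierSketch {

    static final class TaskId {
        final int subtopology;
        final int partition;

        TaskId(int subtopology, int partition) {
            this.subtopology = subtopology;
            this.partition = partition;
        }

        @Override
        public String toString() {
            return subtopology + "_" + partition;
        }
    }

    interface Store {
        String describe();
    }

    interface StoreSupplier<T extends Store> {
        T get();

        /** New method: defaults to the old one, so existing suppliers still compile. */
        default T get(final TaskId taskId) {
            return get();
        }
    }

    /** Existing custom supplier that knows nothing about TaskId. */
    static final class LegacySupplier implements StoreSupplier<Store> {
        @Override
        public Store get() {
            return () -> "legacy store";
        }
    }

    /** Supplier that opts in and passes the TaskId on to the store it creates. */
    static final class TaskAwareSupplier implements StoreSupplier<Store> {
        @Override
        public Store get() {
            return () -> "task-aware store without task";
        }

        @Override
        public Store get(final TaskId taskId) {
            return () -> "task-aware store for task " + taskId;
        }
    }

    public static void main(String[] args) {
        TaskId taskId = new TaskId(0, 3);
        System.out.println(new LegacySupplier().get(taskId).describe());
        System.out.println(new TaskAwareSupplier().get(taskId).describe());
    }
}
```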
On Mon, Apr 15, 2024 at 10:21 AM Nick Telford <nick.telf...@gmail.com> wrote:
Hi Sophie,
Interesting idea! Although what would that mean for the StateStore
interface? Obviously we can't require that the constructor take the TaskId.
Is it enough to add the parameter to the StoreSupplier?
Would doing this be in-scope for this KIP, or are we over-complicating it?
Nick
On Fri, 12 Apr 2024 at 21:30, Sophie Blee-Goldman <sop...@responsive.dev> wrote:
Somewhat minor point overall, but it actually drives me crazy that you
can't get access to the taskId of a StateStore until #init is called. This
has caused me a huge headache personally (since the same is true for
processors and I was trying to do something that's probably too hacky to
actually complain about here lol)
Can we just change the StateStoreSupplier to receive and pass along the
taskId when creating a new store? Presumably by adding a new version of the
#get method that takes in a taskId parameter? We can have it default to
invoking the old one for compatibility reasons and it should be completely
safe to tack on.
Would also prefer the same for a ProcessorSupplier, but that's definitely
outside the scope of this KIP
On Fri, Apr 12, 2024 at 3:31 AM Nick Telford <nick.telf...@gmail.com> wrote:
On further thought, it's clear that this can't work for one simple reason:
StateStores don't know their associated TaskId (and hence, their
StateDirectory) until the init() call. Therefore, committedOffset() can't
be called before init(), unless we also added a StateStoreContext argument
to committedOffset(), which I think might be trying to shoehorn too much
into committedOffset().
I still don't like the idea of the Streams engine maintaining the cache of
changelog offsets independently of stores, mostly because of the
maintenance burden of the code duplication, but it looks like we'll have to
live with it.
Unless you have any better ideas?
Regards,
Nick
On Wed, 10 Apr 2024 at 14:12, Nick Telford <nick.telf...@gmail.com> wrote:
Hi Bruno,
Immediately after I sent my response, I looked at the codebase and came to
the same conclusion. If it's possible at all, it will need to be done by
creating temporary StateManagers and StateStores during rebalance. I think
it is possible, and probably not too expensive, but the devil will be in
the detail.
I'll try to find some time to explore the idea to see if it's possible and
report back, because we'll need to determine this before we can vote on the
KIP.
Regards,
Nick
On Wed, 10 Apr 2024 at 11:36, Bruno Cadonna <cado...@apache.org> wrote:
Hi Nick,
Thanks for reacting to my comments so quickly!
2.
Some thoughts on your proposal.
State managers (and state stores) are parts of tasks. If the task is not
assigned locally, we do not create those tasks. To get the offsets with
your approach, we would need to either create some kind of inactive tasks
besides active and standby tasks or store and manage state managers of
non-assigned tasks differently than the state managers of assigned
tasks. Additionally, the cleanup thread that removes unassigned task
directories needs to concurrently delete those inactive tasks or
task-less state managers of unassigned tasks. This all seems quite messy
to me.
Could we create those state managers (or state stores) for locally
existing but unassigned tasks on demand when
TaskManager#getTaskOffsetSums() is executed? Or have a different
encapsulation for the unused task directories?
Best,
Bruno
On 4/10/24 11:31 AM, Nick Telford wrote:
Hi Bruno,
Thanks for the review!
1, 4, 5.
Done
3.
You're right. I've removed the offending paragraph. I had originally
adapted this from the guarantees outlined in KIP-892. But it's difficult to
provide these guarantees without the KIP-892 transaction buffers. Instead,
we'll add the guarantees back into the JavaDoc when KIP-892 lands.
2.
Good point! This is the only part of the KIP that was (significantly)
changed when I extracted it from KIP-892. My prototype currently maintains
this "cache" of changelog offsets in .checkpoint, but doing so becomes very
messy. My intent with this change was to try to better encapsulate this
offset "caching", especially for StateStores that can cheaply provide the
offsets stored directly in them without needing to duplicate them in this
cache.
It's clear some more work is needed here to better encapsulate this. My
immediate thought is: what if we construct *but don't initialize* the
StateManager and StateStores for every Task directory on-disk? That should
still be quite cheap to do, and would enable us to query the offsets for
all on-disk stores, even if they're not open. If the StateManager (aka.
ProcessorStateManager/GlobalStateManager) proves too expensive to hold open
for closed stores, we could always have a "StubStateManager" in its place,
that enables the querying of offsets, but nothing else?
IDK, what do you think?
Regards,
Nick
On Tue, 9 Apr 2024 at 15:00, Bruno Cadonna <cado...@apache.org> wrote:
Hi Nick,
Thanks for breaking out the KIP from KIP-892!
Here are a couple of comments/questions:
1.
In Kafka Streams, we have a design guideline which says to not use the
"get"-prefix for getters on the public API. Could you please change
getCommittedOffsets() to committedOffsets()?
2.
It is not clear to me how TaskManager#getTaskOffsetSums() should read
offsets of tasks the stream thread does not own but that have a state
directory on the Streams client by calling
StateStore#getCommittedOffsets(). If the thread does not own a task, it
also does not create any state stores for the task, which means there is
no state store on which to call getCommittedOffsets().
I would have rather expected that a checkpoint file is written for all
state stores on close -- not only for the RocksDBStore -- and that this
checkpoint file is read in TaskManager#getTaskOffsetSums() for the tasks
that have a state directory on the client but are not currently assigned
to any stream thread of the Streams client.
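
What I have in mind in point 2 could be sketched as follows. This is a simplified illustration, not the TaskManager code: the "topic partition offset" line format, the task directory names and all method names below are assumptions made for the example.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

// Hypothetical sketch: derive offset sums for unassigned tasks from the
// contents of their checkpoint files instead of from open state stores.
class CheckpointOffsetSumSketch {

    /** Parses lines of the assumed form "topic partition offset". */
    static Map<String, Long> parseCheckpoint(List<String> lines) {
        Map<String, Long> offsets = new HashMap<>();
        for (String line : lines) {
            String trimmed = line.trim();
            if (trimmed.isEmpty()) {
                continue;
            }
            String[] parts = trimmed.split("\\s+");
            if (parts.length != 3) {
                throw new IllegalArgumentException("Malformed checkpoint line: " + line);
            }
            offsets.put(parts[0] + "-" + parts[1], Long.parseLong(parts[2]));
        }
        return offsets;
    }

    /**
     * Returns the offset sum per task directory, skipping tasks that are
     * assigned (their offsets would come from the running task instead).
     */
    static Map<String, Long> offsetSumsForUnassignedTasks(
            Map<String, List<String>> checkpointLinesByTaskDir,
            Set<String> assignedTasks) {
        Map<String, Long> sums = new TreeMap<>();
        for (Map.Entry<String, List<String>> entry : checkpointLinesByTaskDir.entrySet()) {
            if (assignedTasks.contains(entry.getKey())) {
                continue;
            }
            long sum = 0L;
            for (long offset : parseCheckpoint(entry.getValue()).values()) {
                sum += offset;
            }
            sums.put(entry.getKey(), sum);
        }
        return sums;
    }

    public static void main(String[] args) {
        Map<String, List<String>> onDisk = Map.of(
            "0_0", List.of("a-changelog 0 100", "b-changelog 0 50"),
            "0_1", List.of("a-changelog 1 7"));
        System.out.println(offsetSumsForUnassignedTasks(onDisk, Set.of("0_1")));
    }
}
```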
3.
In the javadocs for commit() you write
"... all writes since the last commit(Map), or since init(StateStore)
*MUST* be available to readers, even after a restart."
This is only true for a clean close before the restart, isn't it?
If the task fails with a dirty close, Kafka Streams cannot guarantee
that the in-memory structures of the state store (e.g. memtable in the
case of RocksDB) are flushed so that the records and the committed
offsets are persisted.
4.
The wrapper that provides the legacy checkpointing behavior is actually
an implementation detail. I would remove it from the KIP, but still
state that the legacy checkpointing behavior will be supported when the
state store does not manage the checkpoints.
5.
Regarding the metrics, could you please add the tags and the recording
level (DEBUG or INFO) as done in KIP-607 or KIP-444?
Best,
Bruno
On 4/7/24 5:35 PM, Nick Telford wrote:
Hi everyone,
Based on some offline discussion, I've split out the "Atomic Checkpointing"
section from KIP-892: Transactional Semantics for StateStores, into its own
KIP
KIP-1035: StateStore managed changelog offsets
https://cwiki.apache.org/confluence/display/KAFKA/KIP-1035%3A+StateStore+managed+changelog+offsets
While KIP-892 was adopted *with* the changes outlined in KIP-1035, these
changes were always the most contentious part, and continued to spur
discussion even after KIP-892 was adopted.
All the changes introduced in KIP-1035 have been removed from KIP-892, and
a hard dependency on KIP-1035 has been added to KIP-892 in their place.
I'm hopeful that with some more focus on this set of changes, we can
deliver something that we're all happy with.
Regards,
Nick