Hi John,
Hi Nick,
Thanks for the interesting ideas!
Here are my comments.
1.
It is not clear to me what happens if the cache exceeds its configured
size between two commits. Currently, the cache evicts its entries and
writes dirty entries to the state store. Should the cache write those
dirty entries to the SST files directly? But then, how would the task
that owns the state store, or an interactive query, query those SST files?
We cannot ingest the SST files into the state store yet, because we
first need to commit the Kafka transaction containing those entries.
Nick, I think you had a similar question a couple of messages ago.
2.
I am wondering how ingesting external SST files affects compaction and
whether Streams will experience more write stalls.
3.
I would really like to get rid of the extra commits triggered by
exceeding the configured size of the transaction, as proposed in Nick's
KIP. Avoiding these extra commits would allow us to put all the
transactional logic behind the state store interface, without the need
to check memory sizes before starting to process a record.
However, at the moment, I do not see how to achieve this without the
possibility to spill records to disk. John's (SST ingest) and Alex's
(KIP-844) ideas go in that direction. Alex's idea has a performance
issue and with John's idea it is not clear to me how to query records in
SST files.
I see three options to improve on those two ideas:
a. After the Kafka transaction is committed, ingest the SST files
produced by the temporary state store proposed by Alex.
That might improve performance, since in this way we update the state
store in batches. However, I am not sure how performant clearing the
temporary state store is.
b. Implement querying of SST files in Java.
This seems like quite some work. However, the implementation does not
need to be super performant, because the amount of data to query might
not be too large and querying the SST files might also not happen too
often.
c. Make WriteBatchWithIndex spill to disk when the configured size is
exceeded.
That would require a change in RocksDB. For that, we would need to
either fork RocksDB or get the change into one of the next releases. The
latter would also imply upgrading the RocksDB version in Kafka Streams,
which might cause backwards-compatibility issues.
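Regarding option b, it may be worth noting that rocksdbjni already exposes an SstFileReader, so querying SST files from Java might not require implementing the file format from scratch. A minimal sketch, assuming the rocksdbjni API is available at a sufficiently recent version (error handling omitted):

```java
import org.rocksdb.*;
import java.util.Arrays;

public class SstQueryExample {
    // Sketch: point lookup against a standalone SST file via RocksDB's
    // Java API. SstFileReader has been part of rocksdbjni for a while,
    // so option (b) above may amount to wiring this up rather than
    // re-implementing the SST format in Java.
    public static byte[] readKey(String sstPath, byte[] key) throws RocksDBException {
        try (Options options = new Options();
             SstFileReader reader = new SstFileReader(options)) {
            reader.open(sstPath);
            try (ReadOptions readOptions = new ReadOptions();
                 SstFileReaderIterator it = reader.newIterator(readOptions)) {
                it.seek(key);
                if (it.isValid() && Arrays.equals(it.key(), key)) {
                    return it.value();
                }
                return null; // key not present in this SST file
            }
        }
    }
}
```

As the thread notes, performance may be acceptable here precisely because these lookups would be rare and over small amounts of data.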
Best,
Bruno
On 20.06.23 23:43, Nick Telford wrote:
Here's what I'm thinking: based on Bruno's earlier feedback, I'm going to
try to simplify my original design down such that it needs no/minimal
changes to the public interface.
If that succeeds, then it should also be possible to transparently
implement the "no memtables" solution as a performance optimization when
the record cache is enabled. I consider this approach only an optimisation,
because of the need to still support stores with the cache disabled.
For that reason, I think the "no memtables" approach would probably best be
suited as a follow-up KIP, but that we keep it in mind during the design of
this one.
What do you think?
Regards,
Nick
On Tue, 20 Jun 2023, 22:26 John Roesler, <vvcep...@apache.org> wrote:
Oh, that's a good point.
On the topic of a behavioral switch for disabled caches: the typical use
case for disabling the cache is to cause each individual update to
propagate down the topology, so another thought might be to just add the
memory we would have used for the memtables to the cache size, but if
people did disable the cache entirely, then we could still go ahead and
forward the records on each write?
I know that Guozhang was also proposing for a while to actually decouple
caching and forwarding, which might provide a way to side-step this
dilemma (i.e., we just always forward and only apply the cache to state
and changelog writes).
By the way, I'm basing my statement on why you'd disable caches on
memory, but also on the guidance here:
https://docs.confluent.io/platform/current/streams/developer-guide/memory-mgmt.html
. That doc also contains a section on how to bound the total memory
usage across RocksDB memtables, which points to another benefit of
disabling memtables and managing the write buffer ourselves (simplified
memory configuration).
Thanks,
-John
On 6/20/23 16:05, Nick Telford wrote:
Potentially we could just go the memtable-with-RocksDB-WriteBatches
route if the cache is disabled?
On Tue, 20 Jun 2023, 22:00 John Roesler, <j...@vvcephei.org> wrote:
Touché!
Ok, I agree that figuring out the case of a disabled cache would be
non-trivial. Ingesting single-record SST files will probably not be
performant, but benchmarking may prove different. Or maybe we can have
some reserved cache space on top of the user-configured cache, which we
would have reclaimed from the memtable space. Or some other, more
creative solution.
Thanks,
-John
On 6/20/23 15:30, Nick Telford wrote:
"Note that users can disable the cache, which would still be ok, I
think. We wouldn't ingest the SST files on every record, but just append
to them and only ingest them on commit, when we're already waiting for
acks and a RocksDB commit."
In this case, how would uncommitted records be read by joins?
On Tue, 20 Jun 2023, 20:51 John Roesler, <vvcep...@apache.org> wrote:
Ah, sorry Nick,
I just meant the regular heap-based cache that we maintain in Streams. I
see that it's not called "RecordCache" (my mistake).
The actual cache is ThreadCache:
https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java
Here's an example of how we use the cache in KeyValueStore:
https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java
It's basically just an on-heap map of records that have not yet been
written to the changelog or flushed into the underlying store. It gets
flushed when the total cache size exceeds `cache.max.bytes.buffering` or
the `commit.interval.ms` elapses.
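The flush trigger described above can be sketched as follows; the class and method names here are hypothetical, not the actual ThreadCache internals:

```java
// Hypothetical sketch of the cache-flush trigger described above. The real
// logic lives in Streams' internal ThreadCache; the names here are made up.
class CacheFlushPolicy {
    private final long maxBytes;         // analogous to cache.max.bytes.buffering
    private final long commitIntervalMs; // analogous to commit.interval.ms

    CacheFlushPolicy(long maxBytes, long commitIntervalMs) {
        this.maxBytes = maxBytes;
        this.commitIntervalMs = commitIntervalMs;
    }

    /** Flush when the cache outgrows its budget or the commit interval elapses. */
    boolean shouldFlush(long cachedBytes, long msSinceLastCommit) {
        return cachedBytes > maxBytes || msSinceLastCommit >= commitIntervalMs;
    }
}
```

The point of the sketch is that both triggers already bound how long records stay uncommitted, which is what makes piggybacking on the cache flush attractive.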
Speaking of those configs, another benefit to this idea is that we would
no longer need to trigger extra commits based on the size of the ongoing
transaction. Instead, we'd just preserve the existing cache-flush
behavior. Note that users can disable the cache, which would still be
ok, I think. We wouldn't ingest the SST files on every record, but just
append to them and only ingest them on commit, when we're already
waiting for acks and a RocksDB commit.
Thanks,
-John
On 6/20/23 14:09, Nick Telford wrote:
Hi John,
By "RecordCache", do you mean the RocksDB "WriteBatch"? I can't find any
class called "RecordCache"...
Cheers,
Nick
On Tue, 20 Jun 2023 at 19:42, John Roesler <vvcep...@apache.org> wrote:
Hi Nick,
Thanks for picking this up again!
I did have one new thought over the intervening months, which I'd like
your take on.
What if, instead of using the RocksDB atomic write primitive at all, we
instead just:
1. disable memtables entirely
2. directly write the RecordCache into SST files when we flush
3. atomically ingest the SST file(s) into RocksDB when we get the ACK
from the changelog (see
https://github.com/EighteenZi/rocksdb_wiki/blob/master/Creating-and-Ingesting-SST-files.md
and
https://github.com/facebook/rocksdb/blob/master/java/src/main/java/org/rocksdb/IngestExternalFileOptions.java
and
https://github.com/facebook/rocksdb/blob/master/include/rocksdb/db.h#L1413-L1429
)
4. track the changelog offsets either in another CF or the same CF with
a reserved key, either of which will make the changelog offset update
atomic with the file ingestions
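Steps 2-3 above can be sketched with the RocksDB Java API. This assumes rocksdbjni; file naming, ordering of keys, and error handling are simplified:

```java
import org.rocksdb.*;
import java.util.Collections;
import java.util.Map;

public class SstIngestExample {
    // Sketch of steps 2-3: write flushed records to an SST file, then
    // atomically ingest it once the changelog ACK arrives.
    public static void flushAndIngest(RocksDB db, Map<byte[], byte[]> records, String sstPath)
            throws RocksDBException {
        try (Options options = new Options();
             EnvOptions envOptions = new EnvOptions();
             SstFileWriter writer = new SstFileWriter(envOptions, options)) {
            writer.open(sstPath);
            // SstFileWriter requires keys in ascending order; a sorted
            // record cache would provide that naturally.
            for (Map.Entry<byte[], byte[]> e : records.entrySet()) {
                writer.put(e.getKey(), e.getValue());
            }
            writer.finish();
        }
        try (IngestExternalFileOptions ingestOptions = new IngestExternalFileOptions()) {
            ingestOptions.setMoveFiles(true); // move rather than copy the SST file
            db.ingestExternalFile(Collections.singletonList(sstPath), ingestOptions);
        }
    }
}
```

The ingestion itself is atomic from RocksDB's point of view, which is what makes step 4 (a reserved offsets key or CF in the same ingest) attractive.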
I suspect this'll have a number of benefits:
* writes to RocksDB will always be atomic
* we don't fragment memory between the RecordCache and the memtables
* RecordCache gives far higher performance than memtable for reads and
writes
* we don't need any new "transaction" concepts or memory bound configs
What do you think?
Thanks,
-John
On 6/20/23 10:51, Nick Telford wrote:
Hi Bruno,
Thanks for reviewing the KIP. It's been a long road; I started working
on this more than a year ago, and most of the time in the last 6 months
has been spent on the "Atomic Checkpointing" stuff that's been benched,
so some of the reasoning behind my decisions has been lost, but I'll do
my best to reconstruct it.
1.
IIRC, this was the initial approach I tried. I don't remember the exact
reasons I changed it to use a separate "view" of the StateStore that
encapsulates the transaction, but I believe it had something to do with
concurrent access to the StateStore from Interactive Query threads.
Reads from interactive queries need to be isolated from the currently
ongoing transaction, both for consistency (so interactive queries don't
observe changes that are subsequently rolled back), but also to prevent
Iterators opened by an interactive query from being closed and
invalidated by the StreamThread when it commits the transaction, which
causes your interactive queries to crash.
Another reason I believe I implemented it this way was separation of
concerns. Recall that newTransaction() originally created an object of
type Transaction, not StateStore. My intent was to improve the
type-safety of the API, in an effort to ensure Transactions weren't used
incorrectly. Unfortunately, this didn't pan out, but newTransaction()
remained.
Finally, this had the added benefit that implementations could easily
add support for transactions *without* re-writing their existing,
non-transactional implementation. I think this can be a benefit both for
implementers of custom StateStores, and for anyone extending
RocksDbStore, as they can rely on the existing access methods working
how they expect them to.
I'm not too happy with the way the current design has panned out, so I'm
open to ideas on how to improve it. Key to this is finding some way to
ensure that reads from Interactive Query threads are properly isolated
from the transaction, *without* the performance overhead of checking
which thread the method is being called from on every access.
As for replacing flush() with commit() - I saw no reason to add this
complexity to the KIP, unless there was a need to add arguments to the
flush/commit method. This need arises with Atomic Checkpointing, but
that will be implemented separately, in a future KIP. Do you see a need
for some arguments to the flush/commit method that I've missed? Or were
you simply suggesting a rename?
2.
This is simply due to the practical reason that isolationLevel() is
really a proxy for checking whether the app is under EOS. The
application configuration is not provided to the constructor of
StateStores, but it *is* provided to init(), via StateStoreContext. For
this reason, it seemed somewhat natural to add it to StateStoreContext.
I think this makes sense, since the IsolationLevel of all StateStores in
an application *must* be the same, and since those stores are all
initialized with the same StateStoreContext, it seems natural for that
context to carry the desired IsolationLevel to use.
3.
Using IsolationLevel instead of just passing `boolean eosEnabled`, like
much of the internals do, was an attempt to logically de-couple the
StateStore API from the internals of Kafka Streams. Technically,
StateStores don't need to know/care what processing mode the KS app is
using; all they need to know is the isolation level expected of them.
Having formal definitions for the expectations of the two required
IsolationLevels allows implementers to implement transactional stores
without having to dig through the internals of Kafka Streams and
understand exactly how they are used. The tight coupling between state
stores and internal behaviour has actually significantly hindered my
progress on this KIP, and encouraged me to avoid increasing this logical
coupling as much as possible.
This also frees implementations to satisfy those requirements in any way
they choose. Transactions might not be the only approach available to an
implementation, which might have an alternative way to satisfy the
isolation requirements. I admit that this point is more about semantics,
but "transactional" would need to be formally defined in order for
implementers to provide a valid implementation, and these
IsolationLevels provide that formal definition.
4.
I can remove them. I added them only because I planned to include them
in the org.apache.kafka.streams.state package, as a recommended base
implementation for all StateStores, including those implemented by
users. I had assumed that anything in "public" packages, such as
org.apache.kafka.streams.state, should be included in a KIP. Is that
wrong?
5.
RocksDB provides no way to measure the actual size of a
WriteBatch(WithIndex), so we're limited to tracking the sum total of the
size of keys + values that are written to the transaction. This
obviously under-estimates the actual memory usage, because WriteBatch
no doubt includes some record overheads, and WriteBatchWithIndex has to
maintain an index.
Ideally, we could trivially add a method upstream to WriteBatchInterface
that provides the exact size of the batch, but that would require an
upgrade of RocksDB, which won't happen soon. So for the time being we're
stuck with an approximation, and I felt that the new method name should
reflect that.
Would you prefer the new method name ignores this constraint and that we
simply make the rocks measurement more accurate in the future?
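The approximation described above can be sketched as follows; the class and method names are hypothetical, not the KIP's actual interface:

```java
// Hypothetical sketch of the size approximation described above (not the
// KIP's actual implementation). Since RocksDB exposes no exact
// WriteBatch(WithIndex) size, we track the sum of key and value bytes
// written to the ongoing transaction.
class UncommittedBytesTracker {
    private long approximateBytes = 0;

    void onPut(byte[] key, byte[] value) {
        approximateBytes += key.length + (value == null ? 0 : value.length);
    }

    void onCommit() {
        approximateBytes = 0;
    }

    /**
     * Under-estimates true memory usage: per-record overhead and the
     * WriteBatchWithIndex's internal index are not accounted for.
     */
    long approximateNumUncommittedBytes() {
        return approximateBytes;
    }
}
```
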
6.
Done
7.
Very good point. The KIP already specifically calls out memory in the
documentation of the config: "Maximum number of memory bytes to be used
to buffer uncommitted state-store records." - did you have something
else in mind?
Should we also make this clearer by renaming the config property itself?
Perhaps to something like statestore.transaction.buffer.max.bytes?
8.
OK, I can remove this. The intent here was to describe how Streams
itself will manage transaction roll-over etc. Presumably that means we
also don't need a description of how Streams will manage the commit of
changelog transactions, state store transactions and checkpointing?
9.
What do you mean by fail-over? Do you mean failing over an Active Task
to an instance already hosting a Standby Task?
Thanks again and sorry for the essay of a response!
Regards,
Nick
On Tue, 20 Jun 2023 at 10:49, Bruno Cadonna <cado...@apache.org> wrote:
Hi Nick,
Thanks for the updates!
I really appreciate that you simplified the KIP by removing some
aspects. As I have already told you, I think the removed aspects are
also good ideas and we can discuss them in follow-up KIPs.
Regarding the current KIP, I have the following feedback.
1.
Is there a good reason to add the method newTransaction() to the
StateStore interface? As far as I understand, the idea is that users of
a state store (transactional or not) call this method at start-up and
after each commit. Since the call to newTransaction() is done in any
case, I think it would simplify the caller code if we just started a new
transaction after a commit inside the implementation.
As far as I understand, you plan to commit the transaction in the
flush() method. I find the idea of replacing flush() with commit()
presented in KIP-844 an elegant solution.
2.
Why is the method to query the isolation level added to the state store
context?
3.
Do we need all the isolation level definitions? I think it is good to
know the guarantees of the transactionality of the state store.
However, currently, Streams guarantees that there will only be one
transaction that writes to the state store. Only the stream thread that
executes the active task that owns the state store will write to the
state store. I think it should be enough to know if the state store is
transactional or not. So my proposal would be to just add a method on
the state store interface that returns whether a state store is
transactional or not, by returning a boolean or an enum.
4.
I am wondering why AbstractTransaction and AbstractTransactionalStore
are part of the KIP. They look like implementation details that should
not be exposed in the public API.
5.
Why does StateStore#approximateNumUncommittedBytes() return an
approximate number of bytes?
6.
RocksDB is just one implementation of the state stores in Streams.
However, the issues regarding OOM errors might also apply to other
custom implementations. So in the KIP I would extract that part from
section "RocksDB Transaction". I would also move section "RocksDB
Transaction" to the end of section "Proposed Changes" and handle it as
an example implementation for a state store.
7.
Should statestore.uncommitted.max.bytes only limit the uncommitted bytes
or the uncommitted bytes that reside in memory? In future, other
transactional state store implementations might implement a buffer for
uncommitted records that is able to spill records to disk. I think
statestore.uncommitted.max.bytes needs to limit the uncommitted bytes
irrespective of whether they reside in memory or on disk. Since Streams
will use this config to decide if it needs to trigger a commit, state
store implementations that can spill to disk will never be able to spill
to disk. You would only need to change the doc of the config, if you
agree with me.
8.
Section "Transaction Management" about the wrappers is rather an
implementation detail that should not be in the KIP.
9.
Could you add a section that describes how failover will work with the
transactional state stores? I think section "Error handling" is already
a good start.
Best,
Bruno
On 15.05.23 11:04, Nick Telford wrote:
Hi everyone,
Quick update: I've added a new section to the KIP: "Offsets for Consumer
Rebalances", that outlines my solution to the problem that
StreamsPartitionAssignor needs to read StateStore offsets even if
they're not currently open.
Regards,
Nick
On Wed, 3 May 2023 at 11:34, Nick Telford <nick.telf...@gmail.com> wrote:
Hi Bruno,
Thanks for reviewing my proposal.
1.
The main reason I added it was because it was easy to do. If we see no
value in it, I can remove it.
2.
Global StateStores can have multiple partitions in their input topics
(which function as their changelogs), so they would have more than one
partition.
3.
That's a good point. At present, the only method it adds is
isolationLevel(), which is likely not necessary outside of StateStores.
It *does* provide slightly different guarantees in the documentation to
several of the methods (hence the overrides). I'm not sure if this is
enough to warrant a new interface though.
I think the question that remains is whether this interface makes it
easier to implement custom transactional StateStores than if we were to
remove it? Probably not.
4.
The main motivation for the Atomic Checkpointing is actually
performance. My team has been testing out an implementation of this KIP
without it, and we had problems with RocksDB doing *much* more
compaction, due to the significantly increased flush rate. It was enough
of a problem that (for the time being) we had to revert back to Kafka
Streams proper.
I think the best way to solve this, as you say, is to keep the
.checkpoint files *in addition* to the offsets being stored within the
store itself. Essentially, when closing StateStores, we force a memtable
flush, then call getCommittedOffsets and write those out to the
.checkpoint file. That would ensure the metadata is available to the
StreamsPartitionAssignor for all closed stores.
If there's a crash (no clean close), then we won't be able to guarantee
which offsets were flushed to disk by RocksDB, so we'd need to open
(init()), read offsets, and then close() those stores. But since this is
the exception, and will only occur once (provided it doesn't crash every
time!), I think the performance impact here would be acceptable.
Thanks for the feedback, please let me know if you have any more
comments or questions!
I'm currently working on rebasing against trunk. This involves adding
support for transactionality to VersionedStateStores. I will probably
need to revise my implementation for transactional "segmented" stores,
both to accommodate VersionedStateStore, and to clean up some other
stuff.
Regards,
Nick
On Tue, 2 May 2023 at 13:45, Bruno Cadonna <cado...@apache.org> wrote:
Hi Nick,
Thanks for the updates!
I have a couple of questions/comments.
1.
Why do you propose a configuration that involves max. bytes and max.
records? I think we are mainly concerned about memory consumption
because we want to limit the off-heap memory used. I cannot think of a
case where one would want to set the max. number of records.
2.
Why does
default void commit(final Map<TopicPartition, Long> changelogOffsets) {
    flush();
}
take a map of partitions to changelog offsets?
The mapping between state stores and partitions is a 1:1 relationship.
Passing in a single changelog offset should suffice.
3.
Why do we need the Transaction interface? It should be possible to hide
beginning and committing a transaction within the state store
implementation, so that from outside the state store, it does not matter
whether the state store is transactional or not. What would be the
advantage of using the Transaction interface?
4.
Regarding checkpointing offsets, I think we should keep the checkpoint
file in any case for the reason you mentioned about rebalancing. Even if
that were not an issue, I would propose to move the change to offset
management to a new KIP and to not add more complexity than needed to
this one. I would not be too concerned about the consistency violation
you mention. As far as I understand, with transactional state stores
Streams would write the checkpoint file during every commit even under
EOS. In the failure case you describe, Streams would restore the state
stores from the offsets found in the checkpoint file written during the
penultimate commit instead of during the last commit. Basically, Streams
would overwrite the records written to the state store between the last
two commits with the same records read from the changelogs. While I
understand that this is wasteful, it is -- at the same time --
acceptable and most importantly it does not break EOS.
Best,
Bruno
On 27.04.23 12:34, Nick Telford wrote:
Hi everyone,
I find myself (again) considering removing the offset management from
StateStores, and keeping the old checkpoint file system. The reason is
that the StreamsPartitionAssignor directly reads checkpoint files in
order to determine which instance has the most up-to-date copy of the
local state. If we move offsets into the StateStore itself, then we will
need to open, initialize, read offsets and then close each StateStore
(that is not already assigned and open) for which we have *any* local
state, on every rebalance.
Generally, I don't think there are many "orphan" stores like this
sitting around on most instances, but even a few would introduce
additional latency to an already somewhat lengthy rebalance procedure.
I'm leaning towards Colt's (Slack) suggestion of just keeping things in
the checkpoint file(s) for now, and not worrying about the race. The
downside is that we wouldn't be able to remove the explicit RocksDB
flush on-commit, which likely hurts performance.
If anyone has any thoughts or ideas on this subject, I would appreciate
it!
Regards,
Nick
On Wed, 19 Apr 2023 at 15:05, Nick Telford <nick.telf...@gmail.com> wrote:
Hi Colt,
The issue is that if there's a crash between 2 and 3, then you still end
up with inconsistent data in RocksDB. The only way to guarantee that
your checkpoint offsets and locally stored data are consistent with each
other is to atomically commit them, which can be achieved by having the
offsets stored in RocksDB.
The offsets column family is likely to be extremely small (one
per-changelog partition + one per Topology input partition for regular
stores, one per input partition for global stores). So the overhead will
be minimal.
A major benefit of doing this is that we can remove the explicit calls
to db.flush(), which forcibly flushes memtables to disk on-commit. It
turns out, RocksDB memtable flushes are largely dictated by Kafka
Streams commits, *not* RocksDB configuration, which could be a major
source of confusion. Atomic checkpointing makes it safe to remove these
explicit flushes, because it no longer matters exactly when RocksDB
flushes data to disk; since the data and corresponding checkpoint
offsets will always be flushed together, the local store is always in a
consistent state, and on-restart, it can always safely resume
restoration from the on-disk offsets, restoring the small amount of data
that hadn't been flushed when the app exited/crashed.
Regards,
Nick
On Wed, 19 Apr 2023 at 14:35, Colt McNealy <c...@littlehorse.io> wrote:
Nick,
Thanks for your reply. Ack to A) and B).
For item C), I see what you're referring to. Your proposed solution will
work, so no need to change it. What I was suggesting was that it might
be possible to achieve this with only one column family. So long as:
- No uncommitted records (i.e. not committed to the changelog) are
*committed* to the state store, AND
- The checkpoint offset (which refers to the changelog topic) is less
than or equal to the last written changelog offset in RocksDB
I don't see the need to do a full restoration from scratch. My
understanding was that prior to 844/892, full restorations were required
because there could be uncommitted records written to RocksDB; however,
given your use of RocksDB transactions, that can be avoided with the
pattern of 1) commit Kafka transaction, 2) commit RocksDB transaction,
3) update offset in checkpoint file.
Anyways, your proposed solution works equivalently and I don't believe
there is much overhead to an additional column family in RocksDB.
Perhaps it may even perform better than making separate writes to the
checkpoint file.
Colt McNealy
*Founder, LittleHorse.io*
On Wed, Apr 19, 2023 at 5:53 AM Nick Telford <nick.telf...@gmail.com> wrote:
Hi Colt,
A. I've done my best to de-couple the StateStore stuff from the rest of
the Streams engine. The fact that there will be only one ongoing (write)
transaction at a time is not guaranteed by any API, and is just a
consequence of the way Streams operates. To that end, I tried to ensure
the documentation and guarantees provided by the new APIs are
independent of this incidental behaviour. In practice, you're right:
this essentially refers to "interactive queries", which are technically
"read transactions", even if they don't actually use the transaction API
to isolate themselves.
B. Yes, although not ideal. This is for backwards compatibility,
because:
1) Existing custom StateStore implementations will implement flush(),
and not commit(), but the Streams engine now calls commit(), so those
calls need to be forwarded to flush() for these legacy stores.
2) Existing StateStore *users*, i.e. outside of the Streams engine
itself, may depend on explicitly calling flush(), so for these cases,
flush() needs to be redirected to call commit().
If anyone has a better way to guarantee compatibility without
introducing this potential recursion loop, I'm open to changes!
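A simplified sketch of this forwarding scheme (hypothetical interface, not the KIP's exact signatures): a store that overrides either method breaks the mutual recursion, while one that overrides neither would loop forever.

```java
// Hypothetical, simplified sketch of the flush()/commit() forwarding
// described above (not the KIP's exact interface). Each default method
// forwards to the other, so legacy stores that only implement flush()
// and new stores that only implement commit() both keep working.
interface CompatStateStore {
    /** Legacy API: stores overriding only this still work when the engine calls commit(). */
    default void flush() {
        commit();
    }

    /** New API: callers still invoking flush() are redirected here. */
    default void commit() {
        flush();
    }
}

class LegacyStore implements CompatStateStore {
    boolean flushed = false;

    @Override
    public void flush() { // an old-style store: only flush() is implemented
        flushed = true;
    }
}
```

Note the hazard: a store overriding neither method would recurse until StackOverflowError, which is exactly the loop mentioned above.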
C. This is described in the "Atomic Checkpointing" section. Offsets are
stored in a separate RocksDB column family, which is guaranteed to be
atomically flushed to disk with all other column families. The issue of
checkpoints being written to disk after commit causing inconsistency if
it crashes in between is the reason why, under EOS, checkpoint files are
only written on clean shutdown. This is one of the major causes of "full
restorations", so moving the offsets into a place where they can be
guaranteed to be atomically written with the data they checkpoint allows
us to write the checkpoint offsets *on every commit*, not just on clean
shutdown.
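The atomic data-plus-offset write can be sketched with a RocksDB WriteBatch spanning two column families. This assumes rocksdbjni; column-family setup and all names are illustrative, not the KIP's implementation:

```java
import org.rocksdb.*;
import java.nio.ByteBuffer;

public class AtomicOffsetExample {
    // Sketch of the "Atomic Checkpointing" idea above: write the record and
    // the changelog offset in one WriteBatch, so both reach disk atomically.
    public static void putWithOffset(RocksDB db,
                                     ColumnFamilyHandle dataCf,
                                     ColumnFamilyHandle offsetsCf,
                                     byte[] key, byte[] value,
                                     byte[] partitionKey, long offset)
            throws RocksDBException {
        try (WriteBatch batch = new WriteBatch();
             WriteOptions writeOptions = new WriteOptions()) {
            batch.put(dataCf, key, value);
            batch.put(offsetsCf, partitionKey,
                      ByteBuffer.allocate(Long.BYTES).putLong(offset).array());
            db.write(writeOptions, batch); // both writes land together or not at all
        }
    }
}
```

Because memtable flushes in RocksDB are atomic across column families, the on-disk offsets can never run ahead of, or behind, the on-disk data.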
Regards,
Nick
On Tue, 18 Apr 2023 at 15:39, Colt McNealy <c...@littlehorse.io> wrote:
Nick,
Thank you for continuing this work. I have a few minor clarifying
questions.
A) "Records written to any transaction are visible to all other
transactions immediately." I am confused here—I thought there could only
be one transaction going on at a time for a given state store, given the
threading model for processing records on a Task. Do you mean
Interactive Queries by "other transactions"? (If so, then everything
makes sense—I thought that since IQ were read-only they didn't count as
transactions.)
B) Is it intentional that the default implementations of the flush() and
commit() methods in the StateStore class refer to each other in some
sort of unbounded recursion?
C) How will the getCommittedOffset() method work? At first I thought the
way to do it would be using a special key in the RocksDB store to store
the offset, and committing that with the transaction. But upon second
thought, since restoration from the changelog is an idempotent
procedure, I think it would be fine to 1) commit the RocksDB transaction
and then 2) write the offset to disk in a checkpoint file. If there is a
crash between 1) and 2), I think the only downside is that we now replay
a few more records (at a cost of <100ms). Am I missing something there?
Other than that, everything makes sense to me.
Cheers,
Colt McNealy
*Founder, LittleHorse.io*
On Tue, Apr 18, 2023 at 3:59 AM Nick Telford <nick.telf...@gmail.com> wrote:
Hi everyone,
I've updated the KIP to reflect the latest version of the design:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-892%3A+Transactional+Semantics+for+StateStores
There are several changes in there that reflect feedback from this
thread, and there's a new section and a bunch of interface changes
relating to Atomic Checkpointing, which is the final piece of the puzzle
to making everything robust.
Let me know what you think!
Regards,
Nick
On Tue, 3 Jan 2023 at 11:33, Nick Telford <nick.telf...@gmail.com> wrote:
Hi Lucas,
Thanks for looking over my KIP.
A) The bound is per-instance, not per-Task. This was a
typo
in
the
KIP
that I've now corrected. It was originally per-Task,
but I
changed it
to
per-instance for exactly the reason you highlighted.
B) It's worth noting that transactionality is only
enabled
under
EOS,
and
in the default mode of operation (ALOS), there should be
no
change in
behavior at all. I think, under EOS, we can mitigate the
impact
on
users
by
sufficiently low default values for the memory bound
configuration. I
understand your hesitation to include a significant
change
of
behaviour,
especially in a minor release, but I suspect that most
users
will
prefer
the memory impact (under EOS) to the existing behaviour
of
frequent
state
restorations! If this is a problem, the changes can wait
until
the
next
major release. I'll be running a patched version of
streams
in
production
with these changes as soon as they're ready, so it won't
disrupt
me
:-D
C) The main purpose of this sentence was just to note
that
some
changes
will need to be made to the way Segments are handled in
order
to
ensure
they also benefit from transactions. At the time I wrote
it, I
hadn't
figured out the specific changes necessary, so it was
deliberately
vague.
This is the one outstanding problem I'm currently
working
on,
and
I'll
update this section with more detail once I have figured
out
the
exact
changes required.
D) newTransaction() provides the necessary isolation
guarantees.
While
the RocksDB implementation of transactions doesn't
technically
*need*
read-only users to call newTransaction(), other
implementations
(e.g. a
hypothetical PostgresStore) may require it. Calling
newTransaction()
when
no transaction is necessary is essentially free, as it
will
just
return
this.
I didn't do any profiling of the KIP-844 PoC, but I think it should be fairly obvious where the performance problems stem from: writes under KIP-844 require 3 extra memory copies: 1 to encode the record with the tombstone/record flag, 1 to decode it from the tombstone/record flag, and 1 to copy the record from the "temporary" store to the "main" store when the transaction commits. The different approach taken by KIP-892 should perform much better, as it avoids all these copies, and may actually perform slightly better than trunk, due to batched writes in RocksDB performing better than non-batched writes.[1]
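The first two of those copies can be sketched concretely. The snippet below is illustrative only (it is not KIP-844's actual wire format): prefixing each value with a record/tombstone flag byte forces a full array copy on encode and another on decode.

```java
// Illustrative sketch of a flag-prefixed value encoding, showing where two
// of the extra memory copies come from. Flag values and layout are assumed.
final class FlaggedValue {
    static final byte RECORD = 0;
    static final byte TOMBSTONE = 1;

    // Encoding copies the whole value into a new, one-byte-larger array.
    static byte[] encode(byte[] value) {
        if (value == null) return new byte[] { TOMBSTONE };
        byte[] out = new byte[value.length + 1];   // extra copy #1
        out[0] = RECORD;
        System.arraycopy(value, 0, out, 1, value.length);
        return out;
    }

    // Decoding copies the value back out, minus the flag byte.
    static byte[] decode(byte[] encoded) {
        if (encoded[0] == TOMBSTONE) return null;
        byte[] out = new byte[encoded.length - 1]; // extra copy #2
        System.arraycopy(encoded, 1, out, 0, out.length);
        return out;
    }
}
```

The third copy (temporary store to main store on commit) happens inside RocksDB and is avoided entirely by the WriteBatchWithIndex approach, which hands the batch to the main store directly.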
Regards,
Nick
1: https://github.com/adamretter/rocksjava-write-methods-benchmark#results
On Mon, 2 Jan 2023 at 16:18, Lucas Brutschy <lbruts...@confluent.io.invalid> wrote:
Hi Nick,
I'm just starting to read up on the whole discussion about KIP-892 and KIP-844. Thanks a lot for your work on this, I do think `WriteBatchWithIndex` may be the way to go here. I do have some questions about the latest draft.
A) If I understand correctly, you propose to put a bound on the (native) memory consumed by each task. However, I wonder if this is sufficient if we have temporary imbalances in the cluster. For example, depending on the timing of rebalances during a cluster restart, it could happen that a single Streams node is assigned a lot more tasks than expected. With your proposed change, this would mean that the memory required by this one node could be a multiple of what is required during normal operation. I wonder if it wouldn't be safer to put a global bound on the memory use, across all tasks.
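A global bound of the kind suggested above could be sketched as a single process-wide byte budget that all tasks draw from. This is purely a hypothetical illustration of the suggestion, not anything in the KIP; the class and method names are invented.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical sketch: one shared budget across all tasks on a node.
// A task that cannot reserve more buffer space would commit early,
// regardless of how many tasks a rebalance happened to assign to the node.
final class GlobalWriteBufferBudget {
    private final long maxBytes;
    private final AtomicLong usedBytes = new AtomicLong();

    GlobalWriteBufferBudget(long maxBytes) { this.maxBytes = maxBytes; }

    /** Returns true if the caller may buffer `bytes` more; false means
     *  the caller should flush/commit to release memory first. */
    boolean tryReserve(long bytes) {
        long prev;
        do {
            prev = usedBytes.get();
            if (prev + bytes > maxBytes) return false;
        } while (!usedBytes.compareAndSet(prev, prev + bytes));
        return true;
    }

    /** Called after a commit frees the buffered bytes. */
    void release(long bytes) { usedBytes.addAndGet(-bytes); }
}
```

The trade-off is that one misbehaving task can starve the others of buffer space, which a per-task bound avoids; the thread's question is which failure mode is safer.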
B) Generally, the memory concerns still give me the feeling that this should not be enabled by default for all users in a minor release.
C) In section "Transaction Management": the sentence "A similar analogue will be created to automatically manage `Segment` transactions." Maybe this is just me lacking some background, but I do not understand this; it would be great if you could clarify what you mean here.
D) Could you please clarify why IQ has to call newTransaction(), when it's read-only.
And one last thing not strictly related to your KIP: if there is an easy way for you to find out why the KIP-844 PoC is 20x slower (e.g. by providing a flame graph), that would be quite interesting.
Cheers,
Lucas
On Thu, Dec 22, 2022 at 8:30 PM Nick Telford <nick.telf...@gmail.com> wrote:
Hi everyone,
I've updated the KIP with a more detailed design, which reflects the implementation I've been working on:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-892%3A+Transactional+Semantics+for+StateStores
This new design should address the outstanding points already made in the thread. Please let me know if there are areas that are unclear or need more clarification.
I have a (nearly) working implementation. I'm confident that the remaining work (making Segments behave) will not impact the documented design.
Regards,
Nick
On Tue, 6 Dec 2022 at 19:24, Colt McNealy <c...@littlehorse.io> wrote:
Nick,
Thank you for the reply; that makes sense. I was hoping that, since reading uncommitted records from IQ in EOS isn't part of the documented API, maybe you *wouldn't* have to wait for the next major release to make that change; but given that it would be considered a major change, I like your approach the best.
Wishing you a speedy recovery and happy coding!
Thanks,
Colt McNealy
*Founder, LittleHorse.io*
On Tue, Dec 6, 2022 at 10:30 AM Nick Telford <nick.telf...@gmail.com> wrote:
Hi Colt,
10: Yes, I agree it's not ideal. I originally intended to try to keep the behaviour unchanged as much as possible, otherwise we'd have to wait for a major version release to land these changes.
20: Good point, ALOS doesn't need the same level of guarantee, and the typically longer commit intervals would be problematic when reading only "committed" records.
I've been away for 5 days recovering from minor surgery, but I spent a considerable amount of that time working through ideas for possible solutions in my head. I think your suggestion of keeping ALOS as-is, but buffering writes for EOS, is the right path forwards, although I have a solution that both expands on this and provides for some more formal guarantees.
Essentially, adding support to KeyValueStores for "Transactions", with clearly defined IsolationLevels: using "Read Committed" when under EOS, and "Read Uncommitted" under ALOS.
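In sketch form, the mapping Nick describes could look like the following. This is an illustration only; the enum and helper are invented names, not the KIP's eventual API. The `processing.guarantee` config values (`exactly_once_v2`, `at_least_once`) are real Kafka Streams settings.

```java
// Illustrative sketch: derive the store's isolation level from the
// configured processing guarantee. EOS readers must not observe
// uncommitted writes; ALOS readers may (matching today's behaviour).
enum StoreIsolationLevel { READ_COMMITTED, READ_UNCOMMITTED }

final class IsolationLevels {
    static StoreIsolationLevel forGuarantee(String processingGuarantee) {
        // "exactly_once" and "exactly_once_v2" both imply EOS.
        return processingGuarantee.startsWith("exactly_once")
                ? StoreIsolationLevel.READ_COMMITTED
                : StoreIsolationLevel.READ_UNCOMMITTED;
    }
}
```

Making the level explicit, rather than implied by the guarantee, is what allows the behaviour to be documented per isolation level as described below.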
The nice thing about this approach is that it gives us much more clearly defined isolation behaviour that can be properly documented, to ensure users know what to expect.
I'm still working out the kinks in the design, and will update the KIP when I have something. The main struggle is trying to implement this without making any major changes to the existing interfaces or breaking existing implementations, because currently everything expects to operate directly on a StateStore, and not a Transaction of that store. I think I'm getting close, although sadly I won't be able to progress much until next week due to some work commitments.
Regards,
Nick
On Thu, 1 Dec 2022 at 00:01, Colt McNealy <c...@littlehorse.io>