Hi Alexander,
Thanks for the KIP! This seems like a great proposal. I have the same
opinion as John on the Configuration part, though: the two-level config
and the way its behaviour changes depending on whether the flag is set
or unset is confusing to me as well. Since the KIP is specifically
centred around RocksDB, it might be better to add the option at the
Supplier level, as John suggested.
Along similar lines, the config name *statestore.transactional.mechanism*
may also need rethinking, as the value assigned to it (rocksdb_indexbatch)
implicitly assumes that RocksDB is the only state store that Kafka Streams
supports, while that's not the case.
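For illustration, with the KIP as I read it the configuration would
presumably look something like this (the key and value are the ones quoted
above; wiring them through a plain Properties object is just my assumption):

import java.util.Properties;

import org.apache.kafka.streams.StreamsConfig;

public class TransactionalConfigSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "transactional-stores-demo");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        // A RocksDB-specific value assigned to an engine-agnostic, top-level config:
        props.put("statestore.transactional.mechanism", "rocksdb_indexbatch");
    }
}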
Also, regarding the potential memory pressure that can be introduced by
WriteBatchWithIndex, do you think it would make sense to include some
numbers/benchmarks on how much memory consumption might increase?
Lastly, the read_uncommitted flag's behaviour on IQ may need more
elaboration.
These points aside, as I said, this is a great proposal!
Thanks!
Sagar.
On Tue, May 24, 2022 at 10:35 PM John Roesler <vvcep...@apache.org> wrote:
Thanks for the KIP, Alex!
I'm really happy to see your proposal. This improvement fills a
long-standing gap.
I have a few questions:
1. Configuration
The KIP only mentions RocksDB, but of course, Streams also ships with an
InMemory store, and users also plug in their own custom state stores. It
is
also common to use multiple types of state stores in the same application
for different purposes.
Against this backdrop, the choice to configure both transactionality and
the store transaction mechanism as top-level configs seems a bit off.
Did you consider instead just adding the option to the
RocksDB*StoreSupplier classes and the factories in Stores? It seems like
the desire to enable the feature by default, with a feature flag to
disable it, was a factor here. However, as you pointed out, there are some
major considerations that users should be aware of, so opt-in doesn't seem
like a bad choice, either. You could add an Enum argument to those
factories like `RocksDBTransactionalMechanism.{NONE, ...}` (a rough sketch
of what that could look like follows the list below).
Some points in favor of this approach:
* Avoid "stores that don't support transactions ignore the config"
complexity
* Users can choose how to spend their memory budget, making some stores
transactional and others not
* When we add transactional support to in-memory stores, we don't have to
figure out what to do with the mechanism config (i.e., what do you set
the
mechanism to when there are multiple kinds of transactional stores in the
topology?)
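To make the idea concrete, here is a rough sketch of what I mean; none of
these signatures exist today, the enum name is just the one I used above,
and the INDEX_BATCH constant is only a guess:

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;

public class TransactionalSupplierSketch {

    // Hypothetical enum: a per-store opt-in instead of a top-level config.
    public enum RocksDBTransactionalMechanism {
        NONE,        // current, non-transactional behavior
        INDEX_BATCH  // buffer uncommitted writes in a RocksDB WriteBatchWithIndex
    }

    // The extra enum argument on the Stores factory below does not exist today;
    // it is commented out so this sketch still compiles against current Streams.
    public static StoreBuilder<KeyValueStore<String, String>> transactionalStore() {
        return Stores.keyValueStoreBuilder(
            Stores.persistentKeyValueStore("txn-store"
                /* , RocksDBTransactionalMechanism.INDEX_BATCH */),
            Serdes.String(),
            Serdes.String());
    }
}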
2. caching/flushing/transactions
The coupling between memory usage and flushing that you mentioned is a
bit troubling. It also occurs to me that there seems to be some
relationship with the existing record cache, which is also an in-memory
holding area for records that are not yet written to the changelog and/or
store (albeit with no particular semantics). Have you considered how all
these components should relate? For example, should a "full" WriteBatch
actually trigger a flush so that we don't get OOMEs? If the proposed
transactional mechanism forces all uncommitted writes to be buffered in
memory until a commit, then what is the advantage over just doing the same
thing with the RecordCache and not introducing the WriteBatch at all?
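For reference, this is roughly what the mechanism looks like at the RocksDB
level (plain RocksJava, not Streams code and not necessarily what the KIP
implements): every uncommitted put lives in the in-memory batch until
write() is called, which is where the memory concern comes from.

import java.nio.charset.StandardCharsets;

import org.rocksdb.Options;
import org.rocksdb.ReadOptions;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;
import org.rocksdb.WriteBatchWithIndex;
import org.rocksdb.WriteOptions;

public class WriteBatchWithIndexSketch {
    public static void main(String[] args) throws RocksDBException {
        RocksDB.loadLibrary();
        try (Options options = new Options().setCreateIfMissing(true);
             RocksDB db = RocksDB.open(options, "/tmp/wbwi-sketch");
             WriteBatchWithIndex batch = new WriteBatchWithIndex(true);
             WriteOptions writeOptions = new WriteOptions();
             ReadOptions readOptions = new ReadOptions()) {

            byte[] key = "key".getBytes(StandardCharsets.UTF_8);
            byte[] value = "value".getBytes(StandardCharsets.UTF_8);

            // Buffered only in the batch; the store itself is untouched so far.
            batch.put(key, value);

            // Reads that should see uncommitted writes consult the batch first,
            // then fall back to the DB.
            byte[] uncommitted = batch.getFromBatchAndDB(db, readOptions, key);
            System.out.println(new String(uncommitted, StandardCharsets.UTF_8));

            // The "commit": atomically apply everything buffered so far.
            db.write(writeOptions, batch);
        }
    }
}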
3. ALOS
You mentioned that a transactional store can help reduce duplication in
the case of ALOS. We might want to be careful about claims like that.
Duplication isn't the way that repeated processing manifests in state
stores. Rather, it is in the form of dirty reads during reprocessing.
This
feature may reduce the incidence of dirty reads during reprocessing, but
not in a predictable way. During regular processing today, we will send
some records through to the changelog in between commit intervals. Under
ALOS, if any of those dirty writes gets committed to the changelog topic,
then upon failure, we have to roll the store forward to them anyway,
regardless of this new transactional mechanism. That's a fixable problem,
by the way, but this KIP doesn't seem to fix it. I wonder if we should
make
any claims about the relationship of this feature to ALOS if the
real-world
behavior is so complex.
4. IQ
As a reminder, we have a new IQv2 mechanism now. Should we propose any
changes to IQv1 to support this transactional mechanism, versus just
proposing it for IQv2? Certainly, it seems strange only to propose a
change
for IQv1 and not v2.
Regarding your proposal for IQv1, I'm unsure what the behavior should be
for readCommitted, since the current behavior also reads out of the
RecordCache. I guess if readCommitted==false, then we will continue to
read
from the cache first, then the Batch, then the store; and if
readCommitted==true, we would skip the cache and the Batch and only read
from the persistent RocksDB store?
What should IQ do if I request readCommitted on a non-transactional
store?
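Just so we're talking about the same layering, this is the read path I have
in mind, written out as a standalone sketch (the Layer interface and
everything around it is purely illustrative, not Streams internals):

public class IqReadPathSketch<K, V> {

    interface Layer<K, V> {
        V get(K key);
    }

    private final Layer<K, V> recordCache;      // dirty, in-memory
    private final Layer<K, V> writeBatch;       // uncommitted, in-memory
    private final Layer<K, V> persistentStore;  // committed RocksDB state

    IqReadPathSketch(Layer<K, V> recordCache,
                     Layer<K, V> writeBatch,
                     Layer<K, V> persistentStore) {
        this.recordCache = recordCache;
        this.writeBatch = writeBatch;
        this.persistentStore = persistentStore;
    }

    V get(K key, boolean readCommitted) {
        if (readCommitted) {
            // Skip the cache and the batch; only committed state is visible.
            return persistentStore.get(key);
        }
        // readCommitted == false: today's behavior, dirty reads included.
        V fromCache = recordCache.get(key);
        if (fromCache != null) {
            return fromCache;
        }
        V fromBatch = writeBatch.get(key);
        if (fromBatch != null) {
            return fromBatch;
        }
        return persistentStore.get(key);
    }
}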
Thanks again for proposing the KIP, and my apologies for the long reply;
I'm hoping to air all my concerns in one "batch" to save time for you.
Thanks,
-John
On Tue, May 24, 2022, at 03:45, Alexander Sorokoumov wrote:
Hi all,
I've written a KIP for making Kafka Streams state stores transactional
and
would like to start a discussion:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-844%3A+Transactional+State+Stores
Best,
Alex