Hi Nick,

1.
Yeah, I agree with you. That was actually also my point. I understood that John was proposing the ingestion path as a way to avoid the early commits. Probably, I misinterpreted the intent.

2.
I agree with John here that it actually is public API. My question is how this usage pattern affects normal processing.

3.
My concern is that checking the size of the transaction buffer and possibly triggering an early commit affects the whole processing of Kafka Streams. The transactionality of a state store is not confined to the state store itself, but spills over and changes the behavior of other parts of the system. I agree with you that it is a decent compromise. I just wanted to analyse the downsides and list the options to overcome them. I also agree with you that all options seem quite heavy compared with your KIP. I do not understand what you mean by "less predictable for users", though.
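
To illustrate the spill-over I mean, here is a minimal sketch in Java. It is not the KIP's implementation: the class EarlyCommitSketch, its methods, and the byte limit are all hypothetical, and the commit is reduced to recording an offset. The point is only that a limit owned by the store ends up deciding when the whole task commits.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: none of these names exist in Kafka Streams.
public final class EarlyCommitSketch {
    private final long maxBufferBytes;                         // assumed per-store buffer limit
    private long uncommittedBytes = 0;                         // bytes written since the last commit
    private final List<Long> commitOffsets = new ArrayList<>(); // offsets at which a commit happened

    public EarlyCommitSketch(long maxBufferBytes) {
        this.maxBufferBytes = maxBufferBytes;
    }

    // Processing a record grows the store's transaction buffer.
    public void process(long offset, long recordBytes) {
        uncommittedBytes += recordBytes;
        // The store-level limit forces a task-wide commit outside the
        // regular commit interval: this is the "early commit".
        if (uncommittedBytes > maxBufferBytes) {
            commit(offset);
        }
    }

    // Stand-in for committing changelog, offsets and store together.
    public void commit(long offset) {
        commitOffsets.add(offset);
        uncommittedBytes = 0;
    }

    public List<Long> commitOffsets() {
        return commitOffsets;
    }

    public long uncommittedBytes() {
        return uncommittedBytes;
    }
}
```

With a limit of 100 bytes and records of 60, 60 and 30 bytes, the second record pushes the buffer over the limit and forces a commit, regardless of what the rest of the topology is doing at that moment.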


I found the discussions about the alternatives really interesting. But I also think that your plan sounds good and we should continue with it!


Some comments on your reply to my e-mail on June 20th:

3.
Ah, now I understand the reasoning behind putting the isolation level in the state store context. Thanks! Should that also be a way to give the state store the opportunity to decide whether to turn on transactions or not? With my comment, I was more concerned about how you know whether a checkpoint file needs to be written under EOS if you do not have a way to know whether the state store is transactional. If a state store is transactional, the checkpoint file can be written during normal processing under EOS. If the state store is not transactional, the checkpoint file must not be written under EOS.
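
Expressed as code, the rule I have in mind looks roughly like the sketch below. The class and method names are hypothetical, and the non-EOS branch is my assumption (the rule above only speaks about EOS). What matters is that the second argument has to come from somewhere, which is why I am asking how Kafka Streams learns whether a store is transactional.

```java
// Hypothetical helper: not part of Kafka Streams or of the KIP.
public final class CheckpointPolicy {
    private CheckpointPolicy() {}

    /**
     * Decides whether the checkpoint file may be written during normal processing.
     *
     * @param eosEnabled         whether exactly-once semantics are enabled
     * @param storeTransactional whether the state store is transactional
     */
    public static boolean mayWriteCheckpointDuringProcessing(boolean eosEnabled,
                                                             boolean storeTransactional) {
        // Under EOS, only a transactional store may be checkpointed while processing.
        // Assumption: without EOS, this rule does not restrict checkpointing.
        return !eosEnabled || storeTransactional;
    }
}
```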

7.
My point was about considering not only the bytes in memory in the config statestore.uncommitted.max.bytes, but also the bytes that might be spilled to disk. Basically, I was wondering whether you should remove the "memory" in "Maximum number of memory bytes to be used to buffer uncommitted state-store records." My thinking was that even if a state store spills uncommitted bytes to disk, limiting the overall bytes might make sense. Thinking about it again and considering the recent discussions, it does not make much sense anymore.
I like the name statestore.transaction.buffer.max.bytes that you proposed.

8.
A high-level description (without implementation details) of how Kafka Streams will manage the commit of changelog transactions, state store transactions and checkpointing would be great. It would also be great if you could add some sentences about the behavior in case of a failure. For instance, how does a transactional state store recover after a failure, or what happens with the transaction buffer, etc. (That is what I meant by "fail-over" in point 9.)

Best,
Bruno

On 21.06.23 18:50, Nick Telford wrote:
Hi Bruno,

1.
Isn't this exactly the same issue that WriteBatchWithIndex transactions
have, whereby exceeding (or likely to exceed) configured memory needs to
trigger an early commit?

2.
This is one of my big concerns. Ultimately, any approach based on cracking
open RocksDB internals and using it in ways it's not really designed for is
likely to have some unforeseen performance or consistency issues.

3.
What's your motivation for removing these early commits? While not ideal, I
think they're a decent compromise to ensure consistency whilst maintaining
good and predictable performance.
All 3 of your suggested ideas seem *very* complicated, and might actually
make behaviour less predictable for users as a consequence.

I'm a bit concerned that the scope of this KIP is growing a bit out of
control. While it's good to discuss ideas for future improvements, I think
it's important to narrow the scope down to a design that achieves the most
pressing objectives (constant sized restorations during dirty
close/unexpected errors). Any design that this KIP produces can ultimately
be changed in the future, especially if the bulk of it is internal
behaviour.

I'm going to spend some time next week trying to re-work the original
WriteBatchWithIndex design to remove the newTransaction() method, such that
it's just an implementation detail of RocksDBStore. That way, if we want to
replace WBWI with something in the future, like the SST file management
outlined by John, then we can do so with little/no API changes.

Regards,

Nick
