Hi John,

Thanks for the review and feedback!

1. Custom Stores: I've been mulling over this problem myself. As it stands,
custom stores would essentially lose checkpointing with no indication that
they're expected to make changes, besides a line in the release notes. I
agree that the best solution would be to provide a default that checkpoints
to a file. The one thing I would change is to checkpoint to a store-local
file, instead of a per-Task file. This way the StateStore still
technically owns its own checkpointing (via a default implementation), and
the StateManager/Task execution engine doesn't need to know anything about
checkpointing, which greatly simplifies some of the logic.
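
To make the store-local default concrete, here is a rough sketch of what it
might look like. It assumes a simplified, hypothetical interface
(SketchStateStore, with a checkpointFile() hook and String partition keys)
rather than the real StateStore API, and a one-offset-per-line file format
that is not Kafka's actual .checkpoint format:

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical, simplified store interface: NOT the real Kafka Streams StateStore API.
interface SketchStateStore {
    // Persist buffered writes (stands in for today's flush()).
    void flush();

    // Hypothetical hook: each store chooses its own checkpoint file, e.g. in its state directory.
    Path checkpointFile();

    // Default commit for custom stores: flush, then record the changelog offsets in the
    // store-local checkpoint file. Natively transactional stores would override this.
    default void commit(Map<String, Long> changelogOffsets) {
        flush();
        Path target = checkpointFile();
        Path tmp = target.resolveSibling(target.getFileName() + ".tmp");
        List<String> lines = new ArrayList<>();
        new TreeMap<>(changelogOffsets).forEach((partition, offset) -> lines.add(partition + " " + offset));
        try {
            Files.write(tmp, lines, StandardCharsets.UTF_8);
            // Rename over the old file so readers never see a half-written checkpoint.
            Files.move(tmp, target, StandardCopyOption.REPLACE_EXISTING, StandardCopyOption.ATOMIC_MOVE);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Offsets recorded by the last successful commit (empty if there is no checkpoint yet).
    default Map<String, Long> committedOffsets() {
        Map<String, Long> offsets = new TreeMap<>();
        Path file = checkpointFile();
        if (!Files.exists(file)) {
            return offsets;
        }
        try {
            for (String line : Files.readAllLines(file, StandardCharsets.UTF_8)) {
                int sep = line.lastIndexOf(' ');
                offsets.put(line.substring(0, sep), Long.parseLong(line.substring(sep + 1)));
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return offsets;
    }
}
```

A custom store would only implement flush() and checkpointFile(); the
runtime would just call commit(..) and never touch checkpoint files itself.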

2. OOME errors: The main reasons why I didn't explore a solution to this are
a) to keep this KIP as simple as possible, and b) because I'm not exactly sure
how to signal that a Task should commit prematurely. I'm confident it's
possible, and I think it's worth adding a section on handling this. Besides
my proposal to force an early commit once memory usage reaches a threshold,
is there any other approach that you might suggest for tackling this
problem?
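
For reference, the threshold idea could be sketched roughly as follows. The
class names, the byte-size estimate and the maxUncommittedBytes threshold
are all hypothetical; the real signalling path from store to Task is exactly
the open question above:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch, not Kafka Streams code: a store-side write buffer that
// tracks approximately how many bytes of uncommitted writes it holds.
final class UncommittedWriteBuffer {
    private final Map<String, byte[]> pending = new HashMap<>();
    private long uncommittedBytes = 0L;

    void put(String key, byte[] value) {
        byte[] previous = pending.put(key, value);
        if (previous == null) {
            uncommittedBytes += 2L * key.length(); // rough key size: 2 bytes per char
        } else {
            uncommittedBytes -= previous.length;   // an overwrite replaces the old value
        }
        uncommittedBytes += value.length;
    }

    long uncommittedBytes() {
        return uncommittedBytes;
    }

    // Hands the pending writes to the caller (e.g. to apply to the store) and resets.
    Map<String, byte[]> drain() {
        Map<String, byte[]> batch = new HashMap<>(pending);
        pending.clear();
        uncommittedBytes = 0L;
        return batch;
    }
}

// Hypothetical Task-side policy: commit when the commit interval has elapsed,
// or early once the buffered writes reach a memory threshold.
final class EarlyCommitPolicy {
    private final long commitIntervalMs;
    private final long maxUncommittedBytes;

    EarlyCommitPolicy(long commitIntervalMs, long maxUncommittedBytes) {
        this.commitIntervalMs = commitIntervalMs;
        this.maxUncommittedBytes = maxUncommittedBytes;
    }

    boolean shouldCommit(long nowMs, long lastCommitMs, UncommittedWriteBuffer buffer) {
        return nowMs - lastCommitMs >= commitIntervalMs
            || buffer.uncommittedBytes() >= maxUncommittedBytes;
    }
}
```

The processing loop would poll shouldCommit(..) after each record, so a
large commit interval no longer bounds how much a store can buffer.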

3. ALOS: I can add in an explicit paragraph, but my assumption is that,
since transactional behaviour comes at little/no cost, it should be
available by default on all stores, irrespective of the processing mode.
While ALOS doesn't use transactions, the Task itself still "commits", so
the behaviour should be correct under ALOS too. I'm not convinced that it's
worth having both transactional/non-transactional stores available, as it
would considerably increase the complexity of the codebase, for very little
benefit.
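
A toy model of why the processing mode shouldn't matter to the store: only
the Kafka-side work differs between modes, while the store commit is the
same step either way. Everything here (ProcessingMode, SketchTransactionalStore,
SketchTask) is hypothetical and heavily simplified:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical processing modes, loosely mirroring the processing.guarantee options.
enum ProcessingMode { AT_LEAST_ONCE, EXACTLY_ONCE_V2 }

// Hypothetical transactional store: writes are buffered, visible to the store's
// own reads, and applied together with the changelog offset on commit().
final class SketchTransactionalStore {
    private final Map<String, String> committed = new HashMap<>();
    private final Map<String, String> uncommitted = new HashMap<>();
    private long committedChangelogOffset = -1L;

    void put(String key, String value) {
        uncommitted.put(key, value);
    }

    String get(String key) {
        // Read-your-writes: uncommitted values shadow committed ones.
        return uncommitted.containsKey(key) ? uncommitted.get(key) : committed.get(key);
    }

    void commit(long changelogOffset) {
        committed.putAll(uncommitted);
        uncommitted.clear();
        committedChangelogOffset = changelogOffset;
    }

    // Simulates a crash: uncommitted writes are lost, so the stored data always
    // matches committedChangelogOffset.
    void crash() {
        uncommitted.clear();
    }

    long committedChangelogOffset() {
        return committedChangelogOffset;
    }
}

// Hypothetical Task: the store commit does not depend on the processing mode.
final class SketchTask {
    private final ProcessingMode mode;
    private final SketchTransactionalStore store;

    SketchTask(ProcessingMode mode, SketchTransactionalStore store) {
        this.mode = mode;
        this.store = store;
    }

    // Returns a description of the mode-specific Kafka-side step (illustration only).
    String commit(long changelogOffset) {
        String kafkaStep = mode == ProcessingMode.EXACTLY_ONCE_V2
            ? "commit producer transaction"
            : "flush producer, commit consumer offsets";
        store.commit(changelogOffset);
        return kafkaStep;
    }
}
```

Under ALOS, a crash after the last commit simply rolls the store back to a
state consistent with its committed changelog offset, which is no worse
than today's behaviour.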

4. Method deprecation: Are you referring to StateStore#getPosition()? As I
understand it, Position contains the position of the *source* topics,
whereas the commit offsets would be the *changelog* offsets. So it's still
necessary to retain the Position data, as well as the changelog offsets.
What I meant in the KIP is that Position offsets are currently stored in a
file, and since we can atomically store metadata along with the record
batch we commit to RocksDB, we can move our Position offsets into this
metadata too, and gain the same transactional guarantees that we will for
changelog offsets, ensuring that the Position offsets are consistent with
the records that are read from the database.
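
As an illustration of the consistency argument, here is an in-memory sketch
in which one commit carries the records, the changelog offsets and the
source-topic Position together, and publishes them as a single snapshot. The
classes are hypothetical; in RocksDB the analogous step might be writing the
offsets (e.g. under reserved keys or in a separate column family) in the same
WriteBatch as the records, which is an assumption on my part:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical immutable view of a store: records plus the offsets describing them.
final class StoreSnapshot {
    final Map<String, String> records;
    final Map<String, Long> changelogOffsets; // changelog partition -> offset
    final Map<String, Long> position;         // source topic-partition -> offset

    StoreSnapshot(Map<String, String> records, Map<String, Long> changelogOffsets, Map<String, Long> position) {
        this.records = Map.copyOf(records);
        this.changelogOffsets = Map.copyOf(changelogOffsets);
        this.position = Map.copyOf(position);
    }
}

// Hypothetical store that commits records and both kinds of offsets in one batch.
final class OffsetsInBatchStore {
    private volatile StoreSnapshot current = new StoreSnapshot(Map.of(), Map.of(), Map.of());

    // Merges all three parts of the batch, then publishes them with a single
    // volatile write, so no reader can see new records paired with old offsets.
    synchronized void commit(Map<String, String> records,
                             Map<String, Long> changelogOffsets,
                             Map<String, Long> position) {
        StoreSnapshot old = current;
        Map<String, String> r = new HashMap<>(old.records);
        r.putAll(records);
        Map<String, Long> c = new HashMap<>(old.changelogOffsets);
        c.putAll(changelogOffsets);
        Map<String, Long> p = new HashMap<>(old.position);
        p.putAll(position);
        current = new StoreSnapshot(r, c, p);
    }

    StoreSnapshot snapshot() {
        return current;
    }
}
```

A reader that takes one snapshot() sees records, changelog offsets and
Position from the same commit, which is the property a separate Position
file can't guarantee.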

Regards,
Nick

On Tue, 22 Nov 2022 at 16:25, John Roesler <vvcep...@apache.org> wrote:

> Thanks for publishing this alternative, Nick!
>
> The benchmark you mentioned in the KIP-844 discussion seems like a
> compelling reason to revisit the built-in transactionality mechanism. I
> also appreciate your analysis, showing that for most use cases, the write
> batch approach should be just fine.
>
> There are a couple of points that would hold me back from approving this
> KIP right now:
>
> 1. Loss of coverage for custom stores.
> The fact that you can plug in a (relatively) simple implementation of the
> XStateStore interfaces and automagically get a distributed database out of
> it is a significant benefit of Kafka Streams. I'd hate to lose it, so it
> would be better to spend some time and come up with a way to preserve that
> property. For example, can we provide a default implementation of
> `commit(..)` that re-implements the existing checkpoint-file approach? Or
> perhaps add an `isTransactional()` flag to the state store interface so
> that the runtime can decide whether to continue to manage checkpoint files
> vs delegating transactionality to the stores?
>
> 2. Guarding against OOME
> I appreciate your analysis, but I don't think it's sufficient to say that
> we will solve the memory problem later if it becomes necessary. The
> experience leading to that situation would be quite bad: Imagine, you
> upgrade to AK 3.next, your tests pass, so you deploy to production. That
> night, you get paged because your app is now crashing with OOMEs. As with
> all OOMEs, you'll have a really hard time finding the root cause, and once
> you do, you won't have a clear path to resolve the issue. You could only
> tune down the commit interval and cache buffer size until you stop getting
> crashes.
>
> FYI, I know of multiple cases where people run EOS with much larger commit
> intervals to get better batching than the default, so I don't think this
> pathological case would be as rare as you suspect.
>
> Given that we already have the rudiments of an idea of what we could do to
> prevent this downside, we should take the time to design a solution. We owe
> it to our users to ensure that awesome new features don't come with bitter
> pills unless we can't avoid it.
>
> 3. ALOS mode.
> On the other hand, I didn't see an indication of how stores will be
> handled under ALOS (aka non-EOS) mode. Theoretically, the transactionality
> of the store and the processing mode are orthogonal. A transactional store
> would serve ALOS just as well as a non-transactional one (if not better).
> Under ALOS, though, the default commit interval is five minutes, so the
> memory issue is far more pressing.
>
> As I see it, we have several options to resolve this point. We could
> demonstrate that transactional stores work just fine for ALOS and we can
> therefore just swap over unconditionally. We could also disable the
> transactional mechanism under ALOS so that stores operate just the same as
> they do today when run in ALOS mode. Finally, we could do the same as in
> KIP-844 and make transactional stores opt-in (it'd be better to avoid the
> extra opt-in mechanism, but it's a good get-out-of-jail-free card).
>
> 4. (minor point) Deprecation of methods
>
> You mentioned that the new `commit` method replaces flush,
> updateChangelogOffsets, and checkpoint. It seems to me that the point about
> atomicity and Position also suggests that it replaces the Position
> callbacks. However, the proposal only deprecates `flush`. Should we be
> deprecating other methods as well?
>
> Thanks again for the KIP! It's really nice that you and Alex will get the
> chance to collaborate on both directions so that we can get the best
> outcome for Streams and its users.
>
> -John
>
>
> On 2022/11/21 15:02:15 Nick Telford wrote:
> > Hi everyone,
> >
> > As I mentioned in the discussion thread for KIP-844, I've been working on
> > an alternative approach to achieving better transactional semantics for
> > Kafka Streams StateStores.
> >
> > I've published this separately as KIP-892: Transactional Semantics for
> > StateStores
> > <https://cwiki.apache.org/confluence/display/KAFKA/KIP-892%3A+Transactional+Semantics+for+StateStores>,
> > so that it can be discussed/reviewed separately from KIP-844.
> >
> > Alex: I'm especially interested in what you think!
> >
> > I have a nearly complete implementation of the changes outlined in this
> > KIP; please let me know if you'd like me to push them for review in
> > advance of a vote.
> >
> > Regards,
> >
> > Nick
> >
>
