On Mon, Mar 4, 2019 at 6:15 PM Ivan Kelly <iv...@apache.org> wrote:

> > > Transactional acknowledgement also needs to be taken into account
> >
> > I don't think we have to treat `transactional acknowledgement` as a
> > special case. Currently, acknowledgments are actually "append" operations
> > into cursor ledgers. So the problem set can be reduced to "atomic append"
> > to both data ledgers and cursor ledgers. That way, we can use one
> > solution for handling appending data and updating cursors.
>
> Acknowledgements are different though as they have to take conflicts into
> account. For publishing messages, transactions are only really a case
> of making sure if one message from the transaction is visible, then
> all messages from the transaction are visible, i.e. atomicity. There's
> no need for isolation or consistency in ACID terms because all these
> messages are independent. However, in the case of acknowledgements, we
> want to ensure that only one transaction can acknowledge a specific
> message (leaving cumulative ack aside for now). This means that before
> committing the transaction we need to check if the transaction can be
> committed. If any message acknowledged by the transaction has already
> been acknowledged at this point, then we need to abort the transaction
> as a conflict has occurred.
>

Sure, for individual acks we do need the logic for checking conflicts.
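
To make that concrete, here is a minimal sketch (all names hypothetical,
not an actual Pulsar API) of the commit-time conflict check: before a
transaction commits, verify that none of the messages it acknowledges have
already been acked by a committed transaction, and abort otherwise.

import java.util.Set;

class AckConflictChecker {
    // (ledgerId, entryId) pair identifying a message; hypothetical type
    record MessageId(long ledgerId, long entryId) {}

    private final Set<MessageId> committedAcks; // acks already applied to the cursor

    AckConflictChecker(Set<MessageId> committedAcks) {
        this.committedAcks = committedAcks;
    }

    // Returns true if the transaction may commit, false if it must abort.
    boolean canCommit(Set<MessageId> acksInTxn) {
        for (MessageId id : acksInTxn) {
            if (committedAcks.contains(id)) {
                return false; // another transaction already acked this message
            }
        }
        return true;
    }
}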


>
> > Kafka's implementation interleaves committed messages with uncommitted
> > messages in storage. Personally I think it is a very ugly design and
> > implementation.
>
> I don't think it's particularly ugly. Databases using MVCC do
> something similar.


Databases are very different from streaming systems. Database storage is
optimized for random access, where data tends to be organized in key +
version ordering, so interleaving committed and uncommitted data is fine.

However, in a streaming system, interleaving means you have to go
back-and-forth between transactions, and you have to rely on either client
side caching (making the client implementation much more complicated) or
broker side caching (making dispatching much more complicated).

Caching only works under one assumption: transactions live for a very short
time, and there are not many interleaved transactions. Long transactions
mixed with short-lived transactions, or a large number of interleaved
transactions, don't work out well.
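
To illustrate the cost: with an interleaved layout, a consumer has to hold
every uncommitted message in memory, keyed by transaction id, until the
commit or abort marker arrives. A rough sketch (hypothetical marker
handling, not real client code):

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class InterleavedTxnBuffer {
    // uncommitted payloads buffered per transaction
    private final Map<Long, List<byte[]>> pending = new HashMap<>();

    void onMessage(long txnId, byte[] payload) {
        pending.computeIfAbsent(txnId, k -> new ArrayList<>()).add(payload);
    }

    // On a commit marker, release the buffered messages to the application.
    List<byte[]> onCommit(long txnId) {
        return pending.remove(txnId);
    }

    void onAbort(long txnId) {
        pending.remove(txnId);
    }
}

The memory held in `pending` grows with transaction lifetime and with the
number of interleaved transactions, which is exactly why this only works
when transactions are short-lived and few.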


> This has a nice property for the clients. Our
> transactional system should be optimistic, as the only causes for
> aborts will be client crashes and conflicts on acks, both of which
> should be rare events. Transactions will likely live for a very short
> time.


Kafka can optimize this by caching uncommitted data because Kafka's
consumption model is fairly straightforward and simple. Pulsar, however,
relies on broker-side dispatch; if we started caching interleaved
transactions at the broker side, we would have to touch a lot of the broker
dispatching logic.


> So if the data is interleaved, it can be cached at the client,
> and the client can then surface it to the user when the commit
> arrives shortly afterwards, without any modification to the dispatch
> path. It also gives clients the ability to read uncommitted data,
> though I don't know why they would want to.
>

Exposing uncommitted data is actually very confusing behavior for clients.
Even if we could do so, I don't think we should expose it to users. I would
prefer a much cleaner delivery semantic around transactions.


>
> > 1) for any transaction, write the messages to a separate ledger (or
> > multiple separate ledgers).
>
> When you say a separate ledger, are you talking about a ledger per
> transaction? Or is there one log for the system?
>

I was thinking of one (or multiple) ledgers per partition per transaction.
This is the simplest solution: you can delete the ledger to abort a
transaction. It will put pressure on metadata, but we can address that by
using the table service to store that metadata as well as transaction
statuses. This gives a few benefits: 1) the entries of a given transaction
in the same partition are stored more or less contiguously on bookies,
because they will be ordered by ledgerId + entryId; 2) the metadata and
transaction status parts are handled by the table service, which lets us
support a large number of outstanding transactions as it scales. A rough
sketch of the flow follows.
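
This sketch uses the plain BookKeeper client API; the TxnStatusStore below
is a hypothetical stand-in for the table service, and only the BookKeeper
calls are real:

import org.apache.bookkeeper.client.BookKeeper;
import org.apache.bookkeeper.client.LedgerHandle;

class TxnLedgerWriter {
    // hypothetical stand-in for the table service
    interface TxnStatusStore {
        void markCommitted(long txnId, long ledgerId) throws Exception;
    }

    private final BookKeeper bk;
    private final LedgerHandle lh; // one ledger for this txn on this partition

    TxnLedgerWriter(BookKeeper bk) throws Exception {
        this.bk = bk;
        this.lh = bk.createLedger(BookKeeper.DigestType.CRC32, "passwd".getBytes());
    }

    void append(byte[] message) throws Exception {
        lh.addEntry(message); // entries stay contiguous, ordered by ledgerId + entryId
    }

    // Commit: seal the ledger and record it in the transaction status store.
    void commit(TxnStatusStore store, long txnId) throws Exception {
        lh.close();
        store.markCommitted(txnId, lh.getId());
    }

    // Abort: simply delete the ledger; no compaction or index rewrite needed.
    void abort() throws Exception {
        lh.close();
        bk.deleteLedger(lh.getId());
    }
}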

We could share a ledger across transactions, but that would be similar to
interleaving transactions into the data ledgers: you would end up building
indexing, caching, and compaction logic that has already been implemented
in bookies and the table service.

- Sijie


>
> -Ivan
>
