On Tue, Mar 5, 2019 at 12:37 AM Ivan Kelly <iv...@apache.org> wrote:

> I think we agree on the general shape of the design as follows.
>
> - Transactions store message data somewhere that it is not
> materialized to the client immediately.
> - On commit, a single message is written to some log
> - Commit messages are then written to the topics of the logs in the
> transaction, which point to the message data and which materialize it
> to the client?
>
> My replies inline assume the above, so if you have a different view of
> the general shape let me know.
>

Yes, we are on the same page about the general shape. I will write down
the details of my proposal and share it with the community tomorrow.
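
To keep us aligned, here is a minimal sketch of what a commit marker
pointing at un-materialized transaction data could look like; all of the
names and fields below are hypothetical, just to illustrate the shape we
agreed on:

    // Hypothetical illustration only: these are not existing Pulsar/BookKeeper
    // types, they just mirror the shape described above.
    import java.util.List;

    /** Marker appended to each topic partition when a transaction commits. */
    class TxnCommitMarker {
        long txnIdMostBits;          // transaction id (high bits)
        long txnIdLeastBits;         // transaction id (low bits)
        List<DataPointer> pointers;  // where this partition's transaction data
                                     // was staged before commit
    }

    /** Pointer to staged message data that the dispatcher materializes on read. */
    class DataPointer {
        long ledgerId;               // ledger holding the staged entry
        long entryId;                // entry within that ledger
    }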


>
> > Database is very different from a streaming system. Database storage is
> > optimized for randomized
> > accesses, where data tends to be organized in key + version ordering,
> where
> > interleaving committed
> > and uncommitted data is fine.
> > However in a streaming system, interleaving means you have to go
> > back-and-forth between transactions,
>
> Whether we put it interleaved or in a separate ledger, we are going to
> have to do a random read. The only way to avoid a random read is to
> rewrite all the transaction messages after the transaction has
> committed, which means write amplification.
>

There are two layers of "random" reads. One is logical, at the ledger
level: you need some form of scanning or indexing to jump back and forth
between entries. The other is the actual physical IO.

If you put the data in a separate ledger, the dispatcher logically sees
the entries in order, and physically the bookies optimize the storage of
entries belonging to the same ledger, so accessing the data of one
transaction is most likely sequential.

If you interleave the data, you have to implement logic at the logical
layer (the broker) to jump back and forth; and at the storage layer,
since data from different transactions is interleaved in the same
ledger, the access pattern is also poor. Even if, as you said, the
number of outstanding transactions can be bounded, the access pattern is
not as good as in the first option.

Diagrams can easily be drawn to compare the differences here. I will
incorporate them into the proposal.
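
In the meantime, a rough sketch of the two read paths, using the
existing BookKeeper client API; the ledger ids are made up and the
interleaved path is shown only as comments, since it first needs an
index (hypothetical here) before it can issue its point reads:

    // Sketch only: ledger ids and the ZK connect string are placeholders.
    import java.util.Enumeration;
    import org.apache.bookkeeper.client.BookKeeper;
    import org.apache.bookkeeper.client.LedgerEntry;
    import org.apache.bookkeeper.client.LedgerHandle;

    public class TxnReadPatterns {
        public static void main(String[] args) throws Exception {
            BookKeeper bk = new BookKeeper("zk1:2181");

            // Option 1: the transaction's data lives in its own ledger.
            // One sequential scan; bookies lay these entries out together.
            LedgerHandle txnLedger =
                bk.openLedger(1234L, BookKeeper.DigestType.CRC32, new byte[0]);
            long lac = txnLedger.getLastAddConfirmed();
            if (lac >= 0) {
                Enumeration<LedgerEntry> entries = txnLedger.readEntries(0, lac);
                while (entries.hasMoreElements()) {
                    dispatch(entries.nextElement().getEntry());
                }
            }

            // Option 2: the transaction's data is interleaved in the topic
            // ledger. The broker first consults an index to find the
            // scattered entry ids, then issues one point read per entry:
            //
            //   for (long entryId : txnIndex.entryIdsFor(txnId)) { // hypothetical index
            //       topicLedger.readEntries(entryId, entryId);     // random read each time
            //   }

            bk.close();
        }

        static void dispatch(byte[] payload) { /* hand off to the dispatcher */ }
    }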


>
> > I was thinking of one (or multiple) ledger per partition per transaction.
> > this is the simplest solution, where you can delete ledger to abort a
> > transaction. However it will put a pressure to metadata, but we can
> address
> > that by using table service to store those metadata as well as
> transaction
> > statuses.
>
> This is a huge amount of pressure on metadata. With the current
> metadata service, this is 4 ZK writes to commit a transaction (id,
> create, close and anchor it somewhere).
> Then you need to delete the data when the topic gets truncated. You
> need some way to correlate which data ledgers correspond to the
> messages being truncated from the topic, and then the metadata store
> needs to deal with a massive number of delete operations in a short
> time.

>
> I also wouldn't consider table service mature enough to put ledger
> metadata on it yet. It's new, undocumented, and has very little
> production exposure so far. Making a hard dependency on table service,
> would delay transactions by at least a year. Putting transaction
> components themselves on table service is fine, as transactions are
> also a new service, but ledger metadata is the very core of the
> system.
>

I don't think it has to be a hard dependency. All these components
should be defined behind interfaces, so there is no hard dependency on
the table service. The initial implementation can use the current
ZooKeeper-based ledger metadata storage, so we can make sure all the
end-to-end logic is verified. At the same time, other implementations
can be worked on in parallel, either on top of the table service or as a
separate implementation.
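
As a sketch of what I mean by "done by interfaces" (the interface below
is hypothetical, not an existing Pulsar API), the transaction metadata
store could look roughly like this, with a ZooKeeper-based
implementation first and a table-service-based one developed in
parallel:

    // Hypothetical interface, for illustration only; names are not from Pulsar.
    import java.util.List;
    import java.util.concurrent.CompletableFuture;

    /** Pluggable store for transaction metadata and status. */
    interface TransactionMetadataStore {
        CompletableFuture<Long> newTransaction();               // allocate a txn id
        CompletableFuture<Void> addProducedPartition(long txnId, String partition);
        CompletableFuture<Void> updateStatus(long txnId, TxnStatus newStatus);
        CompletableFuture<List<String>> getProducedPartitions(long txnId);
        CompletableFuture<Void> deleteTransaction(long txnId);  // cleanup after commit/abort
    }

    enum TxnStatus { OPEN, COMMITTING, COMMITTED, ABORTING, ABORTED }

    // Two implementations can then be developed in parallel, e.g.:
    //   class ZKTransactionMetadataStore implements TransactionMetadataStore { ... }
    //   class TableServiceTransactionMetadataStore implements TransactionMetadataStore { ... }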


>
> > We have a few benefits 1) make sure the entries of a same
> > transaction in the same partitions are kind of continuously stored on
> > bookies. because they will be ordered by ledgerId + entryId.
>
> Clients will not care if the entries from the same transaction are
> contiguous on disk or not. Clients reading are reading from a topic,
> so unless there are multiple entries in the topic from the same
> transaction, they'll only read one entry at a time.
>

I think I now understand why you consider interleaving okay: in your
mind, a transaction carries one message per partition.

I don't think that's true in the streaming space.

A common transaction pattern in streaming is read-transfer-write: read a
batch of messages, process them, write the results to Pulsar, and
acknowledge the input messages. Even if you do this in a 100ms window,
the data can still be large, especially since the results can be several
times the size of the input messages. In other words, in high-throughput
transactional streaming the data per transaction can be large, and
storing the entries of the same transaction contiguously has a huge
benefit.
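
For reference, a read-transfer-write loop looks roughly like the sketch
below; the transaction handle and the calls tied to it are hypothetical,
since that client API is exactly what we are designing, while
receive/send/acknowledge are the existing Pulsar client API:

    // Sketch of read-transfer-write. The Txn type and beginTransaction/commit
    // are hypothetical placeholders for the API under discussion.
    import org.apache.pulsar.client.api.Consumer;
    import org.apache.pulsar.client.api.Message;
    import org.apache.pulsar.client.api.Producer;

    class ReadTransferWrite {
        void run(Consumer<byte[]> input, Producer<byte[]> output) throws Exception {
            while (true) {
                Txn txn = beginTransaction();               // hypothetical
                // Read a fixed-size batch for simplicity; in practice this
                // would be everything received within a ~100ms window.
                for (int i = 0; i < 1000; i++) {
                    Message<byte[]> msg = input.receive();
                    byte[] result = process(msg.getData()); // results may be a
                                                            // multiple of the input size
                    output.send(result);                    // would be tied to txn
                    input.acknowledge(msg);                 // would be tied to txn
                }
                txn.commit();                               // hypothetical
            }
        }

        byte[] process(byte[] in) { return in; }
        Txn beginTransaction() { return new Txn(); }
        static class Txn { void commit() {} }
    }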

Another large category of use cases to consider is "batch" data
processing. You definitely don't want a future batch-style data
processing workload to jump back and forth within ledgers. There too,
storing the entries of the same transaction in the same partition
contiguously brings huge benefits.


>
> >2) the
> > metadata and transaction status part are handled by the table service,
> > where we are able to support large number of outstanding transactions as
> it
> > scales.
>
> How do you clean up old transactions from table service?
>

Transaction coordinators are responsible for committing, aborting and
cleaning up transactions.
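
Roughly, the flow is as follows (all names below are hypothetical, for
illustration only): once the coordinator has committed or aborted a
transaction, it deletes the transaction's entry, so nothing accumulates
in the table service:

    // Hypothetical sketch of coordinator-side cleanup; names are illustrative.
    class TransactionCoordinator {
        void endTransaction(long txnId, boolean commit) {
            if (commit) {
                writeCommitMarkers(txnId);    // materialize data to the involved topics
            } else {
                deleteStagedData(txnId);      // e.g. delete the transaction's ledgers
            }
            deleteTransactionMetadata(txnId); // remove the entry from the table service
        }

        void writeCommitMarkers(long txnId) { /* ... */ }
        void deleteStagedData(long txnId) { /* ... */ }
        void deleteTransactionMetadata(long txnId) { /* ... */ }
    }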


>
> > We can share a ledger across transactions, however it will be similar as
> > interleaving transactions into the data ledgers. You will end up building
> > indexing, caching and compaction logic, which has already implemented on
> > bookies and table service.
>
> I think having a ledger, or a shadow topic, per topic would work fine.
> There's no need for indexing. We can already look up a message by
> messageid. This message id should be part of the transaction commit
> message, and subsequently part of the commit marker written to each
> topic involved in the transaction.


The assumption you have here is that all the message ids can be stored
within one commit message or commit marker.
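
That assumption breaks down quickly for large transactions; a rough
back-of-envelope sketch is below, where both numbers are assumptions
chosen only for illustration:

    // Back-of-envelope only; both figures below are assumptions.
    class MarkerSizeEstimate {
        public static void main(String[] args) {
            long idsInTxn   = 100_000; // messages produced to one partition by a large txn
            long bytesPerId = 16;      // assumed ~8-byte ledger id + 8-byte entry id
            System.out.println("marker payload ~ " + (idsInTxn * bytesPerId) + " bytes"); // ~1.6 MB
        }
    }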



> Caching is no different to any
> other choice of storage for the message data.


The caching behavior is very different from the normal streaming case,
since accessing the entries of the same transaction will jump back and
forth within the shadow topic.


> Compaction is unneeded.
>

How are you going to delete the data of aborted transactions in any
form of interleaved storage? If we don't compact, the data of aborted
transactions will be left there forever.

> No matter what design we end up with, we will need to put a limit on
> the lifetime of transactions. This will create an upper bound on the
> amount of extra time we need to retain the shadow topic relative to
> the main topic.
>

If you don't rewrite the shadow topic into the main topic, you either
have to do compaction or retain the shadow topic forever, no?


>
> -Ivan
>
