Kafka's implementation interleaves committed and uncommitted messages in
storage. Personally, I think that is a very ugly design and
implementation.

Pulsar is a segment-centric system with a shared segment storage layer,
BookKeeper. I think a better direction is to leverage the segments (aka
ledgers) for buffering uncommitted messages, and to commit the whole
segment when the whole transaction is committed.

A rough idea would be:

1) For any transaction, write its messages to a separate ledger (or
multiple separate ledgers).
2) During the transaction, accumulate the messages in those ledgers.
3) On commit, merge the txn ledgers back into the main data ledger. The
merge can be done either by adding a meta message to the data ledger that
points to where the data is stored in the txn ledger, or by actually
copying the data into the data ledger (depending on the size of the data
accumulated in the transaction).
4) On abort, delete the txn ledger. No additional work needs to be done.

This would produce a much cleaner design than Kafka's.
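
The four steps above can be sketched in a few lines. This is a minimal in-memory model, not BookKeeper's API: `TopicStore`, `Ledger`, `TxnPointer`, `read_committed` and the `copy_threshold_bytes` knob are all hypothetical names made up for illustration. It shows the key property: uncommitted data never touches the main data ledger, commit either copies (small txn) or appends one meta entry pointing at the txn ledger (large txn), and abort only drops the txn ledger.

```python
from dataclasses import dataclass, field
from typing import Dict, List


@dataclass
class TxnPointer:
    """Meta entry in the data ledger pointing at a committed txn ledger."""
    ledger_id: int
    entry_count: int


@dataclass
class Ledger:
    ledger_id: int
    entries: list = field(default_factory=list)


class TopicStore:
    """Hypothetical in-memory model of one topic: a main data ledger plus
    one separate ledger per open transaction (not the BookKeeper API)."""

    def __init__(self, copy_threshold_bytes: int = 1024):
        self._next_id = 0
        self.copy_threshold_bytes = copy_threshold_bytes  # hypothetical knob
        self.data_ledger = self._new_ledger()
        self.txn_ledgers: Dict[str, Ledger] = {}  # open transactions
        self.sealed: Dict[int, Ledger] = {}       # committed txn ledgers kept by reference

    def _new_ledger(self) -> Ledger:
        self._next_id += 1
        return Ledger(self._next_id)

    def append(self, txn_id: str, payload: bytes) -> None:
        # Steps 1-2: uncommitted messages only ever go to the txn ledger.
        if txn_id not in self.txn_ledgers:
            self.txn_ledgers[txn_id] = self._new_ledger()
        self.txn_ledgers[txn_id].entries.append(payload)

    def commit(self, txn_id: str) -> None:
        # Step 3: merge the txn ledger back into the main data ledger.
        txn = self.txn_ledgers.pop(txn_id)
        size = sum(len(p) for p in txn.entries)
        if size <= self.copy_threshold_bytes:
            # Small transaction: copy the data into the data ledger.
            self.data_ledger.entries.extend(txn.entries)
        else:
            # Large transaction: append one meta entry and keep the txn ledger.
            self.sealed[txn.ledger_id] = txn
            self.data_ledger.entries.append(TxnPointer(txn.ledger_id, len(txn.entries)))

    def abort(self, txn_id: str) -> None:
        # Step 4: deleting the txn ledger is the only work needed.
        self.txn_ledgers.pop(txn_id, None)

    def read_committed(self) -> List[bytes]:
        # Readers only scan the data ledger, resolving meta entries.
        out: List[bytes] = []
        for entry in self.data_ledger.entries:
            if isinstance(entry, TxnPointer):
                out.extend(self.sealed[entry.ledger_id].entries[: entry.entry_count])
            else:
                out.append(entry)
        return out


if __name__ == "__main__":
    store = TopicStore(copy_threshold_bytes=8)
    store.append("t1", b"a")
    store.append("t1", b"b")
    store.append("t2", b"large-payload")
    store.append("t3", b"dropped")
    store.commit("t1")  # 2 bytes: copied
    store.commit("t2")  # 13 bytes: meta entry
    store.abort("t3")
    print(store.read_committed())  # [b'a', b'b', b'large-payload']
```

The copy-versus-pointer choice is the size-based trade-off mentioned in step 3: copying keeps reads local to the data ledger, while a pointer avoids rewriting large transactions.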

On Ivan's comments:

> Transactional acknowledgement also needs to be taken into account

I don't think we have to treat `transactional acknowledgement` as a special
case. Currently, `acknowledgements` are actually "append" operations on
cursor ledgers.
So the problem can be reduced to an `atomic append` to both data ledgers
and cursor ledgers. That way, we can use one solution for handling both
appending data and updating cursors.
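
To illustrate the reduction, here is a minimal sketch assuming ledgers are modelled as named in-memory lists. `AtomicAppendTxn`, `publish`, `ack` and the `(ledger_id, entry_id)` position tuple are hypothetical names for illustration. The point is that an ack is just another pending append, so one commit path covers both data and cursor updates.

```python
from typing import Dict, List, Tuple

Position = Tuple[int, int]  # (ledger_id, entry_id), illustrative only


class AtomicAppendTxn:
    """Hypothetical sketch: a transaction is a list of pending appends to
    named ledgers (data or cursor), applied all-or-nothing on commit."""

    def __init__(self, ledgers: Dict[str, List[object]]):
        self._ledgers = ledgers
        self._pending: List[Tuple[str, object]] = []
        self._finished = False

    def _append(self, ledger: str, entry: object) -> None:
        if self._finished:
            raise RuntimeError("transaction already finished")
        if ledger not in self._ledgers:
            raise KeyError(ledger)  # validate early so commit cannot fail halfway
        self._pending.append((ledger, entry))

    def publish(self, data_ledger: str, payload: bytes) -> None:
        self._append(data_ledger, payload)

    def ack(self, cursor_ledger: str, position: Position) -> None:
        # An acknowledgement is just another append, to a cursor ledger.
        self._append(cursor_ledger, position)

    def commit(self) -> None:
        if self._finished:
            raise RuntimeError("transaction already finished")
        # In-memory and single-threaded, so this loop is trivially atomic;
        # a real broker would need a durable commit marker or similar.
        for ledger, entry in self._pending:
            self._ledgers[ledger].append(entry)
        self._pending.clear()
        self._finished = True

    def abort(self) -> None:
        self._pending.clear()
        self._finished = True
```

For example, a consume-transform-produce step would call `txn.ack("in-topic/sub", (7, 41))` and `txn.publish("out-topic", b"result")` on the same transaction, and neither change becomes visible until `txn.commit()`.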

Additionally, I think a related topic to transactions is supporting
large messages (e.g. >= 5MB). If we take the approach I described above,
using a separate ledger to accumulate the messages of a transaction, it is
easy to model a large message as a transaction of chunked messages.
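
A minimal sketch of that idea, assuming chunks carry their own `(message_id, index, total)` header so a reader can reassemble them. `Chunk`, `split_into_chunks`, `write_large_message`, `reassemble` and the `MAX_CHUNK_BYTES` limit are hypothetical names; the txn ledger is a plain list whose contents reach the data ledger only once every chunk has been accumulated.

```python
from dataclasses import dataclass
from typing import Dict, List

MAX_CHUNK_BYTES = 5 * 1024 * 1024  # hypothetical limit, echoing the 5MB example


@dataclass(frozen=True)
class Chunk:
    message_id: str
    index: int
    total: int
    data: bytes


def split_into_chunks(message_id: str, payload: bytes,
                      max_chunk: int = MAX_CHUNK_BYTES) -> List[Chunk]:
    if max_chunk <= 0:
        raise ValueError("max_chunk must be positive")
    # An empty payload still becomes one (empty) chunk.
    pieces = [payload[i:i + max_chunk] for i in range(0, len(payload), max_chunk)] or [b""]
    return [Chunk(message_id, i, len(pieces), p) for i, p in enumerate(pieces)]


def write_large_message(data_ledger: List[Chunk], message_id: str, payload: bytes,
                        max_chunk: int = MAX_CHUNK_BYTES) -> None:
    txn_ledger: List[Chunk] = []  # separate ledger for this "transaction"
    for chunk in split_into_chunks(message_id, payload, max_chunk):
        txn_ledger.append(chunk)  # accumulate while the transaction is open
    # Commit: nothing reaches the data ledger until every chunk is accumulated.
    data_ledger.extend(txn_ledger)


def reassemble(data_ledger: List[Chunk]) -> Dict[str, bytes]:
    groups: Dict[str, List[Chunk]] = {}
    for chunk in data_ledger:
        groups.setdefault(chunk.message_id, []).append(chunk)
    messages: Dict[str, bytes] = {}
    for message_id, chunks in groups.items():
        chunks.sort(key=lambda c: c.index)
        if len(chunks) != chunks[0].total:
            raise ValueError(f"incomplete message {message_id}")
        messages[message_id] = b"".join(c.data for c in chunks)
    return messages
```

Because chunks only become visible on commit, a reader never sees a partially written large message, and an aborted upload leaves nothing behind.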

@Richard, @Ivan, let me know what you think. If you think the
direction I raised is a good one to go down, I am happy to write it up
in detail, drive the design, and coordinate the implementation in
the community.

- Sijie

On Sat, Mar 2, 2019 at 9:45 AM Richard Yu <yohan.richard...@gmail.com>
wrote:

> Hi all,
>
> We might be able to get some ideas on implementing this from Kafka:
>
> https://cwiki.apache.org/confluence/display/KAFKA/Transactional+Messaging+in+Kafka
>
> Obviously, there are some differences between Kafka and Pulsar internals,
> but at some level the implementation would be similar.
> It should help.
>
>
>
> On Thu, Feb 28, 2019 at 4:29 PM Richard Yu <yohan.richard...@gmail.com>
> wrote:
>
> > Hi,
> >
> > Per request, I've created a doc so we could get some more input in an
> > organized manner:
> >
> >
> > https://docs.google.com/document/d/1mSUsJvPgThnWizeQqljKU5244BMEiYHabA6OP6QEHmQ/edit?usp=sharing
> >
> > As for Ivan's questions, here are my answers.
> >
> > >By "set the message to unknown", do you mean the broker will cache the
> > >message, not writing it to any log?
> >
> > From my interpretation of the steps, we wouldn't cache the message. What
> > the producer sends first is a pre-processing message, not the real
> > message itself. This step basically notifies the broker that the message
> > is on its way. So all we have to do is store the message id and its
> > corresponding status in a map, and the status will change according to
> > the producer's response.
> >
> > > In designs we've discussed previously, this was handled
> > > by a component called the transaction coordinator, which is a logical
> > > component which each broker knows how to talk to. For a transaction
> > > the commit message is sent to the coordinator, which writes it to its
> > > own log, and then goes through each topic in the commit and marks the
> > > transaction as completed.
> >
> > I wasn't aware of previous discussions on this topic, but it seems pretty
> > good to me. It's certainly better than what I would come up with.
> > If there are any more things we need to talk about, I suppose we could
> > move them to the google doc to play around with.
> >
> > Hope we can get this PIP rolling.
> >
> >
> > On Thu, Feb 28, 2019 at 2:53 AM Sijie Guo <guosi...@gmail.com> wrote:
> >
> >> Richard,
> >>
> >> Thank you for putting this out and pushing the discussion forward.
> >>
> >> I think this is a very large feature. It might be worth creating a
> >> google doc for it (which is better for collaboration). And I believe Ivan
> >> has some thoughts as well. If you can put up a google doc (make it
> >> world-editable), Ivan can probably dump his thoughts there, and we can
> >> finalize the discussion and break it down into tasks, so the whole
> >> community can actually work together on this.
> >>
> >> Thanks,
> >> Sijie
> >>
> >> On Thu, Feb 28, 2019 at 1:08 PM Richard Yu <yohan.richard...@gmail.com>
> >> wrote:
> >>
> >> > Hi all,
> >> >
> >> > I would like to create a PIP for issue #2664 on Github. The details of
> >> > the PIP are below.
> >> > I hope we can discuss this thoroughly.
> >> >
> >> > Cheers,
> >> > Richard
> >> >
> >> > PIP-31: Add support for transactional messaging
> >> >
> >> > Motivation: Pulsar could improve upon its current system of sending
> >> > packets of data by implementing transactional messaging. This system
> >> > enforces eventual consistency within the system, and allows operations
> >> > to be performed atomically.
> >> >
> >> > Proposal:
> >> >
> >> > As described in the issue, we would implement the following policy in
> >> > the Producer and the Pulsar Broker:
> >> > 1. The producer produces the pre-processing transaction message. At
> >> > this point, the broker sets the status of this message to unknown.
> >> > 2. If the local transaction executes successfully, the commit message
> >> > is sent; otherwise, the rollback message is sent.
> >> > 3. The broker receives the message. If it is a commit message, it
> >> > modifies the transaction status to commit, and then sends the actual
> >> > message to the consumer queue. At this point, the consumer can consume
> >> > the message. Otherwise, the transaction status is modified to rollback,
> >> > and the message is discarded.
> >> > 4. If, at step 2, the producer is down or abnormal, the broker will
> >> > periodically ask that producer for the status of the message, update
> >> > the status according to the producer's response, and process it
> >> > according to step 3.
> >> >
> >> > Specific concerns:
> >> > There are a number of things we will improve upon or add:
> >> > - A configuration called ```maxMessageUnknownTime```. Consider this
> >> > scenario: the pre-processing transaction message is sent, but the
> >> > commit or rollback message is never received, which could leave the
> >> > status of a message permanently unknown. To prevent this, we would
> >> > need a config which limits the amount of time the status of a message
> >> > can remain unknown (i.e. ```maxMessageUnknownTime```). After that, the
> >> > message would be discarded.
> >> > - Logging would be updated to log the status of a message, i.e.
> >> > UNKNOWN, ROLLBACK, or COMMITTED. This would allow the user to know
> >> > whether or not a message had failed or fallen through.
> >> >
> >> > Possible Additional API:
> >> > - We would add a method which allows the user to query the state of
> >> > the message, i.e. ```getStateOfMessage(long id)```
> >> >
> >>
> >
>
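
The PIP-31 steps, the ```maxMessageUnknownTime``` timeout and ```getStateOfMessage``` from the proposal quoted above can be sketched as a small broker-side state table. Only those two names come from the proposal; `PendingMessageTable`, `prepare`, `resolve`, `check_unknown` and the injectable `clock` are hypothetical. Treating a timed-out message as ROLLBACK is an assumption about what "discarded" means.

```python
import enum
import time
from typing import Callable, Dict, Optional


class MessageState(enum.Enum):
    UNKNOWN = "UNKNOWN"
    COMMITTED = "COMMITTED"
    ROLLBACK = "ROLLBACK"


class PendingMessageTable:
    """Hypothetical broker-side map of message id -> status (PIP-31 sketch)."""

    def __init__(self, max_message_unknown_time: float,
                 clock: Callable[[], float] = time.monotonic):
        self._max_unknown = max_message_unknown_time  # maxMessageUnknownTime, seconds
        self._clock = clock
        self._state: Dict[int, MessageState] = {}
        self._unknown_since: Dict[int, float] = {}

    def prepare(self, msg_id: int) -> None:
        # Step 1: pre-processing message arrives; status is UNKNOWN.
        self._state[msg_id] = MessageState.UNKNOWN
        self._unknown_since[msg_id] = self._clock()

    def resolve(self, msg_id: int, committed: bool) -> None:
        # Step 3: commit or rollback; late answers for resolved ids are ignored.
        if self._state.get(msg_id) is not MessageState.UNKNOWN:
            return
        self._state[msg_id] = MessageState.COMMITTED if committed else MessageState.ROLLBACK
        del self._unknown_since[msg_id]

    def check_unknown(self, ask_producer: Callable[[int], Optional[bool]]) -> None:
        # Step 4: periodically ask the producer; None means "no answer".
        now = self._clock()
        for msg_id, since in list(self._unknown_since.items()):
            answer = ask_producer(msg_id)
            if answer is not None:
                self.resolve(msg_id, answer)
            elif now - since >= self._max_unknown:
                # Unknown for too long: discard (modelled here as ROLLBACK).
                self.resolve(msg_id, False)

    def get_state_of_message(self, msg_id: int) -> Optional[MessageState]:
        return self._state.get(msg_id)
```

Injecting the clock keeps the timeout logic testable without sleeping; a real broker would drive `check_unknown` from a scheduled task.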
