I agree we many want to review pravega's past efforts in this area also. https://github.com/pravega/pravega/blob/master/documentation/src/docs/transactions.md https://github.com/pravega/pravega/blob/master/client/src/main/java/io/pravega/client/stream/Transaction.java
-Ali On Sat, Mar 2, 2019 at 1:56 AM Sijie Guo <guosi...@gmail.com> wrote: > Kafka's implementation is interleaving committed messages with uncommitted > messages at storage. Personally I think it is a very ugly design and > implementation. > > Pulsar is a segment centric system, where we have a shared segment storage > - bookkeeper. I think a better direction is to leverage the segments (aka > ledgers) > for buffering uncommitted messages and commit the whole segment when the > whole transaction is committed. > > A rough idea would be: > > 1) for any transaction, write the messages to a separate ledger (or > multiple separate ledger). > 2) during the transaction, accumulates the messages in those ledgers. > 3) when commit, merge the txn ledgers back to the main data ledger. the > merge can be done either adding a meta message where data is stored in the > txn ledger or actually copying the data to data ledger (depending on the > size of data accumulate in the transaction). > 4) when abort, delete the txn ledger. No other additional work to be done. > > This would be producing a much clear design than Kafka. > > On Ivan's comments: > > > Transactional acknowledgement also needs to be taken into account > > I don't think we have to treat `transactional acknowledgement` as a special > case. currently `acknowledgment` are actually "append" operations into > cursor ledgers. > So the problem set can be reduced as `atomic append` to both data ledgers > and cursor ledgers. in that way, we can use one solution for handling > appending data and updating cursors. > > Additionally, I think a related topic about transactions would be > supporting large sized message (e.g. >= 5MB). If we take the approach I > described above using a separated ledger for accumulating messages for a > transaction, that we are easy to model a large size message as a > transaction of chunked messages. > > @Richard, @Ivan let me know what do you think. If you guys think the > direction I raised is a good one to go down, I am happy to write them down > into details, and drive the design and coordinate the implementations in > the community. > > - Sijie > > On Sat, Mar 2, 2019 at 9:45 AM Richard Yu <yohan.richard...@gmail.com> > wrote: > > > Hi all, > > > > We might be able to get some ideas on implementing this from Kafka: > > > > > https://cwiki.apache.org/confluence/display/KAFKA/Transactional+Messaging+in+Kafka > > > > Obviously, there is some differences in Kafka and Pulsar internals but at > > some level, the implementation would be similar. > > It should help. > > > > > > > > On Thu, Feb 28, 2019 at 4:29 PM Richard Yu <yohan.richard...@gmail.com> > > wrote: > > > > > Hi, > > > > > > Per request, I've created a doc so we could get some more input in an > > > organized manner: > > > > > > > > > https://docs.google.com/document/d/1mSUsJvPgThnWizeQqljKU5244BMEiYHabA6OP6QEHmQ/edit?usp=sharing > > > > > > And for Ivan's questions, I would answer accordingly. > > > > > > >By "set the message to unknown", do you mean the broker will cache the > > > >message, not writing it to any log? > > > > > > We wouldn't cache the message from my interpretation of the steps. What > > > the producer is first sending is a pre-processing message, not the real > > > message itself. This step basically notifies the broker that the > message > > is > > > on its way. So all we have to do is store the message id and its > > > corresponding status in a map, and depending on the producer's > response, > > > the status will change accordingly. > > > > > > > In designs we've discussed previously, this was handled > > > > by a component called the transaction coordinator, which is a logical > > > > component which each broker knows how to talk to. For a transaction > > > > the commit message is sent to the coordinator, which writes it to its > > > > own log, and then goes through each topic in the commit and marks the > > > > transaction as completed. > > > > > > I wasn't aware of previous discussions on this topic, but it seems > pretty > > > good to me. It's certainly better than what I would come up with. > > > If there's any more things we need to talk about, I suppose we could > move > > > it to the google doc to play around with. > > > > > > Hope we can get this PIP rolling. > > > > > > > > > On Thu, Feb 28, 2019 at 2:53 AM Sijie Guo <guosi...@gmail.com> wrote: > > > > > >> Richard, > > >> > > >> Thank you for putting this put and pushing the discussion forward. > > >> > > >> I think this is a very large feature. It might be worth creating a > > google > > >> doc for it (which is better for collaboration). And I believe Ivan has > > >> some > > >> thoughts as well. If you can put up a google doc (make it > > world-editable), > > >> Ivan can probably dump his thoughts there and we can finalize the > > >> discussion and break down into tasks. So the whole community can > > actually > > >> work together at collaborating this. > > >> > > >> Thanks, > > >> Sijie > > >> > > >> On Thu, Feb 28, 2019 at 1:08 PM Richard Yu < > yohan.richard...@gmail.com> > > >> wrote: > > >> > > >> > Hi all, > > >> > > > >> > I would like to create a PIP for issue #2664 on Github. The details > of > > >> the > > >> > PIP are below. > > >> > I hope we could discuss this thoroughly. > > >> > > > >> > Cheers, > > >> > Richard > > >> > > > >> > PIP-31: Add support for transactional messaging > > >> > > > >> > Motivation: Pulsar currently could improve upon their system of > > sending > > >> > packets of data by implementing transactional messaging. This system > > >> > enforces eventual consistency within the system, and allows > operations > > >> to > > >> > be performed atomically. > > >> > > > >> > Proposal: > > >> > > > >> > As described in the issue, we would implement the following policy > in > > >> > Producer and Pulsar Broker: > > >> > 1. The producer produces the pre-processing transaction message. At > > this > > >> > point, the broker will set the status of this message to unknown. > > >> > 2. After the local transaction is successfully executed, the commit > > >> message > > >> > is sent, otherwise the rollback message is sent. > > >> > 3. The broker receives the message. If it is a commit message, it > > >> modifies > > >> > the transaction status to commit, and then sends an actual message > to > > >> the > > >> > consumer queue. At this time, the consumer can consume the message. > > >> > Otherwise, the transaction status is modified to rollback. The > message > > >> will > > >> > be discarded. > > >> > 4. If at step 2, the producer is down or abnormal, at this time, the > > >> broker > > >> > will periodically ask the specific producer for the status of the > > >> message, > > >> > and update the status according to the producer's response, and > > process > > >> it > > >> > according to step 3, the action that comes down. > > >> > > > >> > Specific concerns: > > >> > There are a number of things we will improve upon or add: > > >> > - A configuration called ```maxMessageUnknownTime```. Consider this > > >> > scenario: the pre-processing transaction message is sent, but the > > >> commit or > > >> > rollback message is never received, which could mean that the status > > of > > >> a > > >> > message would be permanently unknown. To avoid this from happening, > we > > >> > would need a config which limits the amount of time the status of a > > >> message > > >> > could be unknown (i.e. ```maxMessageUnknownTime```) After that, the > > >> message > > >> > would be discarded. > > >> > - Logging would be updated to log the status of a message i.e. > > UNKNOWN, > > >> > ROLLBACK, or COMMITTED. This would allow the user to know whether or > > >> not a > > >> > message had failed or fallen through. > > >> > > > >> > Possible Additional API: > > >> > - We would add a method which allows the user to query the state of > > the > > >> > message i.e. ```getStateOfMessage(long id)``` > > >> > > > >> > > > > > > -- -Ali