Hi all,

We might be able to get some ideas on implementing this from Kafka: https://cwiki.apache.org/confluence/display/KAFKA/Transactional+Messaging+in+Kafka
Obviously, there are some differences between Kafka and Pulsar internals, but at some level the implementation would be similar. It should help.

On Thu, Feb 28, 2019 at 4:29 PM Richard Yu <yohan.richard...@gmail.com> wrote:

> Hi,
>
> Per request, I've created a doc so we can get more input in an
> organized manner:
>
> https://docs.google.com/document/d/1mSUsJvPgThnWizeQqljKU5244BMEiYHabA6OP6QEHmQ/edit?usp=sharing
>
> As for Ivan's questions, here are my answers.
>
> > By "set the message to unknown", do you mean the broker will cache the
> > message, not writing it to any log?
>
> From my interpretation of the steps, we wouldn't cache the message. What
> the producer sends first is a pre-processing message, not the real
> message itself. This step simply notifies the broker that the message is
> on its way. So all we have to do is store the message ID and its
> corresponding status in a map; depending on the producer's response,
> the status changes accordingly.
>
> > In designs we've discussed previously, this was handled
> > by a component called the transaction coordinator, which is a logical
> > component that each broker knows how to talk to. For a transaction,
> > the commit message is sent to the coordinator, which writes it to its
> > own log, and then goes through each topic in the commit and marks the
> > transaction as completed.
>
> I wasn't aware of previous discussions on this topic, but it seems pretty
> good to me. It's certainly better than what I would have come up with.
> If there's anything more we need to talk about, I suppose we could move
> it to the Google Doc to work through it there.
>
> Hope we can get this PIP rolling.
>
>
> On Thu, Feb 28, 2019 at 2:53 AM Sijie Guo <guosi...@gmail.com> wrote:
>
>> Richard,
>>
>> Thank you for putting this out and pushing the discussion forward.
>>
>> I think this is a very large feature. It might be worth creating a Google
>> doc for it (which is better for collaboration).
>> And I believe Ivan has some
>> thoughts as well. If you can put up a Google doc (make it world-editable),
>> Ivan can probably dump his thoughts there, and we can finalize the
>> discussion and break it down into tasks. That way the whole community can
>> collaborate on this.
>>
>> Thanks,
>> Sijie
>>
>> On Thu, Feb 28, 2019 at 1:08 PM Richard Yu <yohan.richard...@gmail.com>
>> wrote:
>>
>> > Hi all,
>> >
>> > I would like to create a PIP for issue #2664 on GitHub. The details of
>> > the PIP are below.
>> > I hope we can discuss this thoroughly.
>> >
>> > Cheers,
>> > Richard
>> >
>> > PIP-31: Add support for transactional messaging
>> >
>> > Motivation: Pulsar could improve the way it sends packets of data by
>> > implementing transactional messaging. Transactional messaging enforces
>> > eventual consistency within the system and allows operations to be
>> > performed atomically.
>> >
>> > Proposal:
>> >
>> > As described in the issue, we would implement the following policy in
>> > the Producer and the Pulsar Broker:
>> > 1. The producer produces the pre-processing transaction message. At this
>> >    point, the broker sets the status of this message to unknown.
>> > 2. If the local transaction executes successfully, the producer sends the
>> >    commit message; otherwise it sends the rollback message.
>> > 3. The broker receives the message. If it is a commit message, the broker
>> >    changes the transaction status to committed and then sends the actual
>> >    message to the consumer queue, where the consumer can consume it.
>> >    Otherwise, the broker changes the transaction status to rollback and
>> >    discards the message.
>> > 4. If the producer goes down or behaves abnormally at step 2, the broker
>> >    periodically asks that producer for the status of the message, updates
>> >    the status according to the producer's response, and then proceeds as
>> >    in step 3.
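The four steps above can be read as a small state machine on the broker. What follows is a minimal, hypothetical Python model of that reading, not Pulsar code: following Richard's reply, it keeps only a map from message id to status, and because the thread leaves open where the payload lives until commit, "delivery" is modeled as appending the id to a list. The names `TxnBroker`, `on_prepare`, `on_resolve`, `check_back`, and the `ask_producer` callback are all invented for illustration.

```python
from enum import Enum
from typing import Callable, Dict, List, Optional


class TxnStatus(Enum):
    UNKNOWN = "UNKNOWN"
    COMMITTED = "COMMITTED"
    ROLLBACK = "ROLLBACK"


class TxnBroker:
    """Hypothetical in-memory model of the proposed broker behaviour; not a Pulsar API."""

    def __init__(self) -> None:
        # Only message id -> status is stored, not the payload.
        self.status: Dict[int, TxnStatus] = {}
        # Ids released to the consumer queue after a commit (stand-in for real delivery).
        self.delivered: List[int] = []

    def on_prepare(self, msg_id: int) -> None:
        # Step 1: the pre-processing message arrives; the status starts as UNKNOWN.
        self.status[msg_id] = TxnStatus.UNKNOWN

    def on_resolve(self, msg_id: int, commit: bool) -> None:
        # Steps 2-3: a commit releases the message to consumers; a rollback discards it.
        if self.status.get(msg_id) is not TxnStatus.UNKNOWN:
            return  # unknown id or already resolved: ignore duplicates
        if commit:
            self.status[msg_id] = TxnStatus.COMMITTED
            self.delivered.append(msg_id)
        else:
            self.status[msg_id] = TxnStatus.ROLLBACK

    def check_back(self, ask_producer: Callable[[int], Optional[bool]]) -> None:
        # Step 4: ask the producer about every message that is still UNKNOWN.
        pending = [m for m, s in self.status.items() if s is TxnStatus.UNKNOWN]
        for msg_id in pending:
            # True = commit, False = rollback, None = producer cannot answer yet.
            answer = ask_producer(msg_id)
            if answer is not None:
                self.on_resolve(msg_id, answer)


# Usage: message 3 never gets a commit/rollback, so the check-back resolves it.
broker = TxnBroker()
for i in (1, 2, 3):
    broker.on_prepare(i)
broker.on_resolve(1, commit=True)
broker.on_resolve(2, commit=False)
broker.check_back(lambda msg_id: True)  # producer recovered and reports a commit
```

Ignoring a resolution for any id that is no longer UNKNOWN keeps the handler idempotent, which matters if a step 4 check-back races with a late commit or rollback from step 2.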
>> >
>> > Specific concerns:
>> > There are a number of things we will improve upon or add:
>> > - A configuration called ```maxMessageUnknownTime```. Consider this
>> >   scenario: the pre-processing transaction message is sent, but the commit
>> >   or rollback message is never received, so the status of the message
>> >   would stay unknown forever. To prevent this, we would need a config
>> >   that limits how long the status of a message can remain unknown (i.e.
>> >   ```maxMessageUnknownTime```). After that time, the message would be
>> >   discarded.
>> > - Logging would be updated to record the status of a message, i.e.
>> >   UNKNOWN, ROLLBACK, or COMMITTED. This would let the user know whether
>> >   or not a message had failed or fallen through.
>> >
>> > Possible Additional API:
>> > - We would add a method that allows the user to query the state of a
>> >   message, i.e. ```getStateOfMessage(long id)```
>> >
>> >
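The proposal names the `maxMessageUnknownTime` timeout, status logging, and a `getStateOfMessage(long id)` query without specifying how they fit together. Here is one possible combination as a minimal Python sketch. Several points are assumptions rather than anything stated in the thread: the timeout is in seconds, a timed-out message ends in the ROLLBACK state (the proposal only says it is "discarded"), a commit or rollback arriving after expiry is ignored, and the clock is injectable for testing. `MessageStateTracker`, `expire`, and `get_state_of_message` are hypothetical names, not part of any Pulsar API.

```python
import logging
import time
from enum import Enum
from typing import Callable, Dict, Tuple

logging.basicConfig(level=logging.INFO)
log = logging.getLogger("txn-sketch")


class MessageState(Enum):
    UNKNOWN = "UNKNOWN"
    COMMITTED = "COMMITTED"
    ROLLBACK = "ROLLBACK"


class MessageStateTracker:
    """Hypothetical sketch of maxMessageUnknownTime plus getStateOfMessage; not a Pulsar API."""

    def __init__(self, max_message_unknown_time: float,
                 clock: Callable[[], float] = time.monotonic) -> None:
        self.max_message_unknown_time = max_message_unknown_time  # assumed unit: seconds
        self.clock = clock
        # message id -> (current state, time the pre-processing message arrived)
        self.entries: Dict[int, Tuple[MessageState, float]] = {}

    def _set(self, msg_id: int, state: MessageState) -> None:
        _, since = self.entries[msg_id]
        self.entries[msg_id] = (state, since)
        log.info("message %d is %s", msg_id, state.value)  # status logging from the proposal

    def prepare(self, msg_id: int) -> None:
        self.entries[msg_id] = (MessageState.UNKNOWN, self.clock())
        log.info("message %d is %s", msg_id, MessageState.UNKNOWN.value)

    def resolve(self, msg_id: int, commit: bool) -> None:
        # Late commits/rollbacks after expiry are ignored (an assumption).
        entry = self.entries.get(msg_id)
        if entry is not None and entry[0] is MessageState.UNKNOWN:
            self._set(msg_id, MessageState.COMMITTED if commit else MessageState.ROLLBACK)

    def expire(self) -> None:
        # Discard messages whose status stayed UNKNOWN longer than maxMessageUnknownTime.
        now = self.clock()
        for msg_id, (state, since) in list(self.entries.items()):
            if state is MessageState.UNKNOWN and now - since > self.max_message_unknown_time:
                self._set(msg_id, MessageState.ROLLBACK)

    def get_state_of_message(self, msg_id: int) -> MessageState:
        # Python spelling of the proposed getStateOfMessage(long id).
        return self.entries[msg_id][0]


# Usage with a fake clock and a 30-second limit.
fake_now = [0.0]
tracker = MessageStateTracker(30.0, clock=lambda: fake_now[0])
tracker.prepare(1)
tracker.prepare(2)
tracker.resolve(1, commit=True)
fake_now[0] = 31.0  # message 2 has now been UNKNOWN for longer than 30 s
tracker.expire()
```

Injecting the clock keeps the timeout logic deterministic under test. In a broker, `expire` would presumably run on a periodic timer alongside the step 4 check-back.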