Hi - Is this a case where a Pulsar function base class for transactions would help?
Regards,
Dave

Sent from my iPhone

> On Mar 2, 2019, at 2:39 AM, Sijie Guo <guosi...@gmail.com> wrote:
>
> Pravega's model is better than Kafka's: it addresses the interleaving problem. However, Pravega's model is based on a giant replicated log and rewrites the data to a second storage tier for persistence, which basically re-implements BookKeeper's logic in the broker. A fundamental drawback of Pravega is write amplification. The amplification of both network and IO bandwidth is huge. If you use BookKeeper for both its first- and second-tier storage and assume a BookKeeper replication factor of 3, Pravega requires 6x network bandwidth and 12x IO bandwidth. For a given message, it needs to write 3 times into the journal and 3 times for persistence. This amplification severely limits the throughput of the Pravega "brokers".
>
> - Sijie
>
>> On Sat, Mar 2, 2019 at 6:13 PM Ali Ahmed <ahmal...@gmail.com> wrote:
>>
>> I agree; we may want to review Pravega's past efforts in this area as well.
>>
>> https://github.com/pravega/pravega/blob/master/documentation/src/docs/transactions.md
>>
>> https://github.com/pravega/pravega/blob/master/client/src/main/java/io/pravega/client/stream/Transaction.java
>>
>> -Ali
>>
>>> On Sat, Mar 2, 2019 at 1:56 AM Sijie Guo <guosi...@gmail.com> wrote:
>>>
>>> Kafka's implementation interleaves committed and uncommitted messages in storage. Personally, I think it is a very ugly design and implementation.
>>>
>>> Pulsar is a segment-centric system with a shared segment storage layer: BookKeeper. I think a better direction is to leverage the segments (aka ledgers) for buffering uncommitted messages, and to commit the whole segment when the whole transaction is committed.
>>>
>>> A rough idea would be:
>>>
>>> 1) For any transaction, write the messages to a separate ledger (or multiple separate ledgers).
>>> 2) During the transaction, accumulate the messages in those ledgers.
>>> 3) On commit, merge the txn ledgers back into the main data ledger. The merge can be done either by adding a meta message recording where the data is stored in the txn ledger, or by actually copying the data into the data ledger (depending on the size of the data accumulated in the transaction).
>>> 4) On abort, delete the txn ledger. No additional work needs to be done.
>>>
>>> This would produce a much clearer design than Kafka's.
>>>
>>> On Ivan's comments:
>>>
>>>> Transactional acknowledgement also needs to be taken into account
>>>
>>> I don't think we have to treat `transactional acknowledgement` as a special case. Currently, `acknowledgments` are actually "append" operations into cursor ledgers. So the problem can be reduced to an `atomic append` to both data ledgers and cursor ledgers. That way, we can use one solution for both appending data and updating cursors.
>>>
>>> Additionally, I think a related topic for transactions would be supporting large messages (e.g. >= 5MB). If we take the approach I described above, using a separate ledger to accumulate the messages of a transaction, it is easy to model a large message as a transaction of chunked messages.
>>>
>>> @Richard, @Ivan let me know what you think. If you think the direction I raised is a good one to go down, I am happy to write it down in detail, drive the design, and coordinate the implementation in the community.
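The ledger-based approach in steps 1-4 above, together with the idea of treating acknowledgements as appends to cursor ledgers, can be illustrated with a small in-memory model. This is a minimal sketch under stated assumptions, not BookKeeper or Pulsar code: a "ledger" here is just an append-only Python list, and `SegmentStore`, `Transaction`, `TxnPointer` and the `copy_threshold` knob are hypothetical names. Replication, fencing and failure handling are not modelled. Small transactions are copied into the target ledger on commit, while larger ones are referenced through a meta entry, which is one possible reading of the "meta message" option in step 3.

```python
from dataclasses import dataclass, field
from typing import Dict, List, Union


@dataclass(frozen=True)
class TxnPointer:
    """Hypothetical meta entry: 'the entries of txn ledger N belong here'."""
    txn_ledger_id: int


Entry = Union[bytes, TxnPointer]


@dataclass
class Ledger:
    ledger_id: int
    entries: List[Entry] = field(default_factory=list)


class SegmentStore:
    """Toy stand-in for shared segment storage; ledgers are plain lists."""

    def __init__(self, copy_threshold: int = 4):
        self.copy_threshold = copy_threshold  # hypothetical knob for step 3
        self.ledgers: Dict[int, Ledger] = {}
        self._next_id = 0

    def create_ledger(self) -> int:
        ledger_id = self._next_id
        self._next_id += 1
        self.ledgers[ledger_id] = Ledger(ledger_id)
        return ledger_id

    def append(self, ledger_id: int, entry: Entry) -> None:
        self.ledgers[ledger_id].entries.append(entry)

    def read(self, ledger_id: int) -> List[bytes]:
        """Return a ledger's payloads, expanding TxnPointer entries in place."""
        out: List[bytes] = []
        for entry in self.ledgers[ledger_id].entries:
            if isinstance(entry, TxnPointer):
                out.extend(self.read(entry.txn_ledger_id))
            else:
                out.append(entry)
        return out


class Transaction:
    def __init__(self, store: SegmentStore):
        self.store = store
        self.txn_ledgers: Dict[int, int] = {}  # target ledger id -> txn ledger id

    def append(self, target: int, payload: bytes) -> None:
        # Steps 1-2: buffer in a separate ledger; readers of `target` can't see it yet.
        if target not in self.txn_ledgers:
            self.txn_ledgers[target] = self.store.create_ledger()
        self.store.append(self.txn_ledgers[target], payload)

    def commit(self) -> None:
        # Step 3: merge each txn ledger into its target, by copy or by reference.
        for target, txn_id in self.txn_ledgers.items():
            buffered = self.store.ledgers[txn_id].entries
            if len(buffered) <= self.store.copy_threshold:
                for entry in buffered:
                    self.store.append(target, entry)
                del self.store.ledgers[txn_id]  # data copied, txn ledger no longer needed
            else:
                self.store.append(target, TxnPointer(txn_id))  # keep it and point to it
        self.txn_ledgers.clear()

    def abort(self) -> None:
        # Step 4: deleting the txn ledgers is all that is needed.
        for txn_id in self.txn_ledgers.values():
            del self.store.ledgers[txn_id]
        self.txn_ledgers.clear()


# Usage: a data ledger and a cursor ledger updated in one transaction.
store = SegmentStore(copy_threshold=2)
data, cursor = store.create_ledger(), store.create_ledger()
store.append(data, b"m0")
txn = Transaction(store)
txn.append(data, b"m1")
txn.append(cursor, b"ack:m0")  # an acknowledgement is just another append
print(store.read(data))    # [b'm0']  (m1 is not visible before commit)
txn.commit()
print(store.read(data))    # [b'm0', b'm1']
print(store.read(cursor))  # [b'ack:m0']
```

Because each target ledger (data or cursor) gets its own txn ledger, data appends and acknowledgements share one commit/abort path, which is the reduction to an `atomic append` described above. The sketch does not show how a commit spanning several target ledgers is made crash-atomic; that would need a durable record of the commit decision, such as the coordinator log mentioned later in the thread.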
>>>
>>> - Sijie
>>>
>>> On Sat, Mar 2, 2019 at 9:45 AM Richard Yu <yohan.richard...@gmail.com> wrote:
>>>
>>>> Hi all,
>>>>
>>>> We might be able to get some ideas on implementing this from Kafka:
>>>>
>>>> https://cwiki.apache.org/confluence/display/KAFKA/Transactional+Messaging+in+Kafka
>>>>
>>>> Obviously, there are some differences between Kafka and Pulsar internals, but at some level the implementation would be similar. It should help.
>>>>
>>>> On Thu, Feb 28, 2019 at 4:29 PM Richard Yu <yohan.richard...@gmail.com> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> Per request, I've created a doc so we can get more input in an organized manner:
>>>>>
>>>>> https://docs.google.com/document/d/1mSUsJvPgThnWizeQqljKU5244BMEiYHabA6OP6QEHmQ/edit?usp=sharing
>>>>>
>>>>> Here are my answers to Ivan's questions.
>>>>>
>>>>>> By "set the message to unknown", do you mean the broker will cache the message and not write it to any log?
>>>>>
>>>>> From my interpretation of the steps, we wouldn't cache the message. What the producer sends first is a pre-processing message, not the real message itself. This step basically notifies the broker that the message is on its way. So all we have to do is store the message id and its corresponding status in a map, and the status will change according to the producer's response.
>>>>>
>>>>>> In designs we've discussed previously, this was handled by a component called the transaction coordinator, which is a logical component that each broker knows how to talk to. For a transaction, the commit message is sent to the coordinator, which writes it to its own log, and then goes through each topic in the commit and marks the transaction as completed.
>>>>>
>>>>> I wasn't aware of previous discussions on this topic, but it seems pretty good to me. It's certainly better than what I would have come up with. If there's anything more we need to talk about, I suppose we could move it to the Google doc to play around with.
>>>>>
>>>>> Hope we can get this PIP rolling.
>>>>>
>>>>> On Thu, Feb 28, 2019 at 2:53 AM Sijie Guo <guosi...@gmail.com> wrote:
>>>>>
>>>>>> Richard,
>>>>>>
>>>>>> Thank you for putting this out and pushing the discussion forward.
>>>>>>
>>>>>> I think this is a very large feature. It might be worth creating a Google doc for it (which is better for collaboration). And I believe Ivan has some thoughts as well. If you can put up a Google doc (make it world-editable), Ivan can probably dump his thoughts there, and we can finalize the discussion and break it down into tasks. That way the whole community can actually collaborate on this.
>>>>>>
>>>>>> Thanks,
>>>>>> Sijie
>>>>>>
>>>>>> On Thu, Feb 28, 2019 at 1:08 PM Richard Yu <yohan.richard...@gmail.com> wrote:
>>>>>>
>>>>>>> Hi all,
>>>>>>>
>>>>>>> I would like to create a PIP for issue #2664 on GitHub. The details of the PIP are below. I hope we can discuss this thoroughly.
>>>>>>>
>>>>>>> Cheers,
>>>>>>> Richard
>>>>>>>
>>>>>>> PIP-31: Add support for transactional messaging
>>>>>>>
>>>>>>> Motivation: Pulsar could improve its system of sending packets of data by implementing transactional messaging. This system enforces eventual consistency within the system, and allows operations to be performed atomically.
>>>>>>>
>>>>>>> Proposal:
>>>>>>>
>>>>>>> As described in the issue, we would implement the following policy in the Producer and the Pulsar Broker:
>>>>>>> 1. The producer produces the pre-processing transaction message. At this point, the broker sets the status of this message to unknown.
>>>>>>> 2. If the local transaction executes successfully, the commit message is sent; otherwise, the rollback message is sent.
>>>>>>> 3. The broker receives the message. If it is a commit message, the broker changes the transaction status to commit and then sends the actual message to the consumer queue. At this point, the consumer can consume the message. Otherwise, the transaction status is changed to rollback and the message is discarded.
>>>>>>> 4. If, at step 2, the producer is down or behaving abnormally, the broker periodically asks that producer for the status of the message, updates the status according to the producer's response, and then processes the message as in step 3.
>>>>>>>
>>>>>>> Specific concerns:
>>>>>>> There are a number of things we will improve upon or add:
>>>>>>> - A configuration called ```maxMessageUnknownTime```. Consider this scenario: the pre-processing transaction message is sent, but the commit or rollback message is never received, so the status of the message could remain unknown permanently. To prevent this, we would need a config that limits how long the status of a message can remain unknown (i.e. ```maxMessageUnknownTime```). After that time, the message would be discarded.
>>>>>>> - Logging would be updated to record the status of a message, i.e. UNKNOWN, ROLLBACK, or COMMITTED. This would let the user know whether or not a message had failed or fallen through.
>>>>>>>
>>>>>>> Possible Additional API:
>>>>>>> - We would add a method that allows the user to query the state of a message, i.e. ```getStateOfMessage(long id)```
>>
>> --
>> -Ali
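The broker-side bookkeeping in PIP-31 (steps 1-4, `maxMessageUnknownTime`, and `getStateOfMessage`) can be sketched as a small state table. This is a hypothetical model, not Pulsar's API: the class `HalfMessageBroker`, the `check_back` callback standing in for the broker asking the producer about a message, and the injectable clock are all assumptions for illustration. Following Richard's answer to Ivan, it only keeps a map from message id to status; where the payload itself lives is left open, as it is in the thread, so a commit simply "releases" the id. Recording a timed-out message as ROLLBACK is also an assumption; the proposal only says it would be discarded.

```python
import enum
import time
from typing import Callable, Dict, List, Optional


class TxnState(enum.Enum):
    UNKNOWN = "UNKNOWN"
    COMMITTED = "COMMITTED"
    ROLLBACK = "ROLLBACK"


class HalfMessageBroker:
    """Hypothetical broker-side state table for the PIP-31 steps (not Pulsar code)."""

    def __init__(self, max_message_unknown_time: float,
                 check_back: Callable[[int], Optional[TxnState]],
                 clock: Callable[[], float] = time.monotonic):
        self.max_unknown = max_message_unknown_time  # seconds, like maxMessageUnknownTime
        self.check_back = check_back  # asks the producer; None means no answer
        self.clock = clock
        self.states: Dict[int, TxnState] = {}
        self.unknown_since: Dict[int, float] = {}  # ids still in UNKNOWN
        self.released: List[int] = []  # ids handed to the consumer queue

    def on_prepare(self, msg_id: int) -> None:
        # Step 1: the pre-processing message only registers the id as UNKNOWN.
        if msg_id not in self.states:
            self.states[msg_id] = TxnState.UNKNOWN
            self.unknown_since[msg_id] = self.clock()

    def on_outcome(self, msg_id: int, outcome: TxnState) -> None:
        # Steps 2-3: a commit releases the message; a rollback discards it.
        if outcome is TxnState.UNKNOWN:
            raise ValueError("outcome must be COMMITTED or ROLLBACK")
        if self.states.get(msg_id) is not TxnState.UNKNOWN:
            return  # never prepared, or already resolved: ignore duplicates
        del self.unknown_since[msg_id]
        self.states[msg_id] = outcome
        if outcome is TxnState.COMMITTED:
            self.released.append(msg_id)

    def check_unknown(self) -> None:
        # Step 4 plus the timeout: ask the producer, give up after max_unknown.
        now = self.clock()
        for msg_id, since in list(self.unknown_since.items()):
            answer = self.check_back(msg_id)
            if answer in (TxnState.COMMITTED, TxnState.ROLLBACK):
                self.on_outcome(msg_id, answer)
            elif now - since > self.max_unknown:
                self.on_outcome(msg_id, TxnState.ROLLBACK)

    def get_state_of_message(self, msg_id: int) -> Optional[TxnState]:
        return self.states.get(msg_id)


# Usage: a fake clock and a dict of producer answers stand in for real time and RPCs.
now = [0.0]
answers: Dict[int, Optional[TxnState]] = {}
broker = HalfMessageBroker(10.0, answers.get, clock=lambda: now[0])
broker.on_prepare(1)
broker.on_prepare(2)
broker.on_outcome(1, TxnState.COMMITTED)
now[0] = 11.0
broker.check_unknown()  # the producer never answered for id 2
print(broker.get_state_of_message(2))  # TxnState.ROLLBACK
print(broker.released)                 # [1]
```

Keeping the timeout check inside the same periodic pass as the producer check-back means a message is only discarded after the producer has had a chance to answer at least once past the deadline; a real broker might instead schedule the two independently.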