Once there's support for transactions in the messaging API, there will be no need for a base class for functions. Instead, a config option will enable transactional mode.

-- Matteo Merli <matteo.me...@gmail.com>

On Sat, Mar 2, 2019 at 6:39 PM Sijie Guo <guosi...@gmail.com> wrote:
>
> Dave,
>
> You mean implementing the transactions in pulsar function?
>
> - Sijie
>
> On Sun, Mar 3, 2019 at 1:52 AM Dave Fisher <dave2w...@comcast.net> wrote:
>
> > Hi -
> >
> > Is this a case where a Pulsar function base class for transactions would help?
> >
> > Regards,
> > Dave
> >
> > > On Mar 2, 2019, at 2:39 AM, Sijie Guo <guosi...@gmail.com> wrote:
> > >
> > > Pravega's model is a better model than Kafka - it addressed the interleaving problems. However, Pravega's model is based on a giant replicated log and rewrites the data to a second-tier storage for persistence, which basically re-implements bookkeeper's logic in the broker. A fundamental drawback of Pravega is write amplification. The amplification of both network and IO bandwidth is huge. If you use bookkeeper for both its first- and second-tier storage and assume the bookkeeper replication factor is 3, pravega requires 6x network bandwidth and 12x IO bandwidth. For a given message, it needs to write 3 times into the journal, and 3 times for persistence. The amplification hugely limits the throughput at pravega "brokers".
> > >
> > > - Sijie
> > >
> > >> On Sat, Mar 2, 2019 at 6:13 PM Ali Ahmed <ahmal...@gmail.com> wrote:
> > >>
> > >> I agree; we may also want to review pravega's past efforts in this area.
> > >>
> > >> https://github.com/pravega/pravega/blob/master/documentation/src/docs/transactions.md
> > >>
> > >> https://github.com/pravega/pravega/blob/master/client/src/main/java/io/pravega/client/stream/Transaction.java
> > >>
> > >> -Ali
> > >>
> > >>> On Sat, Mar 2, 2019 at 1:56 AM Sijie Guo <guosi...@gmail.com> wrote:
> > >>>
> > >>> Kafka's implementation interleaves committed messages with uncommitted messages in storage.
> > >>> Personally I think it is a very ugly design and implementation.
> > >>>
> > >>> Pulsar is a segment centric system, where we have a shared segment storage - bookkeeper. I think a better direction is to leverage the segments (aka ledgers) for buffering uncommitted messages and commit the whole segment when the whole transaction is committed.
> > >>>
> > >>> A rough idea would be:
> > >>>
> > >>> 1) for any transaction, write the messages to a separate ledger (or multiple separate ledgers).
> > >>> 2) during the transaction, accumulate the messages in those ledgers.
> > >>> 3) on commit, merge the txn ledgers back into the main data ledger. The merge can be done either by adding a meta message that records where the data is stored in the txn ledger, or by actually copying the data to the data ledger (depending on the size of the data accumulated in the transaction).
> > >>> 4) on abort, delete the txn ledger. No additional work needs to be done.
> > >>>
> > >>> This would produce a much cleaner design than Kafka's.
> > >>>
> > >>> On Ivan's comments:
> > >>>
> > >>>> Transactional acknowledgement also needs to be taken into account
> > >>>
> > >>> I don't think we have to treat `transactional acknowledgement` as a special case. Currently an `acknowledgment` is actually an "append" operation into a cursor ledger. So the problem set can be reduced to `atomic append` to both data ledgers and cursor ledgers. In that way, we can use one solution for handling appending data and updating cursors.
> > >>>
> > >>> Additionally, I think a related topic about transactions would be supporting large messages (e.g. >= 5MB). If we take the approach I described above, using a separate ledger for accumulating messages for a transaction, then it is easy to model a large message as a transaction of chunked messages.
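
The four steps quoted above can be modeled in a few lines. What follows is a minimal in-memory sketch under stated assumptions, not BookKeeper or Pulsar code: plain lists stand in for ledgers, every name (`TxnLedgerModel`, `copyThreshold`, `appendDirect`, and so on) is hypothetical, and the copy-versus-meta-message threshold counts messages rather than bytes to keep the example short.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** In-memory model of per-transaction ledgers. Lists stand in for ledgers; all names are hypothetical. */
class TxnLedgerModel {
    /** A data-ledger entry: either an inline payload or a pointer to a committed txn ledger. */
    private static final class Entry {
        final String payload; // non-null for inline data
        final long txnRef;    // meaningful only when payload == null
        Entry(String payload, long txnRef) { this.payload = payload; this.txnRef = txnRef; }
    }

    private final List<Entry> dataLedger = new ArrayList<>();
    private final Map<Long, List<String>> openTxnLedgers = new HashMap<>();
    private final Map<Long, List<String>> committedTxnLedgers = new HashMap<>();
    private final int copyThreshold; // assumption: measured in messages, not bytes
    private long nextTxnId = 1;

    TxnLedgerModel(int copyThreshold) { this.copyThreshold = copyThreshold; }

    /** Step 1: open a separate ledger for the transaction. */
    long begin() {
        long id = nextTxnId++;
        openTxnLedgers.put(id, new ArrayList<>());
        return id;
    }

    /** Step 2: accumulate messages in the txn ledger; readers cannot see them yet. */
    void append(long txnId, String message) {
        requireOpen(txnId).add(message);
    }

    /** Non-transactional write straight to the data ledger. */
    void appendDirect(String message) {
        dataLedger.add(new Entry(message, 0));
    }

    /** Step 3: copy small transactions into the data ledger, or add one meta entry for large ones. */
    void commit(long txnId) {
        List<String> txnLedger = requireOpen(txnId);
        openTxnLedgers.remove(txnId);
        if (txnLedger.size() <= copyThreshold) {
            for (String m : txnLedger) dataLedger.add(new Entry(m, 0));
        } else {
            committedTxnLedgers.put(txnId, txnLedger);
            dataLedger.add(new Entry(null, txnId));
        }
    }

    /** Step 4: abort only deletes the txn ledger; the data ledger is untouched. */
    void abort(long txnId) {
        requireOpen(txnId);
        openTxnLedgers.remove(txnId);
    }

    /** What a consumer would read: committed data only, with meta entries expanded in place. */
    List<String> read() {
        List<String> out = new ArrayList<>();
        for (Entry e : dataLedger) {
            if (e.payload != null) out.add(e.payload);
            else out.addAll(committedTxnLedgers.get(e.txnRef));
        }
        return out;
    }

    private List<String> requireOpen(long txnId) {
        List<String> ledger = openTxnLedgers.get(txnId);
        if (ledger == null) throw new IllegalStateException("no open transaction " + txnId);
        return ledger;
    }
}
```

In this model a transaction's messages become visible all at once at the commit point, and an abort never touches the data ledger, which is the property the quoted proposal is after. A large message split into chunks would simply be one transaction whose `append` calls carry the chunks.
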
> > >>>
> > >>> @Richard, @Ivan let me know what you think. If you guys think the direction I raised is a good one to go down, I am happy to write it up in detail, drive the design, and coordinate the implementation in the community.
> > >>>
> > >>> - Sijie
> > >>>
> > >>> On Sat, Mar 2, 2019 at 9:45 AM Richard Yu <yohan.richard...@gmail.com> wrote:
> > >>>
> > >>>> Hi all,
> > >>>>
> > >>>> We might be able to get some ideas on implementing this from Kafka:
> > >>>>
> > >>>> https://cwiki.apache.org/confluence/display/KAFKA/Transactional+Messaging+in+Kafka
> > >>>>
> > >>>> Obviously, there are some differences between Kafka and Pulsar internals, but at some level the implementation would be similar.
> > >>>> It should help.
> > >>>>
> > >>>> On Thu, Feb 28, 2019 at 4:29 PM Richard Yu <yohan.richard...@gmail.com> wrote:
> > >>>>
> > >>>>> Hi,
> > >>>>>
> > >>>>> Per request, I've created a doc so we could get some more input in an organized manner:
> > >>>>>
> > >>>>> https://docs.google.com/document/d/1mSUsJvPgThnWizeQqljKU5244BMEiYHabA6OP6QEHmQ/edit?usp=sharing
> > >>>>>
> > >>>>> And for Ivan's questions, I would answer accordingly.
> > >>>>>
> > >>>>>> By "set the message to unknown", do you mean the broker will cache the message, not writing it to any log?
> > >>>>>
> > >>>>> We wouldn't cache the message, from my interpretation of the steps. What the producer sends first is a pre-processing message, not the real message itself. This step basically notifies the broker that the message is on its way. So all we have to do is store the message id and its corresponding status in a map, and depending on the producer's response, the status will change accordingly.
> > >>>>>
> > >>>>>> In designs we've discussed previously, this was handled by a component called the transaction coordinator, which is a logical component which each broker knows how to talk to. For a transaction the commit message is sent to the coordinator, which writes it to its own log, and then goes through each topic in the commit and marks the transaction as completed.
> > >>>>>
> > >>>>> I wasn't aware of previous discussions on this topic, but it seems pretty good to me. It's certainly better than what I would come up with.
> > >>>>> If there are any more things we need to talk about, I suppose we could move them to the google doc to play around with.
> > >>>>>
> > >>>>> Hope we can get this PIP rolling.
> > >>>>>
> > >>>>> On Thu, Feb 28, 2019 at 2:53 AM Sijie Guo <guosi...@gmail.com> wrote:
> > >>>>>
> > >>>>>> Richard,
> > >>>>>>
> > >>>>>> Thank you for putting this up and pushing the discussion forward.
> > >>>>>>
> > >>>>>> I think this is a very large feature. It might be worth creating a google doc for it (which is better for collaboration). And I believe Ivan has some thoughts as well. If you can put up a google doc (make it world-editable), Ivan can probably dump his thoughts there and we can finalize the discussion and break it down into tasks, so the whole community can actually work together on this.
> > >>>>>>
> > >>>>>> Thanks,
> > >>>>>> Sijie
> > >>>>>>
> > >>>>>> On Thu, Feb 28, 2019 at 1:08 PM Richard Yu <yohan.richard...@gmail.com> wrote:
> > >>>>>>
> > >>>>>>> Hi all,
> > >>>>>>>
> > >>>>>>> I would like to create a PIP for issue #2664 on Github. The details of the PIP are below.
> > >>>>>>> I hope we could discuss this thoroughly.
> > >>>>>>>
> > >>>>>>> Cheers,
> > >>>>>>> Richard
> > >>>>>>>
> > >>>>>>> PIP-31: Add support for transactional messaging
> > >>>>>>>
> > >>>>>>> Motivation: Pulsar could improve upon its current system of sending packets of data by implementing transactional messaging. Transactional messaging enforces eventual consistency within the system and allows operations to be performed atomically.
> > >>>>>>>
> > >>>>>>> Proposal:
> > >>>>>>>
> > >>>>>>> As described in the issue, we would implement the following policy in the Producer and the Pulsar Broker:
> > >>>>>>> 1. The producer produces the pre-processing transaction message. At this point, the broker will set the status of this message to unknown.
> > >>>>>>> 2. After the local transaction is successfully executed, the commit message is sent; otherwise the rollback message is sent.
> > >>>>>>> 3. The broker receives the message. If it is a commit message, it modifies the transaction status to commit, and then sends the actual message to the consumer queue. At this time, the consumer can consume the message. Otherwise, the transaction status is modified to rollback and the message is discarded.
> > >>>>>>> 4. If the producer is down or behaving abnormally at step 2, the broker will periodically ask that producer for the status of the message, update the status according to the producer's response, and then process the message as described in step 3.
> > >>>>>>>
> > >>>>>>> Specific concerns:
> > >>>>>>> There are a number of things we will improve upon or add:
> > >>>>>>> - A configuration called ```maxMessageUnknownTime```.
> > >>>>>>> Consider this scenario: the pre-processing transaction message is sent, but the commit or rollback message is never received, which could mean that the status of a message would be permanently unknown. To prevent this, we would need a config which limits the amount of time the status of a message could be unknown (i.e. ```maxMessageUnknownTime```). After that, the message would be discarded.
> > >>>>>>> - Logging would be updated to log the status of a message, i.e. UNKNOWN, ROLLBACK, or COMMITTED. This would allow the user to know whether or not a message had failed or fallen through.
> > >>>>>>>
> > >>>>>>> Possible Additional API:
> > >>>>>>> - We would add a method which allows the user to query the state of the message, i.e. ```getStateOfMessage(long id)```
> > >>
> > >> --
> > >> -Ali
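
The broker-side bookkeeping in the PIP-31 text quoted above (a map from message id to status, a ```maxMessageUnknownTime``` limit, and ```getStateOfMessage(long id)```) can be sketched roughly as follows. This is only an illustration under assumptions: `TxnStatusTracker` and its method names other than `getStateOfMessage` are hypothetical, the caller passes timestamps in explicitly, and nothing here is an existing Pulsar API.

```java
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Optional;

/** Hypothetical broker-side status map for the PIP-31 steps; not an existing Pulsar class. */
class TxnStatusTracker {
    enum Status { UNKNOWN, COMMITTED, ROLLBACK }

    private static final class Entry {
        Status status = Status.UNKNOWN;
        final long createdAtMillis;
        Entry(long createdAtMillis) { this.createdAtMillis = createdAtMillis; }
    }

    private final Map<Long, Entry> entries = new HashMap<>();
    private final long maxMessageUnknownTimeMillis;

    TxnStatusTracker(long maxMessageUnknownTimeMillis) {
        this.maxMessageUnknownTimeMillis = maxMessageUnknownTimeMillis;
    }

    /** Step 1: the pre-processing message arrives, so the status starts as UNKNOWN. */
    synchronized void onPreProcessing(long id, long nowMillis) {
        entries.putIfAbsent(id, new Entry(nowMillis));
    }

    /** Steps 2-3: commit. Returns true only if the message was still UNKNOWN. */
    synchronized boolean onCommit(long id) {
        return resolve(id, Status.COMMITTED);
    }

    /** Steps 2-3: rollback. Returns true only if the message was still UNKNOWN. */
    synchronized boolean onRollback(long id) {
        return resolve(id, Status.ROLLBACK);
    }

    private boolean resolve(long id, Status target) {
        Entry e = entries.get(id);
        if (e == null || e.status != Status.UNKNOWN) return false;
        e.status = target;
        return true;
    }

    /** Discards messages that stayed UNKNOWN longer than the limit; returns how many were dropped. */
    synchronized int expireUnknown(long nowMillis) {
        int discarded = 0;
        Iterator<Entry> it = entries.values().iterator();
        while (it.hasNext()) {
            Entry e = it.next();
            if (e.status == Status.UNKNOWN
                    && nowMillis - e.createdAtMillis > maxMessageUnknownTimeMillis) {
                it.remove();
                discarded++;
            }
        }
        return discarded;
    }

    /** The query method proposed in the PIP; empty if the id is unknown to the broker or was discarded. */
    synchronized Optional<Status> getStateOfMessage(long id) {
        Entry e = entries.get(id);
        return e == null ? Optional.empty() : Optional.of(e.status);
    }
}
```

A periodic broker task would call `expireUnknown` with the current time. The status check-back to the producer described in step 4 is left out of the sketch.
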