Richard,

Sure, I can drive the PIP. I will try to write down the details in your PIP Google Doc and we can go from there.
Thanks,
Sijie

On Mon, Mar 4, 2019 at 5:01 AM Richard Yu <yohan.richard...@gmail.com> wrote:

Hi Dave, Matteo, and Sijie,

Thanks for pitching in on the discussion!
Sijie, it would be great if you could drive this PIP. To be frank, I don't know what the best direction is.
Oh, and Ivan, if you have any other ideas, let us know. :)

If there are any changes that need to be made, edit the document.

On Sat, Mar 2, 2019 at 11:11 PM Sijie Guo <guosi...@gmail.com> wrote:

Matteo, Dave,

I think you are talking about different things. My comments to both:

> Once there's support for transactions in messaging API, there will be
> no need for a base class for functions. Rather a config option will
> allow to enable transactional mode.

Matteo, if I understand your comment correctly, you are talking about functions using transactions for their processing semantics. If so, yes, that would be the end goal.

> Yes, that way there is no additional broker overhead and whatever happens
> when a commit happens is under the control of those making the transaction.

Dave, this sounds like an interesting idea and it is definitely doable. Pulsar is a multi-layered system built on top of reliable storage, so a lot of its components are "stateless" and "logical" and are not bound to any physical machine. When we implement a component or a piece of functionality, we are basically implementing a logical unit, and how that unit runs can be very flexible: it can run as a separate service, as part of the broker, or in functions.

- Sijie

On Sun, Mar 3, 2019 at 10:52 AM Dave Fisher <dave2w...@comcast.net> wrote:

Hi -

> On Mar 2, 2019, at 6:39 PM, Sijie Guo <guosi...@gmail.com> wrote:
>
> Dave,
>
> You mean implementing the transactions in pulsar function?
Yes, that way there is no additional broker overhead and whatever happens when a commit happens is under the control of those making the transaction.

I’m not sure if it would work, but it seems that functions, spouts, and connectors make sense as opposed to burdening the highly performant brokers.

Regards,
Dave

On Sun, Mar 3, 2019 at 1:52 AM Dave Fisher <dave2w...@comcast.net> wrote:

Hi -

Is this a case where a Pulsar function base class for transactions would help?

Regards,
Dave

On Mar 2, 2019, at 2:39 AM, Sijie Guo <guosi...@gmail.com> wrote:

Pravega's model is a better model than Kafka's: it addressed the interleaving problem. However, Pravega's model is based on a giant replicated log that rewrites the data to a second storage tier for persistence, which basically re-implements BookKeeper's logic in the broker. A fundamental drawback of Pravega is write amplification; the amplification of both network and IO bandwidth is huge. If you use BookKeeper for both its first- and second-tier storage and assume a BookKeeper replication factor of 3, Pravega requires 6x network bandwidth and 12x IO bandwidth: a given message has to be written 3 times into the journal and 3 more times for persistence. This amplification severely limits the throughput of the Pravega "brokers".

- Sijie

On Sat, Mar 2, 2019 at 6:13 PM Ali Ahmed <ahmal...@gmail.com> wrote:

I agree we may want to review Pravega's past efforts in this area also.
https://github.com/pravega/pravega/blob/master/documentation/src/docs/transactions.md
https://github.com/pravega/pravega/blob/master/client/src/main/java/io/pravega/client/stream/Transaction.java

-Ali

On Sat, Mar 2, 2019 at 1:56 AM Sijie Guo <guosi...@gmail.com> wrote:

Kafka's implementation interleaves committed messages with uncommitted messages in storage. Personally, I think it is a very ugly design and implementation.

Pulsar is a segment-centric system with a shared segment storage: BookKeeper. I think a better direction is to leverage the segments (aka ledgers) to buffer uncommitted messages and to commit the whole segment when the whole transaction is committed.

A rough idea would be:

1) For any transaction, write the messages to a separate ledger (or multiple separate ledgers).
2) During the transaction, accumulate the messages in those ledgers.
3) On commit, merge the txn ledgers back into the main data ledger. The merge can be done either by adding a meta message recording where the data is stored in the txn ledger, or by actually copying the data to the data ledger (depending on the size of the data accumulated in the transaction).
4) On abort, delete the txn ledger. No other additional work needs to be done.

This would produce a much cleaner design than Kafka's.
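A minimal sketch of the four steps above, written in Java since that is Pulsar's implementation language. It is an in-memory model under stated assumptions, not the BookKeeper client API: "ledgers" are plain lists, and `LedgerTxnModel`, `Entry`, and the `copyThreshold` knob that chooses between copying and linking on commit are hypothetical names used only for illustration.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * In-memory model of buffering a transaction in its own ledger.
 * "Ledgers" are plain lists; nothing here is the BookKeeper client API.
 */
final class LedgerTxnModel {
    /** A data-ledger entry: an inline payload, or a marker pointing at a committed txn ledger. */
    static final class Entry {
        final String payload;   // null for a marker
        final Long txnLedgerId; // null for inline data

        Entry(String payload, Long txnLedgerId) {
            this.payload = payload;
            this.txnLedgerId = txnLedgerId;
        }

        boolean isMarker() {
            return txnLedgerId != null;
        }
    }

    private final List<Entry> dataLedger = new ArrayList<>();
    private final Map<Long, List<String>> txnLedgers = new HashMap<>();
    private final int copyThreshold; // hypothetical: copy small txns, link large ones
    private long nextLedgerId = 1;

    LedgerTxnModel(int copyThreshold) {
        this.copyThreshold = copyThreshold;
    }

    /** Step 1: open a separate ledger for the transaction. */
    long begin() {
        long id = nextLedgerId++;
        txnLedgers.put(id, new ArrayList<>());
        return id;
    }

    /** Step 2: accumulate messages in the transaction's own ledger. */
    void append(long txnLedgerId, String message) {
        txnLedgers.get(txnLedgerId).add(message);
    }

    /** Step 3: merge into the data ledger, either by copying or by a meta (marker) entry. */
    void commit(long txnLedgerId) {
        List<String> buffered = txnLedgers.get(txnLedgerId);
        if (buffered.size() <= copyThreshold) {
            for (String m : buffered) {
                dataLedger.add(new Entry(m, null));
            }
            txnLedgers.remove(txnLedgerId); // data was copied, so the txn ledger can go
        } else {
            dataLedger.add(new Entry(null, txnLedgerId)); // txn ledger must stay for readers
        }
    }

    /** Step 4: abort just deletes the transaction's ledger. */
    void abort(long txnLedgerId) {
        txnLedgers.remove(txnLedgerId);
    }

    /** Consumer view: committed messages only, with markers resolved in place. */
    List<String> read() {
        List<String> out = new ArrayList<>();
        for (Entry e : dataLedger) {
            if (e.isMarker()) {
                out.addAll(txnLedgers.get(e.txnLedgerId));
            } else {
                out.add(e.payload);
            }
        }
        return out;
    }
}
```

With `copyThreshold` set to 2, a two-message transaction is copied inline on commit, a three-message one is linked through a marker entry, and an aborted one never appears in `read()`. The marker path avoids rewriting data, at the cost of keeping the txn ledger alive for as long as the data ledger references it.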
On Ivan's comments:

> Transactional acknowledgement also needs to be taken into account

I don't think we have to treat `transactional acknowledgement` as a special case. Currently, acknowledgments are actually "append" operations into cursor ledgers, so the problem can be reduced to an `atomic append` to both data ledgers and cursor ledgers. That way, we can use one solution for both appending data and updating cursors.

Additionally, I think a related topic is supporting large messages (e.g. >= 5MB). If we take the approach I described above, using a separate ledger to accumulate the messages of a transaction, we can easily model a large message as a transaction of chunked messages.

@Richard, @Ivan, let me know what you think. If you think the direction I raised is a good one to go down, I am happy to write it down in detail, drive the design, and coordinate the implementation in the community.

- Sijie

On Sat, Mar 2, 2019 at 9:45 AM Richard Yu <yohan.richard...@gmail.com> wrote:

Hi all,

We might be able to get some ideas on implementing this from Kafka:

https://cwiki.apache.org/confluence/display/KAFKA/Transactional+Messaging+in+Kafka

Obviously, there are some differences between Kafka and Pulsar internals, but at some level the implementation would be similar. It should help.
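Sijie's reduction of transactional acknowledgement to an atomic append across a data ledger and a cursor ledger, and his idea of modelling a large message as a transaction of chunks, can be sketched together. This is a single-threaded, in-memory illustration under assumptions: the log names `data` and `cursor`, the class `AtomicAppendSketch`, and the chunk size are hypothetical, and the commit is atomic only because nothing runs concurrently. Making it atomic across real ledgers is the actual design problem.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/**
 * Single-threaded sketch: a transaction stages appends to several logs
 * (e.g. a data ledger and a cursor ledger) and publishes them together.
 * Logs are in-memory lists; all names are illustrative.
 */
final class AtomicAppendSketch {
    private final Map<String, List<byte[]>> logs = new LinkedHashMap<>();

    final class Txn {
        private final Map<String, List<byte[]>> staged = new LinkedHashMap<>();
        private boolean finished;

        /** Stage an append; nothing is visible to readers yet. */
        void append(String log, byte[] entry) {
            checkOpen();
            staged.computeIfAbsent(log, k -> new ArrayList<>()).add(entry);
        }

        /** Publish every staged append in one step (trivially atomic in this model). */
        void commit() {
            checkOpen();
            staged.forEach((log, entries) ->
                    logs.computeIfAbsent(log, k -> new ArrayList<>()).addAll(entries));
            finished = true;
        }

        /** Drop every staged append. */
        void abort() {
            checkOpen();
            staged.clear();
            finished = true;
        }

        private void checkOpen() {
            if (finished) {
                throw new IllegalStateException("transaction already finished");
            }
        }
    }

    Txn begin() {
        return new Txn();
    }

    List<byte[]> read(String log) {
        return logs.getOrDefault(log, List.of());
    }

    /** Split a large payload into chunks of at most maxChunk bytes (maxChunk > 0). */
    static List<byte[]> chunk(byte[] payload, int maxChunk) {
        List<byte[]> chunks = new ArrayList<>();
        for (int off = 0; off < payload.length; off += maxChunk) {
            chunks.add(Arrays.copyOfRange(payload, off, Math.min(payload.length, off + maxChunk)));
        }
        return chunks;
    }
}
```

For example, a 10-byte payload chunked at 4 bytes stages three appends to `data` (4, 4, and 2 bytes) plus one acknowledgement append to `cursor`; none of them is readable until `commit()`, and an aborted transaction leaves both logs untouched.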
On Thu, Feb 28, 2019 at 4:29 PM Richard Yu <yohan.richard...@gmail.com> wrote:

Hi,

Per request, I've created a doc so we can get more input in an organized manner:

https://docs.google.com/document/d/1mSUsJvPgThnWizeQqljKU5244BMEiYHabA6OP6QEHmQ/edit?usp=sharing

As for Ivan's questions, I would answer as follows.

> By "set the message to unknown", do you mean the broker will cache the
> message, not writing it to any log?

From my interpretation of the steps, we wouldn't cache the message. What the producer sends first is a pre-processing message, not the real message itself; this step basically notifies the broker that the message is on its way. So all we have to do is store the message id and its corresponding status in a map, and the status will change depending on the producer's response.

> In designs we've discussed previously, this was handled
> by a component called the transaction coordinator, which is a logical
> component which each broker knows how to talk to. For a transaction
> the commit message is sent to the coordinator, which writes it to its
> own log, and then goes through each topic in the commit and marks the
> transaction as completed.

I wasn't aware of previous discussions on this topic, but it seems pretty good to me.
It's certainly better than what I would come up with. If there's anything more we need to talk about, I suppose we could move it to the Google Doc to play around with.

Hope we can get this PIP rolling.

On Thu, Feb 28, 2019 at 2:53 AM Sijie Guo <guosi...@gmail.com> wrote:

Richard,

Thank you for putting this out and pushing the discussion forward.

I think this is a very large feature. It might be worth creating a Google Doc for it (which is better for collaboration). And I believe Ivan has some thoughts as well. If you can put up a Google Doc (make it world-editable), Ivan can probably dump his thoughts there, and we can finalize the discussion and break it down into tasks, so the whole community can actually collaborate on this.

Thanks,
Sijie

On Thu, Feb 28, 2019 at 1:08 PM Richard Yu <yohan.richard...@gmail.com> wrote:

Hi all,

I would like to create a PIP for issue #2664 on GitHub. The details of the PIP are below. I hope we can discuss this thoroughly.

Cheers,
Richard

PIP-31: Add support for transactional messaging

Motivation: Pulsar could improve its system of sending packets of data by implementing transactional messaging.
Such a system enforces eventual consistency and allows operations to be performed atomically.

Proposal:

As described in the issue, we would implement the following policy in the Producer and the Pulsar Broker:
1. The producer produces the pre-processing transaction message. At this point, the broker sets the status of this message to unknown.
2. If the local transaction executes successfully, the commit message is sent; otherwise, the rollback message is sent.
3. The broker receives the message. If it is a commit message, the broker changes the transaction status to commit and then sends the actual message to the consumer queue. At this point, the consumer can consume the message. Otherwise, the transaction status is changed to rollback and the message is discarded.
4. If, at step 2, the producer is down or behaving abnormally, the broker periodically asks that producer for the status of the message, updates the status according to the producer's response, and then processes it as in step 3.

Specific concerns:
There are a number of things we would improve upon or add:
- A configuration called `maxMessageUnknownTime`.
  Consider this scenario: the pre-processing transaction message is sent, but the commit or rollback message is never received, which means that the status of the message could remain unknown permanently. To prevent this, we would need a config that limits how long the status of a message can stay unknown (i.e. `maxMessageUnknownTime`). After that, the message would be discarded.
- Logging would be updated to log the status of a message, i.e. UNKNOWN, ROLLBACK, or COMMITTED. This would let the user know whether or not a message had failed or fallen through.

Possible Additional API:
- We would add a method that allows the user to query the state of a message, i.e. `getStateOfMessage(long id)`.
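The four proposal steps, together with `maxMessageUnknownTime` and `getStateOfMessage(long id)`, can be read as a small broker-side state machine. The sketch below is one possible reading, not a design agreed in the thread: the class name `TxnStatusTable`, the millisecond units, the `askProducer` callback standing in for the broker querying the producer, and recording a timed-out message as ROLLBACK are all assumptions.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.LongFunction;

/**
 * Broker-side status table following the four PIP-31 steps.
 * Only getStateOfMessage and maxMessageUnknownTime come from the proposal;
 * everything else (class, method names, millisecond units) is hypothetical.
 */
final class TxnStatusTable {
    enum Status { UNKNOWN, COMMITTED, ROLLBACK }

    private final Map<Long, Status> status = new HashMap<>();
    private final Map<Long, Long> pendingSince = new HashMap<>(); // UNKNOWN ids -> prepare time
    private final List<Long> consumerQueue = new ArrayList<>();    // ids released to consumers
    private final long maxMessageUnknownTimeMillis;

    TxnStatusTable(long maxMessageUnknownTimeMillis) {
        this.maxMessageUnknownTimeMillis = maxMessageUnknownTimeMillis;
    }

    /** Step 1: the pre-processing message only registers the id as UNKNOWN. */
    void prepare(long id, long nowMillis) {
        status.put(id, Status.UNKNOWN);
        pendingSince.put(id, nowMillis);
    }

    /** Steps 2-3: apply a commit or rollback decision for a pending id. */
    void resolve(long id, boolean commit) {
        if (status.get(id) != Status.UNKNOWN) {
            return; // never prepared, or already resolved
        }
        status.put(id, commit ? Status.COMMITTED : Status.ROLLBACK);
        pendingSince.remove(id);
        if (commit) {
            consumerQueue.add(id); // stands in for delivering the actual message
        }
    }

    /**
     * Step 4 plus maxMessageUnknownTime: ask the producer about each pending id;
     * if it still cannot answer and the limit has passed, discard (recorded as ROLLBACK here).
     */
    void checkPending(long nowMillis, LongFunction<Status> askProducer) {
        for (long id : new ArrayList<>(pendingSince.keySet())) {
            Status answer = askProducer.apply(id);
            if (answer == Status.COMMITTED || answer == Status.ROLLBACK) {
                resolve(id, answer == Status.COMMITTED);
            } else if (nowMillis - pendingSince.get(id) > maxMessageUnknownTimeMillis) {
                resolve(id, false);
            }
        }
    }

    /** The query method proposed in the PIP; returns null for ids never seen. */
    Status getStateOfMessage(long id) {
        return status.get(id);
    }

    List<Long> consumerQueue() {
        return consumerQueue;
    }
}
```

A caller would invoke `checkPending` periodically; with a limit of 1000 ms, an id whose producer never answers is still UNKNOWN at 500 ms and is discarded after 1500 ms. Note that this sketch asks the producer about every pending id on each pass, whereas step 4 only describes doing so when the producer is down or abnormal.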