Matteo, Dave, I think you are talking about different things. My comments to both:
> Once there's support for transactions in the messaging API, there will be no need for a base class for functions. Rather, a config option will allow enabling transactional mode.

Matteo, if I understand your comment correctly, you are talking about functions using transactions for their processing semantics. If so, yes, that would be the end goal.

> Yes, that way there is no additional broker overhead and whatever happens when a commit happens is under the control of those making the transaction.

Dave, this sounds like an interesting idea, and it is definitely doable. Pulsar is a multi-layered system built on top of reliable storage, so a lot of its components are just "stateless" and "logical", not bound to any physical machine. So when we implement a component or a piece of functionality, we are basically implementing a logical unit, and how that logical unit runs can be very flexible: it can run as a separate service, as part of the broker, or in functions.

- Sijie

On Sun, Mar 3, 2019 at 10:52 AM Dave Fisher <dave2w...@comcast.net> wrote:

> Hi -
>
> > On Mar 2, 2019, at 6:39 PM, Sijie Guo <guosi...@gmail.com> wrote:
> >
> > Dave,
> >
> > You mean implementing transactions in a Pulsar function?
>
> Yes, that way there is no additional broker overhead and whatever happens when a commit happens is under the control of those making the transaction.
>
> I’m not sure if it would work, but it seems that functions, spouts, and connectors make sense as opposed to burdening the highly performant brokers.
>
> Regards,
> Dave
>
> > - Sijie
> >
> >> On Sun, Mar 3, 2019 at 1:52 AM Dave Fisher <dave2w...@comcast.net> wrote:
> >>
> >> Hi -
> >>
> >> Is this a case where a Pulsar function base class for transactions would help?
> >>
> >> Regards,
> >> Dave
> >>
> >>> On Mar 2, 2019, at 2:39 AM, Sijie Guo <guosi...@gmail.com> wrote:
> >>>
> >>> Pravega's model is a better model than Kafka's: it addresses the interleaving problem.
> >>> However, Pravega's model is based on a giant replicated log that rewrites the data to second-tier storage for persistence, which basically re-implements BookKeeper's logic in the broker. A fundamental drawback of Pravega is write amplification: the amplification of both network and IO bandwidth is huge. If you use BookKeeper for both its first- and second-tier storage and assume a BookKeeper replication factor of 3, Pravega requires 6x network bandwidth and 12x IO bandwidth. For a given message, it needs to write 3 times into the journal and 3 times for persistence. This amplification severely limits the throughput of Pravega "brokers".
> >>>
> >>> - Sijie
> >>>
> >>>> On Sat, Mar 2, 2019 at 6:13 PM Ali Ahmed <ahmal...@gmail.com> wrote:
> >>>>
> >>>> I agree we may want to review Pravega's past efforts in this area as well.
> >>>>
> >>>> https://github.com/pravega/pravega/blob/master/documentation/src/docs/transactions.md
> >>>>
> >>>> https://github.com/pravega/pravega/blob/master/client/src/main/java/io/pravega/client/stream/Transaction.java
> >>>>
> >>>> -Ali
> >>>>
> >>>>> On Sat, Mar 2, 2019 at 1:56 AM Sijie Guo <guosi...@gmail.com> wrote:
> >>>>>
> >>>>> Kafka's implementation interleaves committed messages with uncommitted messages in storage. Personally, I think it is a very ugly design and implementation.
> >>>>>
> >>>>> Pulsar is a segment-centric system, where we have a shared segment storage: BookKeeper. I think a better direction is to leverage the segments (aka ledgers) for buffering uncommitted messages and to commit the whole segment when the whole transaction is committed.
> >>>>>
> >>>>> A rough idea would be:
> >>>>>
> >>>>> 1) For any transaction, write the messages to a separate ledger (or multiple separate ledgers).
> >>>>> 2) During the transaction, accumulate the messages in those ledgers.
> >>>>> 3) On commit, merge the txn ledgers back into the main data ledger. The merge can be done either by adding a metadata message that points to where the data is stored in the txn ledger, or by actually copying the data into the data ledger (depending on the size of the data accumulated in the transaction).
> >>>>> 4) On abort, delete the txn ledger. No additional work needs to be done.
> >>>>>
> >>>>> This would produce a much cleaner design than Kafka's.
> >>>>>
> >>>>> On Ivan's comments:
> >>>>>
> >>>>>> Transactional acknowledgement also needs to be taken into account
> >>>>>
> >>>>> I don't think we have to treat `transactional acknowledgement` as a special case. Currently, acknowledgments are actually "append" operations into cursor ledgers, so the problem can be reduced to an `atomic append` to both data ledgers and cursor ledgers. That way, we can use one solution for both appending data and updating cursors.
> >>>>>
> >>>>> Additionally, I think a related topic for transactions is supporting large messages (e.g. >= 5MB). If we take the approach I described above, using a separate ledger to accumulate the messages of a transaction, we can easily model a large message as a transaction of chunked messages.
> >>>>>
> >>>>> @Richard, @Ivan, let me know what you think. If you think the direction I raised is a good one to go down, I am happy to write it down in detail, drive the design, and coordinate the implementation in the community.
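Sijie's four steps can be illustrated with a minimal in-memory sketch. It assumes a toy `Ledger` (a Python list) standing in for a BookKeeper ledger; `LedgerStore`, `TxnPointer`, and the `copy_threshold` size cut-off are hypothetical names for illustration, not Pulsar or BookKeeper APIs. It shows that uncommitted messages never touch the data ledger, that commit either copies a small transaction or appends a single pointer entry, and that abort only deletes the transaction ledger.

```python
from dataclasses import dataclass, field
from typing import Dict, List


@dataclass
class Ledger:
    """Hypothetical stand-in for a BookKeeper ledger: an append-only list."""
    ledger_id: int
    entries: List[object] = field(default_factory=list)

    def append(self, entry: object) -> int:
        self.entries.append(entry)
        return len(self.entries) - 1  # entry id within this ledger


@dataclass(frozen=True)
class TxnPointer:
    """Metadata entry meaning 'the committed data lives in this txn ledger'."""
    txn_ledger_id: int
    num_entries: int


class LedgerStore:
    def __init__(self, copy_threshold: int = 4):
        self.ledgers: Dict[int, Ledger] = {}
        self.next_id = 0
        # Hypothetical knob: txns this small are copied, larger ones get a pointer.
        self.copy_threshold = copy_threshold
        self.data_ledger = self._create_ledger()

    def _create_ledger(self) -> Ledger:
        ledger = Ledger(self.next_id)
        self.ledgers[ledger.ledger_id] = ledger
        self.next_id += 1
        return ledger

    def begin(self) -> Ledger:
        # Step 1: every transaction gets its own ledger.
        return self._create_ledger()

    def write(self, txn: Ledger, msg: object) -> None:
        # Step 2: messages accumulate in the txn ledger, invisible to readers.
        txn.append(msg)

    def commit(self, txn: Ledger) -> None:
        # Step 3: merge into the data ledger by copying or by appending a pointer.
        if len(txn.entries) <= self.copy_threshold:
            for msg in txn.entries:
                self.data_ledger.append(msg)
            del self.ledgers[txn.ledger_id]  # data was copied, txn ledger no longer needed
        else:
            self.data_ledger.append(TxnPointer(txn.ledger_id, len(txn.entries)))

    def abort(self, txn: Ledger) -> None:
        # Step 4: abort just deletes the txn ledger; nothing else to clean up.
        del self.ledgers[txn.ledger_id]

    def read_committed(self) -> List[object]:
        # Readers scan only the data ledger, resolving pointer entries.
        out: List[object] = []
        for entry in self.data_ledger.entries:
            if isinstance(entry, TxnPointer):
                out.extend(self.ledgers[entry.txn_ledger_id].entries[: entry.num_entries])
            else:
                out.append(entry)
        return out


if __name__ == "__main__":
    store = LedgerStore(copy_threshold=2)
    t1 = store.begin()
    store.write(t1, "a")
    store.write(t1, "b")
    store.commit(t1)  # 2 entries: copied into the data ledger
    t2 = store.begin()
    store.write(t2, "x")
    store.abort(t2)  # dropped without touching the data ledger
    t3 = store.begin()
    for m in ["c", "d", "e"]:
        store.write(t3, m)
    store.commit(t3)  # 3 entries: one pointer entry appended
    print(store.read_committed())  # ['a', 'b', 'c', 'd', 'e']
```

Because readers only ever scan the data ledger, aborted or still-open transactions are never interleaved with committed messages, which is the contrast with Kafka's storage layout drawn above.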
> >>>>>
> >>>>> - Sijie
> >>>>>
> >>>>> On Sat, Mar 2, 2019 at 9:45 AM Richard Yu <yohan.richard...@gmail.com> wrote:
> >>>>>
> >>>>>> Hi all,
> >>>>>>
> >>>>>> We might be able to get some ideas on implementing this from Kafka:
> >>>>>>
> >>>>>> https://cwiki.apache.org/confluence/display/KAFKA/Transactional+Messaging+in+Kafka
> >>>>>>
> >>>>>> Obviously, there are some differences between Kafka and Pulsar internals, but at some level the implementation would be similar. It should help.
> >>>>>>
> >>>>>> On Thu, Feb 28, 2019 at 4:29 PM Richard Yu <yohan.richard...@gmail.com> wrote:
> >>>>>>
> >>>>>>> Hi,
> >>>>>>>
> >>>>>>> Per request, I've created a doc so we can get some more input in an organized manner:
> >>>>>>>
> >>>>>>> https://docs.google.com/document/d/1mSUsJvPgThnWizeQqljKU5244BMEiYHabA6OP6QEHmQ/edit?usp=sharing
> >>>>>>>
> >>>>>>> And here are my answers to Ivan's questions.
> >>>>>>>
> >>>>>>>> By "set the message to unknown", do you mean the broker will cache the message, not writing it to any log?
> >>>>>>>
> >>>>>>> From my interpretation of the steps, we wouldn't cache the message. What the producer sends first is a pre-processing message, not the real message itself. This step basically notifies the broker that the message is on its way. So all we have to do is store the message id and its corresponding status in a map, and, depending on the producer's response, the status will change accordingly.
> >>>>>>>
> >>>>>>>> In designs we've discussed previously, this was handled by a component called the transaction coordinator, which is a logical component that each broker knows how to talk to.
> >>>>>>>> For a transaction, the commit message is sent to the coordinator, which writes it to its own log, then goes through each topic in the commit and marks the transaction as completed.
> >>>>>>>
> >>>>>>> I wasn't aware of previous discussions on this topic, but it seems pretty good to me. It's certainly better than what I would have come up with. If there are any more things we need to talk about, I suppose we could move them to the Google doc to play around with.
> >>>>>>>
> >>>>>>> Hope we can get this PIP rolling.
> >>>>>>>
> >>>>>>> On Thu, Feb 28, 2019 at 2:53 AM Sijie Guo <guosi...@gmail.com> wrote:
> >>>>>>>
> >>>>>>>> Richard,
> >>>>>>>>
> >>>>>>>> Thank you for putting this out and pushing the discussion forward.
> >>>>>>>>
> >>>>>>>> I think this is a very large feature. It might be worth creating a Google doc for it (which is better for collaboration). I believe Ivan has some thoughts as well. If you can put up a Google doc (make it world-editable), Ivan can probably dump his thoughts there, and we can finalize the discussion and break it down into tasks, so the whole community can actually collaborate on this.
> >>>>>>>>
> >>>>>>>> Thanks,
> >>>>>>>> Sijie
> >>>>>>>>
> >>>>>>>> On Thu, Feb 28, 2019 at 1:08 PM Richard Yu <yohan.richard...@gmail.com> wrote:
> >>>>>>>>
> >>>>>>>>> Hi all,
> >>>>>>>>>
> >>>>>>>>> I would like to create a PIP for issue #2664 on GitHub. The details of the PIP are below. I hope we can discuss this thoroughly.
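The transaction-coordinator flow Ivan describes earlier in the thread (the commit is written to the coordinator's own log first, then each topic in the commit is marked as completed) can be sketched as follows. This is a minimal sketch with in-memory stand-ins: `TransactionCoordinator`, `Topic`, and `TxnState` are hypothetical names, and the "log" is a plain list rather than a replicated log.

```python
from enum import Enum
from typing import Dict, List, Set, Tuple


class TxnState(Enum):
    OPEN = "OPEN"
    COMMITTING = "COMMITTING"
    COMMITTED = "COMMITTED"


class Topic:
    """Hypothetical topic stand-in that remembers which transactions were completed."""

    def __init__(self, name: str):
        self.name = name
        self.completed_txns: Set[int] = set()

    def mark_completed(self, txn_id: int) -> None:
        self.completed_txns.add(txn_id)


class TransactionCoordinator:
    """Hypothetical logical coordinator that every broker knows how to talk to."""

    def __init__(self, topics: Dict[str, Topic]):
        self.topics = topics
        # Stands in for the coordinator's own durable log.
        self.log: List[Tuple[str, int, List[str]]] = []
        self.state: Dict[int, TxnState] = {}
        self.participants: Dict[int, Set[str]] = {}
        self.next_txn_id = 0

    def begin(self) -> int:
        txn_id = self.next_txn_id
        self.next_txn_id += 1
        self.state[txn_id] = TxnState.OPEN
        self.participants[txn_id] = set()
        return txn_id

    def add_topic(self, txn_id: int, topic_name: str) -> None:
        # Record every topic the transaction touches.
        self.participants[txn_id].add(topic_name)

    def commit(self, txn_id: int) -> None:
        topics_in_commit = sorted(self.participants[txn_id])
        # 1) Write the commit to the coordinator's own log first.
        self.log.append(("COMMIT", txn_id, topics_in_commit))
        self.state[txn_id] = TxnState.COMMITTING
        # 2) Then go through each topic in the commit and mark the txn completed.
        for name in topics_in_commit:
            self.topics[name].mark_completed(txn_id)
        self.state[txn_id] = TxnState.COMMITTED
```

The ordering is the design point: once the commit record is in the coordinator's log, the decision is fixed, so in a fuller design a coordinator that restarts halfway through marking topics could replay its log and finish marking the rest.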
> >>>>>>>>>
> >>>>>>>>> Cheers,
> >>>>>>>>> Richard
> >>>>>>>>>
> >>>>>>>>> PIP-31: Add support for transactional messaging
> >>>>>>>>>
> >>>>>>>>> Motivation: Pulsar could improve upon its system of sending packets of data by implementing transactional messaging. Such a system enforces eventual consistency and allows operations to be performed atomically.
> >>>>>>>>>
> >>>>>>>>> Proposal:
> >>>>>>>>>
> >>>>>>>>> As described in the issue, we would implement the following policy in the Producer and the Pulsar Broker:
> >>>>>>>>> 1. The producer produces the pre-processing transaction message. At this point, the broker sets the status of this message to unknown.
> >>>>>>>>> 2. If the local transaction executes successfully, the commit message is sent; otherwise, the rollback message is sent.
> >>>>>>>>> 3. The broker receives the message. If it is a commit message, the broker changes the transaction status to commit and then sends the actual message to the consumer queue, at which point the consumer can consume it. Otherwise, the transaction status is changed to rollback and the message is discarded.
> >>>>>>>>> 4. If, at step 2, the producer is down or behaving abnormally, the broker periodically asks that producer for the status of the message, updates the status according to the producer's response, and then processes the message as described in step 3.
> >>>>>>>>>
> >>>>>>>>> Specific concerns:
> >>>>>>>>> There are a number of things we will improve upon or add:
> >>>>>>>>> - A configuration called ```maxMessageUnknownTime```.
> >>>>>>>>>   Consider this scenario: the pre-processing transaction message is sent, but the commit or rollback message is never received, so the status of the message could remain unknown forever. To prevent this, we need a config that limits how long a message's status can remain unknown (i.e. ```maxMessageUnknownTime```). After that time, the message is discarded.
> >>>>>>>>> - Logging would be updated to record the status of a message, i.e. UNKNOWN, ROLLBACK, or COMMITTED. This would let the user know whether or not a message had failed or fallen through.
> >>>>>>>>>
> >>>>>>>>> Possible Additional API:
> >>>>>>>>> - We would add a method that allows the user to query the state of a message, i.e. ```getStateOfMessage(long id)```
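The four-step PIP-31 flow, together with `maxMessageUnknownTime` and `getStateOfMessage`, can be sketched as a broker-side status map, which also matches Richard's description of storing only the message id and its status. This is a minimal sketch under stated assumptions: `TxnMessageTracker`, `ask_producer`, and the injected `clock` are hypothetical; payload handling is left out (the consumer queue receives message ids only); and an expired message is recorded as ROLLBACK, since the proposal only says it would be discarded.

```python
import time
from enum import Enum
from typing import Callable, Dict, List, Optional


class MessageState(Enum):
    UNKNOWN = "UNKNOWN"
    COMMITTED = "COMMITTED"
    ROLLBACK = "ROLLBACK"


class TxnMessageTracker:
    """Hypothetical broker-side status map for the PIP-31 flow (not a Pulsar API)."""

    def __init__(
        self,
        max_message_unknown_time: float,
        ask_producer: Callable[[int], Optional[MessageState]],
        clock: Callable[[], float] = time.monotonic,
    ):
        self.max_message_unknown_time = max_message_unknown_time  # seconds
        self.ask_producer = ask_producer  # returns None if the producer can't answer
        self.clock = clock
        self.states: Dict[int, MessageState] = {}
        self.unknown_since: Dict[int, float] = {}
        self.consumer_queue: List[int] = []  # ids released to consumers

    def on_prepare(self, msg_id: int) -> None:
        # Step 1: the pre-processing message only registers the id as UNKNOWN.
        self.states[msg_id] = MessageState.UNKNOWN
        self.unknown_since[msg_id] = self.clock()

    def on_decision(self, msg_id: int, decision: MessageState) -> None:
        # Step 3: commit releases the message; rollback discards it.
        if decision is MessageState.UNKNOWN:
            return
        if self.states.get(msg_id) is not MessageState.UNKNOWN:
            return  # unknown id or already decided: ignore duplicates
        self.states[msg_id] = decision
        del self.unknown_since[msg_id]
        if decision is MessageState.COMMITTED:
            self.consumer_queue.append(msg_id)

    def check_pending(self) -> None:
        # Step 4: run periodically. Ask the producer first, then enforce
        # maxMessageUnknownTime by discarding messages still stuck in UNKNOWN.
        now = self.clock()
        for msg_id, since in list(self.unknown_since.items()):
            answer = self.ask_producer(msg_id)
            if answer in (MessageState.COMMITTED, MessageState.ROLLBACK):
                self.on_decision(msg_id, answer)
            elif now - since >= self.max_message_unknown_time:
                self.on_decision(msg_id, MessageState.ROLLBACK)  # assumed status

    def get_state_of_message(self, msg_id: int) -> Optional[MessageState]:
        # Mirrors the proposed getStateOfMessage(long id).
        return self.states.get(msg_id)
```

Injecting the clock and the producer callback keeps the timeout logic testable without real sleeps or network calls; a real broker would drive `check_pending` from a scheduled task.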