Hi Dave, Mattteo, and Sijie,

Thanks for pitching in on the discussion!
Sijie, it would be great if you could drive this PIP. To be frank, I don't
know what the best direction is.
Oh, and Ivan, if you have any other idea. Let us know. :)

If there is any changes that need to be made, edit the document


On Sat, Mar 2, 2019 at 11:11 PM Sijie Guo <guosi...@gmail.com> wrote:

> Matteo, Dave,
>
> I think you are talking about different things. My comments to both:
>
> > Once there's support for transactions in messaging API, there will be
> > no need for a base class for functions. Rather a config option will
> > allow to enable transactional mode.
>
> Matteo, If I understand your comment correctly, you are talking about
> functions using transactions for processing semantics. If so, yes that
> would be the end goal.
>
> > Yes, that way there is no additional broker overhead and whatever happens
> when a commit happens is under the control of those making the transaction.
>
> Dave, this sounds an interesting idea and it is definitely do-able. Because
> Pulsar is a multi-layered system and it is built on top of a reliable
> storage, so a lot of components are just "stateless", "logical" and not
> bound to any physical machines. so when we implement a component /
> functionality, we basically implement a logical unit. How to run the logic
> unit can be very flexible. It can run as a separated service, or as part of
> broker, or in functions.
>
> - Sijie
>
>
> On Sun, Mar 3, 2019 at 10:52 AM Dave Fisher <dave2w...@comcast.net> wrote:
>
> > Hi -
> >
> > > On Mar 2, 2019, at 6:39 PM, Sijie Guo <guosi...@gmail.com> wrote:
> > >
> > > Dave,
> > >
> > > You mean implementing the transactions in pulsar function?
> >
> > Yes, that way there is no additional broker overhead and whatever happens
> > when a commit happens is under the control of those making the
> transaction.
> >
> > I’m not sure if it would work, but it seems that functions, spouts, and
> > connectors make sense as opposed to burdening the highly performant
> brokers.
> >
> > Regards,
> > Dave
> >
> > >
> > > - Sijie
> > >
> > >> On Sun, Mar 3, 2019 at 1:52 AM Dave Fisher <dave2w...@comcast.net>
> > wrote:
> > >>
> > >> Hi -
> > >>
> > >> Is this a case where a Pulsar function base class for transactions
> would
> > >> help?
> > >>
> > >> Regards,
> > >> Dave
> > >>
> > >> Sent from my iPhone
> > >>
> > >>> On Mar 2, 2019, at 2:39 AM, Sijie Guo <guosi...@gmail.com> wrote:
> > >>>
> > >>> Pravega's model is a better model than Kafka - it addressed the
> > >>> interleaving problems. However Pravega's model is based on a giant
> > >>> replicated log and rewrite the data to a second tiered storage for
> > >>> persistence, which basically re-implemented bookkeeper's logic in
> > >> broker. A
> > >>> fundamental drawback of Pravega is write amplifications. The
> > >> amplifications
> > >>> of both network and IO bandwidth are huge. If you use bookkeeper both
> > for
> > >>> its first-and-second tier storage and assume the bookkeeper
> replication
> > >>> factor is 3, pravega requires 6x network bandwidth and 12x IO
> > bandwidth.
> > >>> For a given message, it needs to write 3 times into the journal, and
> 3
> > >>> times for persistent. The amplifications hugely limit the throughput
> at
> > >>> pravega "brokers".
> > >>>
> > >>> - Sijie
> > >>>
> > >>>
> > >>>
> > >>>> On Sat, Mar 2, 2019 at 6:13 PM Ali Ahmed <ahmal...@gmail.com>
> wrote:
> > >>>>
> > >>>> I agree we many want to review pravega's past efforts in this area
> > also.
> > >>>>
> > >>>>
> > >>>>
> > >>
> >
> https://github.com/pravega/pravega/blob/master/documentation/src/docs/transactions.md
> > >>>>
> > >>>>
> > >>
> >
> https://github.com/pravega/pravega/blob/master/client/src/main/java/io/pravega/client/stream/Transaction.java
> > >>>>
> > >>>> -Ali
> > >>>>
> > >>>>> On Sat, Mar 2, 2019 at 1:56 AM Sijie Guo <guosi...@gmail.com>
> wrote:
> > >>>>>
> > >>>>> Kafka's implementation is interleaving committed messages with
> > >>>> uncommitted
> > >>>>> messages at storage. Personally I think it is a very ugly design
> and
> > >>>>> implementation.
> > >>>>>
> > >>>>> Pulsar is a segment centric system, where we have a shared segment
> > >>>> storage
> > >>>>> - bookkeeper. I think a better direction is to leverage the
> segments
> > >> (aka
> > >>>>> ledgers)
> > >>>>> for buffering uncommitted messages and commit the whole segment
> when
> > >> the
> > >>>>> whole transaction is committed.
> > >>>>>
> > >>>>> A rough idea would be:
> > >>>>>
> > >>>>> 1) for any transaction, write the messages to a separate ledger (or
> > >>>>> multiple separate ledger).
> > >>>>> 2) during the transaction, accumulates the messages in those
> ledgers.
> > >>>>> 3) when commit, merge the txn ledgers back to the main data ledger.
> > the
> > >>>>> merge can be done either adding a meta message where data is stored
> > in
> > >>>> the
> > >>>>> txn ledger or actually copying the data to data ledger (depending
> on
> > >> the
> > >>>>> size of data accumulate in the transaction).
> > >>>>> 4) when abort, delete the txn ledger. No other additional work to
> be
> > >>>> done.
> > >>>>>
> > >>>>> This would be producing a much clear design than Kafka.
> > >>>>>
> > >>>>> On Ivan's comments:
> > >>>>>
> > >>>>>> Transactional acknowledgement also needs to be taken into account
> > >>>>>
> > >>>>> I don't think we have to treat `transactional acknowledgement` as a
> > >>>> special
> > >>>>> case. currently `acknowledgment` are actually "append" operations
> > into
> > >>>>> cursor ledgers.
> > >>>>> So the problem set can be reduced as `atomic append` to both data
> > >> ledgers
> > >>>>> and cursor ledgers. in that way, we can use one solution for
> handling
> > >>>>> appending data and updating cursors.
> > >>>>>
> > >>>>> Additionally, I think a related topic about transactions would be
> > >>>>> supporting large sized message (e.g. >= 5MB). If we take the
> > approach I
> > >>>>> described above using a separated ledger for accumulating messages
> > for
> > >> a
> > >>>>> transaction, that we are easy to model a large size message as a
> > >>>>> transaction of chunked messages.
> > >>>>>
> > >>>>> @Richard, @Ivan let me know what do you think. If you guys think
> the
> > >>>>> direction I raised is a good one to go down, I am happy to write
> them
> > >>>> down
> > >>>>> into details, and drive the design and coordinate the
> implementations
> > >> in
> > >>>>> the community.
> > >>>>>
> > >>>>> - Sijie
> > >>>>>
> > >>>>> On Sat, Mar 2, 2019 at 9:45 AM Richard Yu <
> > yohan.richard...@gmail.com>
> > >>>>> wrote:
> > >>>>>
> > >>>>>> Hi all,
> > >>>>>>
> > >>>>>> We might be able to get some ideas on implementing this from
> Kafka:
> > >>>>>>
> > >>>>>>
> > >>>>>
> > >>>>
> > >>
> >
> https://cwiki.apache.org/confluence/display/KAFKA/Transactional+Messaging+in+Kafka
> > >>>>>>
> > >>>>>> Obviously, there is some differences in Kafka and Pulsar internals
> > but
> > >>>> at
> > >>>>>> some level, the implementation would be similar.
> > >>>>>> It should help.
> > >>>>>>
> > >>>>>>
> > >>>>>>
> > >>>>>> On Thu, Feb 28, 2019 at 4:29 PM Richard Yu <
> > >> yohan.richard...@gmail.com
> > >>>>>
> > >>>>>> wrote:
> > >>>>>>
> > >>>>>>> Hi,
> > >>>>>>>
> > >>>>>>> Per request, I've created a doc so we could get some more input
> in
> > an
> > >>>>>>> organized manner:
> > >>>>>>>
> > >>>>>>>
> > >>>>>>
> > >>>>>
> > >>>>
> > >>
> >
> https://docs.google.com/document/d/1mSUsJvPgThnWizeQqljKU5244BMEiYHabA6OP6QEHmQ/edit?usp=sharing
> > >>>>>>>
> > >>>>>>> And for Ivan's questions, I would answer accordingly.
> > >>>>>>>
> > >>>>>>>> By "set the message to unknown", do you mean the broker will
> cache
> > >>>> the
> > >>>>>>>> message, not writing it to any log?
> > >>>>>>>
> > >>>>>>> We wouldn't cache the message from my interpretation of the
> steps.
> > >>>> What
> > >>>>>>> the producer is first sending is a pre-processing message, not
> the
> > >>>> real
> > >>>>>>> message itself. This step basically notifies the broker that the
> > >>>>> message
> > >>>>>> is
> > >>>>>>> on its way. So all we have to do is store the message id and its
> > >>>>>>> corresponding status in a map, and depending on the producer's
> > >>>>> response,
> > >>>>>>> the status will change accordingly.
> > >>>>>>>
> > >>>>>>>> In designs we've discussed previously, this was handled
> > >>>>>>>> by a component called the transaction coordinator, which is a
> > >>>> logical
> > >>>>>>>> component which each broker knows how to talk to. For a
> > transaction
> > >>>>>>>> the commit message is sent to the coordinator, which writes it
> to
> > >>>> its
> > >>>>>>>> own log, and then goes through each topic in the commit and
> marks
> > >>>> the
> > >>>>>>>> transaction as completed.
> > >>>>>>>
> > >>>>>>> I wasn't aware of previous discussions on this topic, but it
> seems
> > >>>>> pretty
> > >>>>>>> good to me. It's certainly better than what I would come up with.
> > >>>>>>> If there's any more things we need to talk about, I suppose we
> > could
> > >>>>> move
> > >>>>>>> it to the google doc to play around with.
> > >>>>>>>
> > >>>>>>> Hope we can get this PIP rolling.
> > >>>>>>>
> > >>>>>>>
> > >>>>>>> On Thu, Feb 28, 2019 at 2:53 AM Sijie Guo <guosi...@gmail.com>
> > >>>> wrote:
> > >>>>>>>
> > >>>>>>>> Richard,
> > >>>>>>>>
> > >>>>>>>> Thank you for putting this put and pushing the discussion
> forward.
> > >>>>>>>>
> > >>>>>>>> I think this is a very large feature. It might be worth
> creating a
> > >>>>>> google
> > >>>>>>>> doc for it (which is better for collaboration). And I believe
> Ivan
> > >>>> has
> > >>>>>>>> some
> > >>>>>>>> thoughts as well. If you can put up a google doc (make it
> > >>>>>> world-editable),
> > >>>>>>>> Ivan can probably dump his thoughts there and we can finalize
> the
> > >>>>>>>> discussion and break down into tasks. So the whole community can
> > >>>>>> actually
> > >>>>>>>> work together at collaborating this.
> > >>>>>>>>
> > >>>>>>>> Thanks,
> > >>>>>>>> Sijie
> > >>>>>>>>
> > >>>>>>>> On Thu, Feb 28, 2019 at 1:08 PM Richard Yu <
> > >>>>> yohan.richard...@gmail.com>
> > >>>>>>>> wrote:
> > >>>>>>>>
> > >>>>>>>>> Hi all,
> > >>>>>>>>>
> > >>>>>>>>> I would like to create a PIP for issue #2664 on Github. The
> > >>>> details
> > >>>>> of
> > >>>>>>>> the
> > >>>>>>>>> PIP are below.
> > >>>>>>>>> I hope we could discuss this thoroughly.
> > >>>>>>>>>
> > >>>>>>>>> Cheers,
> > >>>>>>>>> Richard
> > >>>>>>>>>
> > >>>>>>>>> PIP-31: Add support for transactional messaging
> > >>>>>>>>>
> > >>>>>>>>> Motivation: Pulsar currently could improve upon their system of
> > >>>>>> sending
> > >>>>>>>>> packets of data by implementing transactional messaging. This
> > >>>> system
> > >>>>>>>>> enforces eventual consistency within the system, and allows
> > >>>>> operations
> > >>>>>>>> to
> > >>>>>>>>> be performed atomically.
> > >>>>>>>>>
> > >>>>>>>>> Proposal:
> > >>>>>>>>>
> > >>>>>>>>> As described in the issue, we would implement the following
> > policy
> > >>>>> in
> > >>>>>>>>> Producer and Pulsar Broker:
> > >>>>>>>>> 1. The producer produces the pre-processing transaction
> message.
> > >>>> At
> > >>>>>> this
> > >>>>>>>>> point, the broker will set the status of this message to
> unknown.
> > >>>>>>>>> 2. After the local transaction is successfully executed, the
> > >>>> commit
> > >>>>>>>> message
> > >>>>>>>>> is sent, otherwise the rollback message is sent.
> > >>>>>>>>> 3. The broker receives the message. If it is a commit message,
> it
> > >>>>>>>> modifies
> > >>>>>>>>> the transaction status to commit, and then sends an actual
> > message
> > >>>>> to
> > >>>>>>>> the
> > >>>>>>>>> consumer queue. At this time, the consumer can consume the
> > >>>> message.
> > >>>>>>>>> Otherwise, the transaction status is modified to rollback. The
> > >>>>> message
> > >>>>>>>> will
> > >>>>>>>>> be discarded.
> > >>>>>>>>> 4. If at step 2, the producer is down or abnormal, at this
> time,
> > >>>> the
> > >>>>>>>> broker
> > >>>>>>>>> will periodically ask the specific producer for the status of
> the
> > >>>>>>>> message,
> > >>>>>>>>> and update the status according to the producer's response, and
> > >>>>>> process
> > >>>>>>>> it
> > >>>>>>>>> according to step 3, the action that comes down.
> > >>>>>>>>>
> > >>>>>>>>> Specific concerns:
> > >>>>>>>>> There are a number of things we will improve upon or add:
> > >>>>>>>>> - A configuration called ```maxMessageUnknownTime```. Consider
> > >>>> this
> > >>>>>>>>> scenario: the pre-processing transaction message is sent, but
> the
> > >>>>>>>> commit or
> > >>>>>>>>> rollback message is never received, which could mean that the
> > >>>> status
> > >>>>>> of
> > >>>>>>>> a
> > >>>>>>>>> message would be permanently unknown. To avoid this from
> > >>>> happening,
> > >>>>> we
> > >>>>>>>>> would need a config which limits the amount of time the status
> of
> > >>>> a
> > >>>>>>>> message
> > >>>>>>>>> could be unknown (i.e. ```maxMessageUnknownTime```) After that,
> > >>>> the
> > >>>>>>>> message
> > >>>>>>>>> would be discarded.
> > >>>>>>>>> - Logging would be updated to log the status of a message i.e.
> > >>>>>> UNKNOWN,
> > >>>>>>>>> ROLLBACK, or COMMITTED. This would allow the user to know
> whether
> > >>>> or
> > >>>>>>>> not a
> > >>>>>>>>> message had failed or fallen through.
> > >>>>>>>>>
> > >>>>>>>>> Possible Additional API:
> > >>>>>>>>> - We would add a method which allows the user to query the
> state
> > >>>> of
> > >>>>>> the
> > >>>>>>>>> message i.e. ```getStateOfMessage(long id)```
> > >>>>>>>>>
> > >>>>>>>>
> > >>>>>>>
> > >>>>>>
> > >>>>>
> > >>>>
> > >>>>
> > >>>> --
> > >>>> -Ali
> > >>>>
> > >>
> > >>
> >
> >
>

Reply via email to