Richard,

Sure, I can drive the PIP. I will try to write down the details in your PIP
google doc and we can go from there.

Thanks,
Sijie

On Mon, Mar 4, 2019 at 5:01 AM Richard Yu <yohan.richard...@gmail.com>
wrote:

> Hi Dave, Matteo, and Sijie,
>
> Thanks for pitching in on the discussion!
> Sijie, it would be great if you could drive this PIP. To be frank, I don't
> know what the best direction is.
> Oh, and Ivan, if you have any other ideas, let us know. :)
>
> If there are any changes that need to be made, feel free to edit the document.
>
>
> On Sat, Mar 2, 2019 at 11:11 PM Sijie Guo <guosi...@gmail.com> wrote:
>
> > Matteo, Dave,
> >
> > I think you are talking about different things. My comments to both:
> >
> > > Once there's support for transactions in messaging API, there will be
> > > no need for a base class for functions. Rather a config option will
> > > allow to enable transactional mode.
> >
> > Matteo, If I understand your comment correctly, you are talking about
> > functions using transactions for processing semantics. If so, yes that
> > would be the end goal.
> >
> > > Yes, that way there is no additional broker overhead and whatever
> > > happens when a commit happens is under the control of those making
> > > the transaction.
> >
> > Dave, this sounds like an interesting idea and it is definitely doable.
> > Pulsar is a multi-layered system built on top of reliable storage, so a
> > lot of components are just "stateless", "logical" and not bound to any
> > physical machines. So when we implement a component or functionality,
> > we basically implement a logical unit. How to run that logical unit can
> > be very flexible. It can run as a separate service, as part of the
> > broker, or in functions.
> >
> > - Sijie
> >
> >
> > On Sun, Mar 3, 2019 at 10:52 AM Dave Fisher <dave2w...@comcast.net> wrote:
> >
> > > Hi -
> > >
> > > > On Mar 2, 2019, at 6:39 PM, Sijie Guo <guosi...@gmail.com> wrote:
> > > >
> > > > Dave,
> > > >
> > > > You mean implementing the transactions in pulsar function?
> > >
> > > Yes, that way there is no additional broker overhead and whatever
> > > happens when a commit happens is under the control of those making
> > > the transaction.
> > >
> > > I’m not sure if it would work, but it seems that functions, spouts, and
> > > connectors make sense as opposed to burdening the highly performant
> > > brokers.
> > >
> > > Regards,
> > > Dave
> > >
> > > >
> > > > - Sijie
> > > >
> > > >> On Sun, Mar 3, 2019 at 1:52 AM Dave Fisher <dave2w...@comcast.net> wrote:
> > > >>
> > > >> Hi -
> > > >>
> > > >> Is this a case where a Pulsar function base class for transactions
> > > >> would help?
> > > >>
> > > >> Regards,
> > > >> Dave
> > > >>
> > > >> Sent from my iPhone
> > > >>
> > > >>> On Mar 2, 2019, at 2:39 AM, Sijie Guo <guosi...@gmail.com> wrote:
> > > >>>
> > > >>> Pravega's model is a better model than Kafka's - it addresses the
> > > >>> interleaving problem. However, Pravega's model is based on a giant
> > > >>> replicated log and rewrites the data to a second-tier storage for
> > > >>> persistence, which basically re-implements bookkeeper's logic in the
> > > >>> broker. A fundamental drawback of Pravega is write amplification. The
> > > >>> amplification of both network and IO bandwidth is huge. If you use
> > > >>> bookkeeper for both its first- and second-tier storage and assume the
> > > >>> bookkeeper replication factor is 3, pravega requires 6x network
> > > >>> bandwidth and 12x IO bandwidth. For a given message, it needs to write
> > > >>> 3 times into the journal, and 3 times for persistence. The
> > > >>> amplification hugely limits the throughput at pravega "brokers".
> > > >>>
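The 6x and 12x figures follow from simple counting. Below is a rough sketch of that arithmetic, assuming (as the message above does) two storage tiers that are both backed by bookkeeper, and that each replica costs one journal write plus one persistent write. The function name and parameters are made up for illustration.

```python
def write_amplification(replication_factor, tiers=2):
    """Return (network_copies, io_writes) for a single message.

    Assumptions for illustration only: every tier sends the message to
    `replication_factor` replicas over the network, and every replica
    writes it to disk twice (once to the journal, once for persistence).
    """
    network_copies = tiers * replication_factor
    io_writes = tiers * replication_factor * 2
    return network_copies, io_writes


network, io = write_amplification(replication_factor=3)
print(f"{network}x network, {io}x IO")  # 6x network, 12x IO
```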
> > > >>> - Sijie
> > > >>>
> > > >>>
> > > >>>
> > > >>>> On Sat, Mar 2, 2019 at 6:13 PM Ali Ahmed <ahmal...@gmail.com> wrote:
> > > >>>>
> > > >>>> I agree, we may want to review pravega's past efforts in this area
> > > >>>> also.
> > > >>>>
> > > >>>> https://github.com/pravega/pravega/blob/master/documentation/src/docs/transactions.md
> > > >>>>
> > > >>>> https://github.com/pravega/pravega/blob/master/client/src/main/java/io/pravega/client/stream/Transaction.java
> > > >>>>
> > > >>>> -Ali
> > > >>>>
> > > >>>>> On Sat, Mar 2, 2019 at 1:56 AM Sijie Guo <guosi...@gmail.com> wrote:
> > > >>>>>
> > > >>>>> Kafka's implementation interleaves committed messages with
> > > >>>>> uncommitted messages in storage. Personally I think it is a very
> > > >>>>> ugly design and implementation.
> > > >>>>>
> > > >>>>> Pulsar is a segment-centric system, where we have a shared segment
> > > >>>>> storage - bookkeeper. I think a better direction is to leverage the
> > > >>>>> segments (aka ledgers) for buffering uncommitted messages and commit
> > > >>>>> the whole segment when the whole transaction is committed.
> > > >>>>>
> > > >>>>> A rough idea would be:
> > > >>>>>
> > > >>>>> 1) for any transaction, write the messages to a separate ledger (or
> > > >>>>> multiple separate ledgers).
> > > >>>>> 2) during the transaction, accumulate the messages in those ledgers.
> > > >>>>> 3) on commit, merge the txn ledgers back into the main data ledger.
> > > >>>>> The merge can be done either by adding a meta message that points to
> > > >>>>> where the data is stored in the txn ledger, or by actually copying
> > > >>>>> the data to the data ledger (depending on the size of the data
> > > >>>>> accumulated in the transaction).
> > > >>>>> 4) on abort, delete the txn ledger. No additional work needs to be
> > > >>>>> done.
> > > >>>>>
> > > >>>>> This would produce a much cleaner design than Kafka's.
> > > >>>>>
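The four steps above can be sketched with plain in-memory lists standing in for ledgers. This is only a toy model, not BookKeeper or Pulsar code: `TopicStore`, `TxnRef`, and `COPY_THRESHOLD` are hypothetical names, and the size check counts messages rather than bytes to keep the example short.

```python
from dataclasses import dataclass, field

COPY_THRESHOLD = 3  # hypothetical: copy up to 3 messages, reference beyond that


@dataclass(frozen=True)
class TxnRef:
    """Meta message pointing at the txn ledger that holds the data."""
    txn_id: str


@dataclass
class TopicStore:
    """In-memory stand-in for a main data ledger plus per-txn ledgers."""
    data_ledger: list = field(default_factory=list)
    txn_ledgers: dict = field(default_factory=dict)

    def begin(self, txn_id):
        # 1) every transaction gets its own separate ledger
        self.txn_ledgers[txn_id] = []

    def write(self, txn_id, message):
        # 2) messages accumulate in the txn ledger, invisible to readers
        self.txn_ledgers[txn_id].append(message)

    def commit(self, txn_id):
        # 3) merge: copy small transactions, otherwise append one meta
        #    message that references the txn ledger
        messages = self.txn_ledgers[txn_id]
        if len(messages) <= COPY_THRESHOLD:
            self.data_ledger.extend(messages)
            del self.txn_ledgers[txn_id]
        else:
            self.data_ledger.append(TxnRef(txn_id))

    def abort(self, txn_id):
        # 4) abort just deletes the txn ledger
        del self.txn_ledgers[txn_id]

    def read(self):
        # Readers resolve meta messages by following the reference.
        for entry in self.data_ledger:
            if isinstance(entry, TxnRef):
                yield from self.txn_ledgers[entry.txn_id]
            else:
                yield entry


store = TopicStore()
store.begin("t1")
store.write("t1", "a")
store.write("t1", "b")
store.begin("t2")
store.write("t2", "x")
store.abort("t2")   # "x" never becomes visible
store.commit("t1")  # small txn, so its messages are copied
print(list(store.read()))  # ['a', 'b']
```

Note that uncommitted data never touches the main data ledger here, which is the contrast with the interleaving approach described above.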
> > > >>>>> On Ivan's comments:
> > > >>>>>
> > > >>>>>> Transactional acknowledgement also needs to be taken into account
> > > >>>>>
> > > >>>>> I don't think we have to treat `transactional acknowledgement` as a
> > > >>>>> special case. Currently, acknowledgements are actually "append"
> > > >>>>> operations into cursor ledgers. So the problem can be reduced to an
> > > >>>>> `atomic append` to both data ledgers and cursor ledgers. That way, we
> > > >>>>> can use one solution for handling both appending data and updating
> > > >>>>> cursors.
> > > >>>>>
> > > >>>>> Additionally, I think a topic related to transactions would be
> > > >>>>> supporting large messages (e.g. >= 5MB). If we take the approach I
> > > >>>>> described above, using a separate ledger for accumulating messages
> > > >>>>> for a transaction, then it is easy to model a large message as a
> > > >>>>> transaction of chunked messages.
> > > >>>>>
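To illustrate the "large message as a transaction of chunks" idea, here is a minimal sketch. Everything in it is hypothetical (`chunk`, `ChunkedPublisher`, and the in-memory buffers are illustration only); it just shows chunks being buffered per transaction and becoming visible together on commit.

```python
def chunk(payload, chunk_size):
    """Split a payload into fixed-size chunks (the last one may be shorter)."""
    return [payload[i:i + chunk_size] for i in range(0, len(payload), chunk_size)]


class ChunkedPublisher:
    """Buffers chunks per transaction; exposes the message only on commit."""

    def __init__(self):
        self.pending = {}    # txn_id -> list of chunks (the "txn ledger")
        self.committed = []  # fully assembled messages visible to consumers

    def publish_large(self, txn_id, payload, chunk_size):
        self.pending[txn_id] = []
        for part in chunk(payload, chunk_size):
            # Each chunk is one ordinary write inside the transaction.
            self.pending[txn_id].append(part)
        # Commit: all chunks become visible together, or none at all.
        self.committed.append(b"".join(self.pending.pop(txn_id)))


publisher = ChunkedPublisher()
publisher.publish_large("big-1", b"x" * 12, chunk_size=5)
print(len(publisher.committed[0]))  # 12
```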
> > > >>>>> @Richard, @Ivan let me know what you think. If you guys think the
> > > >>>>> direction I raised is a good one to go down, I am happy to write it
> > > >>>>> down in detail, drive the design, and coordinate the implementation
> > > >>>>> in the community.
> > > >>>>>
> > > >>>>> - Sijie
> > > >>>>>
> > > >>>>> On Sat, Mar 2, 2019 at 9:45 AM Richard Yu <yohan.richard...@gmail.com> wrote:
> > > >>>>>
> > > >>>>>> Hi all,
> > > >>>>>>
> > > >>>>>> We might be able to get some ideas on implementing this from Kafka:
> > > >>>>>>
> > > >>>>>> https://cwiki.apache.org/confluence/display/KAFKA/Transactional+Messaging+in+Kafka
> > > >>>>>>
> > > >>>>>> Obviously, there are some differences between Kafka and Pulsar
> > > >>>>>> internals, but at some level the implementation would be similar.
> > > >>>>>> It should help.
> > > >>>>>>
> > > >>>>>>
> > > >>>>>>
> > > >>>>>> On Thu, Feb 28, 2019 at 4:29 PM Richard Yu <yohan.richard...@gmail.com> wrote:
> > > >>>>>>
> > > >>>>>>> Hi,
> > > >>>>>>>
> > > >>>>>>> Per request, I've created a doc so we could get some more input in
> > > >>>>>>> an organized manner:
> > > >>>>>>>
> > > >>>>>>> https://docs.google.com/document/d/1mSUsJvPgThnWizeQqljKU5244BMEiYHabA6OP6QEHmQ/edit?usp=sharing
> > > >>>>>>>
> > > >>>>>>> As for Ivan's questions, my answers are below.
> > > >>>>>>>
> > > >>>>>>>> By "set the message to unknown", do you mean the broker will cache
> > > >>>>>>>> the message, not writing it to any log?
> > > >>>>>>>
> > > >>>>>>> From my interpretation of the steps, we wouldn't cache the message.
> > > >>>>>>> What the producer sends first is a pre-processing message, not the
> > > >>>>>>> real message itself. This step basically notifies the broker that
> > > >>>>>>> the message is on its way. So all we have to do is store the message
> > > >>>>>>> id and its corresponding status in a map, and depending on the
> > > >>>>>>> producer's response, the status will change accordingly.
> > > >>>>>>>
> > > >>>>>>>> In designs we've discussed previously, this was handled
> > > >>>>>>>> by a component called the transaction coordinator, which is a
> > > >>>>>>>> logical component which each broker knows how to talk to. For a
> > > >>>>>>>> transaction the commit message is sent to the coordinator, which
> > > >>>>>>>> writes it to its own log, and then goes through each topic in the
> > > >>>>>>>> commit and marks the transaction as completed.
> > > >>>>>>>
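The coordinator flow quoted above (log the commit first, then mark each topic) might look roughly like the sketch below. It is a toy, single-process model: `TransactionCoordinator`, `register`, and the state strings are invented for illustration and do not correspond to any existing Pulsar class.

```python
class TransactionCoordinator:
    """Toy coordinator: logs the commit first, then marks each topic."""

    def __init__(self):
        self.log = []          # the coordinator's own log
        self.topic_state = {}  # (topic, txn_id) -> "open" | "completed"

    def register(self, txn_id, topics):
        # Record which topics take part in the transaction.
        for topic in topics:
            self.topic_state[(topic, txn_id)] = "open"

    def commit(self, txn_id):
        # The decision is written to the coordinator's log before any
        # topic is touched (assumption: this is what allows the marking
        # step to be retried after a failure).
        self.log.append(("commit", txn_id))
        for topic, tid in list(self.topic_state):
            if tid == txn_id:
                self.topic_state[(topic, tid)] = "completed"


coordinator = TransactionCoordinator()
coordinator.register("txn-1", ["orders", "payments"])
coordinator.commit("txn-1")
print(coordinator.topic_state[("orders", "txn-1")])  # completed
```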
> > > >>>>>>> I wasn't aware of previous discussions on this topic, but it seems
> > > >>>>>>> pretty good to me. It's certainly better than what I would come up
> > > >>>>>>> with. If there are any more things we need to talk about, I suppose
> > > >>>>>>> we could move them to the google doc to play around with.
> > > >>>>>>>
> > > >>>>>>> Hope we can get this PIP rolling.
> > > >>>>>>>
> > > >>>>>>>
> > > >>>>>>> On Thu, Feb 28, 2019 at 2:53 AM Sijie Guo <guosi...@gmail.com> wrote:
> > > >>>>>>>
> > > >>>>>>>> Richard,
> > > >>>>>>>>
> > > >>>>>>>> Thank you for putting this up and pushing the discussion forward.
> > > >>>>>>>>
> > > >>>>>>>> I think this is a very large feature. It might be worth creating a
> > > >>>>>>>> google doc for it (which is better for collaboration). And I believe
> > > >>>>>>>> Ivan has some thoughts as well. If you can put up a google doc (make
> > > >>>>>>>> it world-editable), Ivan can probably dump his thoughts there and we
> > > >>>>>>>> can finalize the discussion and break it down into tasks. That way
> > > >>>>>>>> the whole community can actually collaborate on this.
> > > >>>>>>>>
> > > >>>>>>>> Thanks,
> > > >>>>>>>> Sijie
> > > >>>>>>>>
> > > >>>>>>>> On Thu, Feb 28, 2019 at 1:08 PM Richard Yu <yohan.richard...@gmail.com> wrote:
> > > >>>>>>>>
> > > >>>>>>>>> Hi all,
> > > >>>>>>>>>
> > > >>>>>>>>> I would like to create a PIP for issue #2664 on Github. The details
> > > >>>>>>>>> of the PIP are below.
> > > >>>>>>>>> I hope we could discuss this thoroughly.
> > > >>>>>>>>>
> > > >>>>>>>>> Cheers,
> > > >>>>>>>>> Richard
> > > >>>>>>>>>
> > > >>>>>>>>> PIP-31: Add support for transactional messaging
> > > >>>>>>>>>
> > > >>>>>>>>> Motivation: Pulsar could improve upon its system of sending packets
> > > >>>>>>>>> of data by implementing transactional messaging. Transactional
> > > >>>>>>>>> messaging enforces eventual consistency within the system and
> > > >>>>>>>>> allows operations to be performed atomically.
> > > >>>>>>>>>
> > > >>>>>>>>> Proposal:
> > > >>>>>>>>>
> > > >>>>>>>>> As described in the issue, we would implement the following policy
> > > >>>>>>>>> in the Producer and the Pulsar Broker:
> > > >>>>>>>>> 1. The producer produces the pre-processing transaction message. At
> > > >>>>>>>>> this point, the broker will set the status of this message to
> > > >>>>>>>>> unknown.
> > > >>>>>>>>> 2. After the local transaction is successfully executed, the commit
> > > >>>>>>>>> message is sent; otherwise the rollback message is sent.
> > > >>>>>>>>> 3. The broker receives the message. If it is a commit message, it
> > > >>>>>>>>> changes the transaction status to commit and then sends the actual
> > > >>>>>>>>> message to the consumer queue. At this time, the consumer can
> > > >>>>>>>>> consume the message. Otherwise, the transaction status is changed
> > > >>>>>>>>> to rollback and the message is discarded.
> > > >>>>>>>>> 4. If the producer is down or abnormal at step 2, the broker will
> > > >>>>>>>>> periodically ask that producer for the status of the message,
> > > >>>>>>>>> update the status according to the producer's response, and then
> > > >>>>>>>>> process the message as in step 3.
> > > >>>>>>>>>
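Steps 1 to 3 above amount to a small state machine keyed by message id. A minimal sketch follows, assuming the commit message carries the actual payload (the proposal does not say where the payload travels). `Broker`, `on_prepare`, `on_commit`, and `on_rollback` are hypothetical names; `get_state_of_message` mirrors the `getStateOfMessage(long id)` method suggested in the proposal.

```python
from enum import Enum


class Status(Enum):
    UNKNOWN = "UNKNOWN"
    COMMITTED = "COMMITTED"
    ROLLBACK = "ROLLBACK"


class Broker:
    """Toy broker that tracks per-message transaction status in a map."""

    def __init__(self):
        self.status = {}          # message id -> Status
        self.consumer_queue = []  # messages visible to consumers

    def on_prepare(self, msg_id):
        # Step 1: pre-processing message arrives, status starts as UNKNOWN.
        self.status[msg_id] = Status.UNKNOWN

    def on_commit(self, msg_id, payload):
        # Step 3 (commit): mark committed, then deliver the actual message.
        self.status[msg_id] = Status.COMMITTED
        self.consumer_queue.append(payload)

    def on_rollback(self, msg_id):
        # Step 3 (rollback): mark rolled back; nothing reaches consumers.
        self.status[msg_id] = Status.ROLLBACK

    def get_state_of_message(self, msg_id):
        return self.status[msg_id]


broker = Broker()
broker.on_prepare(1)
broker.on_prepare(2)
broker.on_commit(1, "order created")
broker.on_rollback(2)
print(broker.get_state_of_message(1).value)  # COMMITTED
print(broker.consumer_queue)                 # ['order created']
```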
> > > >>>>>>>>> Specific concerns:
> > > >>>>>>>>> There are a number of things we will improve upon or add:
> > > >>>>>>>>> - A configuration called ```maxMessageUnknownTime```. Consider this
> > > >>>>>>>>> scenario: the pre-processing transaction message is sent, but the
> > > >>>>>>>>> commit or rollback message is never received, which could mean that
> > > >>>>>>>>> the status of a message would be permanently unknown. To prevent
> > > >>>>>>>>> this, we would need a config which limits the amount of time the
> > > >>>>>>>>> status of a message could be unknown (i.e.
> > > >>>>>>>>> ```maxMessageUnknownTime```). After that, the message would be
> > > >>>>>>>>> discarded.
> > > >>>>>>>>> - Logging would be updated to log the status of a message, i.e.
> > > >>>>>>>>> UNKNOWN, ROLLBACK, or COMMITTED. This would allow the user to know
> > > >>>>>>>>> whether or not a message had failed or fallen through.
> > > >>>>>>>>>
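One way the `maxMessageUnknownTime` limit could be enforced is a periodic sweep over messages that are still unknown. The sketch below is an assumption about how that might work, not part of the proposal: `UnknownTracker` and its methods are hypothetical, the limit is in seconds, and the clock is injectable so the behaviour can be checked without sleeping.

```python
import time


class UnknownTracker:
    """Tracks when each message entered the UNKNOWN state."""

    def __init__(self, max_message_unknown_time, clock=time.monotonic):
        self.max_unknown = max_message_unknown_time  # seconds
        self.clock = clock
        self.unknown_since = {}  # message id -> timestamp

    def mark_unknown(self, msg_id):
        self.unknown_since[msg_id] = self.clock()

    def resolve(self, msg_id):
        # Called when a commit or rollback finally arrives.
        self.unknown_since.pop(msg_id, None)

    def expire(self):
        """Discard and return ids that stayed UNKNOWN longer than allowed."""
        now = self.clock()
        expired = [m for m, t in self.unknown_since.items()
                   if now - t > self.max_unknown]
        for m in expired:
            del self.unknown_since[m]
        return expired


# Usage with a fake clock so the example is deterministic.
now = [0.0]
tracker = UnknownTracker(max_message_unknown_time=10.0, clock=lambda: now[0])
tracker.mark_unknown(1)
now[0] = 5.0
tracker.mark_unknown(2)
now[0] = 12.0
print(tracker.expire())  # [1]
```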
> > > >>>>>>>>> Possible Additional API:
> > > >>>>>>>>> - We would add a method which allows the user to query the state of
> > > >>>>>>>>> the message, i.e. ```getStateOfMessage(long id)```
> > > >>>>>>>>>
