Hi all,

We might be able to get some ideas on implementing this from Kafka:
https://cwiki.apache.org/confluence/display/KAFKA/Transactional+Messaging+in+Kafka

Obviously, there are some differences between Kafka and Pulsar internals, but
at some level the implementation would be similar.
It should help.



On Thu, Feb 28, 2019 at 4:29 PM Richard Yu <yohan.richard...@gmail.com>
wrote:

> Hi,
>
> Per request, I've created a doc so we could get some more input in an
> organized manner:
>
> https://docs.google.com/document/d/1mSUsJvPgThnWizeQqljKU5244BMEiYHabA6OP6QEHmQ/edit?usp=sharing
>
> And for Ivan's questions, I would answer accordingly.
>
> >By "set the message to unknown", do you mean the broker will cache the
> >message, not writing it to any log?
>
> From my interpretation of the steps, we wouldn't cache the message. What
> the producer sends first is a pre-processing message, not the real
> message itself. This step basically notifies the broker that the message is
> on its way. So all we have to do is store the message id and its
> corresponding status in a map, and depending on the producer's response,
> the status will change accordingly.
>
> > In designs we've discussed previously, this was handled
> > by a component called the transaction coordinator, which is a logical
> > component which each broker knows how to talk to. For a transaction
> > the commit message is sent to the coordinator, which writes it to its
> > own log, and then goes through each topic in the commit and marks the
> > transaction as completed.
>
> I wasn't aware of previous discussions on this topic, but that design seems
> pretty good to me. It's certainly better than what I would come up with.
> If there are any more things we need to talk about, I suppose we could move
> them to the google doc to play around with.
>
> Hope we can get this PIP rolling.
>
>
> On Thu, Feb 28, 2019 at 2:53 AM Sijie Guo <guosi...@gmail.com> wrote:
>
>> Richard,
>>
>> Thank you for putting this up and pushing the discussion forward.
>>
>> I think this is a very large feature. It might be worth creating a google
>> doc for it (which is better for collaboration). And I believe Ivan has some
>> thoughts as well. If you can put up a google doc (make it world-editable),
>> Ivan can probably dump his thoughts there and we can finalize the
>> discussion and break it down into tasks. That way the whole community can
>> actually collaborate on this.
>>
>> Thanks,
>> Sijie
>>
>> On Thu, Feb 28, 2019 at 1:08 PM Richard Yu <yohan.richard...@gmail.com>
>> wrote:
>>
>> > Hi all,
>> >
>> > I would like to create a PIP for issue #2664 on Github. The details of
>> > the PIP are below.
>> > I hope we could discuss this thoroughly.
>> >
>> > Cheers,
>> > Richard
>> >
>> > PIP-31: Add support for transactional messaging
>> >
>> > Motivation: Pulsar could improve how it delivers messages by
>> > implementing transactional messaging. Transactional messaging enforces
>> > eventual consistency within the system and allows operations to be
>> > performed atomically.
>> >
>> > Proposal:
>> >
>> > As described in the issue, we would implement the following policy in
>> > Producer and Pulsar Broker:
>> > 1. The producer sends the pre-processing transaction message. At this
>> > point, the broker sets the status of this message to unknown.
>> > 2. If the local transaction executes successfully, the producer sends a
>> > commit message; otherwise it sends a rollback message.
>> > 3. The broker receives the message. If it is a commit message, the broker
>> > changes the transaction status to commit and then sends the actual
>> > message to the consumer queue, at which point the consumer can consume
>> > it. Otherwise, the transaction status is changed to rollback and the
>> > message is discarded.
>> > 4. If the producer is down or behaving abnormally at step 2, the broker
>> > periodically asks that producer for the status of the message, updates
>> > the status according to the producer's response, and then handles the
>> > message as in step 3.
>> >
>> > Specific concerns:
>> > There are a number of things we will improve upon or add:
>> > - A configuration called ```maxMessageUnknownTime```. Consider this
>> > scenario: the pre-processing transaction message is sent, but the commit
>> > or rollback message is never received, which could mean that the status
>> > of the message would be permanently unknown. To prevent this, we would
>> > need a config which limits the amount of time the status of a message
>> > could be unknown (i.e. ```maxMessageUnknownTime```). After that, the
>> > message would be discarded.
>> > - Logging would be updated to log the status of a message, i.e. UNKNOWN,
>> > ROLLBACK, or COMMITTED. This would allow the user to know whether or not
>> > a message had failed or fallen through.
>> >
>> > Possible Additional API:
>> > - We would add a method which allows the user to query the state of the
>> > message i.e. ```getStateOfMessage(long id)```
>> >
>>
>
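
To make the quoted proposal a bit more concrete, here is a minimal sketch of
the broker-side status map it describes: a message id mapped to UNKNOWN,
COMMITTED, or ROLLBACK, plus the maxMessageUnknownTime expiry and the
getStateOfMessage(long id) query. The class name TransactionStatusTracker and
the methods prepare, resolve, and expireUnknown are hypothetical names made up
for illustration. None of this is existing Pulsar code, and the step where the
broker polls the producer (step 4) is left out.

```java
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Optional;

// Hypothetical sketch of the status map from the proposal; not Pulsar's API.
final class TransactionStatusTracker {
    enum Status { UNKNOWN, COMMITTED, ROLLBACK }

    // One tracked message: when it was announced and its current status.
    private static final class Tracked {
        final long preparedAtMillis;
        Status status = Status.UNKNOWN;
        Tracked(long preparedAtMillis) { this.preparedAtMillis = preparedAtMillis; }
    }

    private final Map<Long, Tracked> tracked = new HashMap<>();
    private final long maxMessageUnknownTimeMillis;

    TransactionStatusTracker(long maxMessageUnknownTimeMillis) {
        this.maxMessageUnknownTimeMillis = maxMessageUnknownTimeMillis;
    }

    // Step 1: the pre-processing message arrives; status starts as UNKNOWN.
    synchronized void prepare(long id, long nowMillis) {
        tracked.put(id, new Tracked(nowMillis));
    }

    // Steps 2-3: apply the producer's commit or rollback.
    // Returns true only if the message should now go to the consumer queue.
    synchronized boolean resolve(long id, boolean commit) {
        Tracked t = tracked.get(id);
        if (t == null || t.status != Status.UNKNOWN) {
            return false; // unknown id, or already committed / rolled back
        }
        t.status = commit ? Status.COMMITTED : Status.ROLLBACK;
        return commit;
    }

    // maxMessageUnknownTime: discard messages that stayed UNKNOWN too long.
    // Returns how many entries were discarded.
    synchronized int expireUnknown(long nowMillis) {
        int discarded = 0;
        Iterator<Tracked> it = tracked.values().iterator();
        while (it.hasNext()) {
            Tracked t = it.next();
            boolean tooOld = nowMillis - t.preparedAtMillis > maxMessageUnknownTimeMillis;
            if (t.status == Status.UNKNOWN && tooOld) {
                it.remove();
                discarded++;
            }
        }
        return discarded;
    }

    // The proposed query method; empty if the id is not tracked (or expired).
    synchronized Optional<Status> getStateOfMessage(long id) {
        Tracked t = tracked.get(id);
        return t == null ? Optional.empty() : Optional.of(t.status);
    }
}
```

In this sketch the caller passes the clock in explicitly (nowMillis), so a
broker would call expireUnknown from a periodic task. A transaction
coordinator with its own log, as Ivan described, would replace the in-memory
map in a real design.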
