Hi Asaf, thank you for the very detailed reply.

> The problem we have today is that while we have sent a request to reset
> the subscription position, the broker decides to:
> 1. Close the TCP connection which in turn causes the client to clear any
> pending messages it has in the queue.
> 2. Continue to send messages from the previous position, up to a certain
> point where the broker "shifts gear" and starts sending messages from the
> new position.

> Since Pulsar doesn't follow a request-response model but has a
> bi-directional protocol, the client can send a command to fetch messages
> using a new session sequence number, while the server can still send
> messages using the old session number. Using the Session Sequence Number
> the client can't tell the difference between the messages being pushed
> from the server to it.

I totally agree with you. When I re-read this part of the code, I realized
that something is wrong in the PIP.
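
To make the epoch idea in the quoted paragraph concrete, here is a minimal
sketch in Python (it is not Pulsar code). The names ConsumerSketch, Message,
and the epoch field are hypothetical, and the sketch assumes the broker echoes
back, on every pushed message, the epoch the client last sent. Under that
assumption the client can drop messages that were already in flight before
the seek.

```python
from collections import deque
from dataclasses import dataclass, field


@dataclass
class Message:
    payload: str
    epoch: int  # assumed: the session sequence number echoed by the broker


@dataclass
class ConsumerSketch:
    """Hypothetical client-side state; not the Pulsar client API."""
    epoch: int = 0
    queue: deque = field(default_factory=deque)

    def seek(self) -> int:
        # Start a new session: bump the epoch and drop everything buffered.
        self.epoch += 1
        self.queue.clear()
        return self.epoch  # would travel to the broker with the seek command

    def on_message(self, msg: Message) -> bool:
        # A message stamped with an older epoch was pushed before the seek.
        if msg.epoch < self.epoch:
            return False
        self.queue.append(msg)
        return True
```

The check in on_message is the whole point: without a per-message epoch the
client has no way to tell a stale push from a fresh one.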


> # What are the issues with this PIP?
> 1. The PIP decides to solve the problem listed above *only* for exclusive
> and failover subscriptions where you have only a single consumer. The
> problem still remains at large with Shared or Key Shared subscriptions.
> 2. The cost of solving a small portion of the problem is high:
>     Added Complexity - Adding another field to the protocol, and another
> thing to check. I believe we should aim to reduce the cognitive load of
> the developers of Pulsar.
> 3. There are no rejected solutions - We always need to examine all
> available options and list why we decided against them.
> 4. Lack of background knowledge (context) - it's super hard IMO to grasp
> the idea without so much context missing: The client-server protocol
> pertaining to this PIP, including its async nature, what is an epoch and
> why it was introduced, what are flow permits. I'm not saying explain all
> pulsar in this doc, but just include a brief explanation of that
> terminology.
>
> # What We Suggest
>
> Rethink the solution.
> 1. The consumer (one of many) will send a seek command to the broker, and
> at the same time clear its internal queue and wait for a response from the
> broker.
> 2. The broker upon receiving the seek command, will
>      a. Stop dispatching messages to consumers.
>      b. Notify all consumers via a command (new) that the subscription
> position was asked to be reset. Consumers receiving this command will
> clear their internal queue. The broker will no longer close the TCP
> connection (with its adverse effects on other consumers and producers
> "riding" on that connection)
>      c. Reset the cursor to the newly requested position.
>      d. Continue dispatching messages from newly requested positions to
> consumers.
>
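
As a way to check my understanding of the quoted steps, here is a rough sketch
in Python of the broker side of step 2 (the requesting consumer's own queue
clearing in step 1 is left out). Everything in it is hypothetical:
SubscriptionSketch, ConsumerStub, on_position_reset, and the round-robin
dispatch are illustration only, not the Pulsar broker API or its real
dispatcher.

```python
from dataclasses import dataclass, field


@dataclass
class ConsumerStub:
    """Hypothetical stand-in for a connected consumer."""
    name: str
    queue: list = field(default_factory=list)
    connected: bool = True

    def on_position_reset(self) -> None:
        # Step 2b: clear the local queue; the TCP connection stays open.
        self.queue.clear()


@dataclass
class SubscriptionSketch:
    """Hypothetical broker-side subscription state."""
    log: list        # messages in the topic; list index = position
    consumers: list
    cursor: int = 0
    dispatching: bool = True

    def dispatch_one(self) -> bool:
        # Push the message at the cursor to one consumer (simple round-robin).
        if not self.dispatching or self.cursor >= len(self.log):
            return False
        target = self.consumers[self.cursor % len(self.consumers)]
        target.queue.append(self.log[self.cursor])
        self.cursor += 1
        return True

    def handle_seek(self, position: int) -> None:
        self.dispatching = False      # 2a: stop dispatching
        for c in self.consumers:      # 2b: notify all consumers
            c.on_position_reset()
        self.cursor = position        # 2c: reset the cursor
        self.dispatching = True       # 2d: resume from the new position
```

In this sketch no connection is closed, so the other consumers (and any
producers sharing the connection) are unaffected apart from having their
queues cleared.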

Good suggestions. I'll look into these issues, rethink the solution, and
rewrite the PIP according to your suggestions.
Thanks again for your review.

Asaf Mesika <asaf.mes...@gmail.com> wrote on Wed, Sep 7, 2022 at 23:12:

> Hi Qiang,
>
> We had a brainstorming session on this PIP over Zoom with Penghui, Hang,
> and more people, and I'm jotting down our feedback here.
>
> Before I do that, I just want to write my own understanding of the
> document, for other readers:
>
> # Context
> Pulsar, as opposed to other distributed / streaming systems, took the
> approach of a push model. The client (consumer that is) asks for 1000
> messages (that's the consumer's remaining capacity in its internal queue)
> from the broker (that process is named flow permits). The broker was now
> given permission to send 1000 messages to the client, hence utilizing the
> TCP connection to send those 1000 messages as they were ready to be sent.
>
> The consumer has the ability to request the subscription to reset its
> position to the requested new position.
> The problem we have today is that while we have sent a request to reset the
> subscription position, the broker decides to:
> 1. Close the TCP connection which in turn causes the client to clear any
> pending messages it has in the queue.
> 2. Continue to send messages from the previous position, up to a certain
> point where the broker "shifts gear" and starts sending messages from the
> new position.
>
> So the problem is that you would expect that after the connection was
> reset, only messages from the new position will be sent to the consumer,
> but that doesn't happen.
>
> We have to keep in mind, that we have effectively two scenarios here from
> the point of view of the consumer:
> 1. Single consumer - It can be due to using an Exclusive subscription, or
> being a consumer attached to a single topic since the subscription is of
> type Failover.
> 2. Multiple consumers - With the Shared or Key Shared subscription types. In
> this case, one of those consumers can decide to reset the position of the
> *subscription*. When that happens, the broker decides, again, to reset all
> existing TCP connections to all consumers upon receiving the seek command,
> and you would expect any messages sent afterward to be from the new
> position, which again doesn't happen.
>
> Another really important piece of information we need to bring to the
> context of the reader here is the notion of an epoch. First, the epoch in
> Pulsar PIPs was introduced in PIP-84. The idea is that every time the
> client starts a "session" of requesting and receiving messages in response,
> the client will send a Session Sequence Number, and the server responds to
> those message requests with the same session sequence number. Since Pulsar
> doesn't follow a request-response model but has a bi-directional protocol,
> the client can send a command to fetch messages using a new session
> sequence number, while the server can still send messages using the old
> session number. Using the Session Sequence Number the client can't tell the
> difference between the messages being pushed from the server to it. That
> Session Sequence Number is the one referred to as Epoch in PIP-84 and also
> in this PIP.
> The idea was somehow to demarcate the responses coming from the server
> based on the commands the client sends as they are *independent* (async).
>
> # What are the issues with this PIP?
> 1. The PIP decides to solve the problem listed above *only* for exclusive
> and failover subscriptions where you have only a single consumer. The
> problem still remains at large with Shared or Key Shared subscriptions.
> 2. The cost of solving a small portion of the problem is high:
>     Added Complexity - Adding another field to the protocol, and another
> thing to check. I believe we should aim to reduce the cognitive load of the
> developers of Pulsar.
> 3. There are no rejected solutions - We always need to examine all
> available options and list why we decided against them.
> 4. Lack of background knowledge (context) - it's super hard IMO to grasp
> the idea without so much context missing: The client-server protocol
> pertaining to this PIP, including its async nature, what is an epoch and
> why it was introduced, what are flow permits. I'm not saying explain all
> pulsar in this doc, but just include a brief explanation of that
> terminology.
>
> # What We Suggest
>
> Rethink the solution.
> 1. The consumer (one of many) will send a seek command to the broker, and
> at the same time clear its internal queue and wait for a response from the
> broker.
> 2. The broker upon receiving the seek command, will
>      a. Stop dispatching messages to consumers.
>      b. Notify all consumers via a command (new) that the subscription
> position was asked to be reset. Consumers receiving this command will clear
> their internal queue. The broker will no longer close the TCP connection
> (with its adverse effects on other consumers and producers "riding" on that
> connection)
>      c. Reset the cursor to the newly requested position.
>      d. Continue dispatching messages from newly requested positions to
> consumers.
>
> The disadvantage here is that we need to alter the client to recognize
> a new command and act accordingly, yet I think that is accidental
> complexity stemming from the client-server protocol being bi-directional
> rather than request-response.
>
> Thanks,
>
> Asaf
>
> On Mon, Aug 1, 2022 at 6:43 AM Qiang Huang <qiang.huang1...@gmail.com> wrote:
>
> > Sure. You can refer to PIP-84:
> >
> > https://github.com/apache/pulsar/wiki/PIP-84-:-Pulsar-client:-Redeliver-command-add-epoch
> >
> > Zike Yang <z...@apache.org> wrote on Fri, Jul 29, 2022 at 10:22:
> >
> > > Hi, Qiang
> > >
> > > > It is necessary to check the current cursor status when handling a
> > > > flowPermits request on the server side. If the server is handling a
> > > > seek request, it should ignore the flowPermits request because the
> > > > request is illegal.
> > >
> > > Thanks for your explanation. I think it's better to add this
> > > explanation to the PIP.
> > >
> > > > The reconnected consumer can be regarded as a new consumer with a new epoch.
> > >
> > > The consumer will reconnect to the broker during the seek operation.
> > > And this will change the existing behavior. It doesn't seem to make
> > > sense. Please correct me if I have misunderstood.
> > >
> > > Thanks,
> > > Zike Yang
> > >
> > > On Wed, Jul 27, 2022 at 8:06 PM Qiang Huang <qiang.huang1...@gmail.com> wrote:
> > > >
> > > > Thanks Zike.
> > > > > > - stage 1: Check the current cursor status when handling
> > > > > > flowPermits from the server side.
> > > >
> > > > > > Could you explain more details on this step? It looks like there
> > > > > > is not much described above. What kind of status needs to be
> > > > > > checked, and what kind of behavior will the broker take?
> > > > It is necessary to check the current cursor status when handling a
> > > > flowPermits request on the server side. If the server is handling a
> > > > seek request, it should ignore the flowPermits request because the
> > > > request is illegal.
> > > >
> > > >
> > > > > > 1. Consumer reconnect need reset epoch.
> > > > >> Why do we need to reset the epoch when the consumer reconnects?
> > > > The reconnected consumer can be regarded as a new consumer with a new epoch.
> > >
> >
> >
> > --
> > BR,
> > Qiang Huang
> >
>


-- 
BR,
Qiang Huang
