Hi Asaf,

Thank you for the very detailed reply.

> The problem we have today is that after a request to reset the
> subscription position has been sent, the broker:
> 1. Closes the TCP connection, which in turn causes the client to clear
> any pending messages it has in its queue.
> 2. Continues to send messages from the previous position, up to a
> certain point where the broker "shifts gear" and starts sending
> messages from the new position.
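To make the quoted race concrete, here is a toy Java model of it. It assumes (hypothetically) that the broker has already read a batch of entries from the old position when the seek arrives. `SeekRaceDemo` and its methods are invented for illustration and are not Pulsar classes; the model only shows why clearing the client's queue does not help while stale entries are still in flight on the broker side.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical, single-threaded model of the seek race. None of these
// names exist in Pulsar; they only illustrate the ordering problem.
final class SeekRaceDemo {
    // Entries already read from the old cursor position and handed to the
    // dispatch path, but not yet written to the socket (assumption).
    private final Deque<Long> inFlight = new ArrayDeque<>();
    // The consumer's receiver queue.
    private final Deque<Long> receiverQueue = new ArrayDeque<>();
    private long cursor;

    SeekRaceDemo(long startPosition) { this.cursor = startPosition; }

    // The broker reads a batch; in reality this read completes asynchronously.
    void brokerReadBatch(int size) {
        for (int i = 0; i < size; i++) {
            inFlight.add(cursor++);
        }
    }

    // Seek as described in the thread: the client's queue is dropped and the
    // cursor moves, but entries already in flight are NOT discarded.
    void seek(long newPosition) {
        receiverQueue.clear();
        cursor = newPosition;
    }

    // The broker flushes whatever is in flight, then reads from the cursor.
    void brokerDispatch(int readAfterFlush) {
        while (!inFlight.isEmpty()) {
            receiverQueue.add(inFlight.poll());
        }
        for (int i = 0; i < readAfterFlush; i++) {
            receiverQueue.add(cursor++);
        }
    }

    List<Long> received() { return new ArrayList<>(receiverQueue); }

    public static void main(String[] args) {
        SeekRaceDemo demo = new SeekRaceDemo(100);
        demo.brokerReadBatch(3);              // entries 100, 101, 102 in flight
        demo.seek(0);                         // client rewinds to position 0
        demo.brokerDispatch(2);               // stale entries arrive first
        System.out.println(demo.received());  // [100, 101, 102, 0, 1]
    }
}
```

In this model the consumer sees 100, 101 and 102 after it asked to rewind, which matches the "shifts gear" symptom quoted above.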
> Since Pulsar doesn't follow a request-response model but has a
> bi-directional protocol, the client can send a command to fetch
> messages using a new session sequence number, while the server can
> still send messages using the old session number. Using the Session
> Sequence Number, the client can't tell the difference between the
> messages being pushed from the server to it.

I totally agree with you. I realized something was wrong in the PIP
when I re-read this part of the code.

> # What are the issues with this PIP?
> 1. The PIP decides to solve the problem listed above *only* for
> exclusive and failover subscriptions, where you have only a single
> consumer. The problem still remains at large with Shared or Key Shared
> subscriptions.
> 2. The cost of solving a small portion of the problem is high.
> Added complexity - adding another field to the protocol, and another
> thing to check. I believe we should aim to reduce the cognitive load on
> Pulsar developers.
> 3. There are no rejected solutions - we always need to examine all
> available options and list why we decided against them.
> 4. Lack of background knowledge (context) - IMO it's very hard to grasp
> the idea with so much context missing: the client-server protocol
> pertaining to this PIP (including its async nature), what an epoch is
> and why it was introduced, and what flow permits are. I'm not saying to
> explain all of Pulsar in this doc, just to include a brief explanation
> of that terminology.
>
> # What We Suggest
>
> Rethink the solution.
> 1. The consumer (one of many) will send a seek command to the broker,
> and at the same time clear its internal queue and wait for a response
> from the broker.
> 2. The broker, upon receiving the seek command, will
> a. Stop dispatching messages to consumers.
> b. Notify all consumers via a (new) command that the subscription
> position was asked to be reset. Consumers receiving this command will
> clear their internal queue.
> The broker will no longer close the TCP connection (with its adverse
> effects on other consumers and producers "riding" on that connection).
> c. Reset the cursor to the newly requested position.
> d. Continue dispatching messages from the newly requested position to
> consumers.

Good suggestions. I'll look into these issues, rethink the solution, and
rewrite this PIP accordingly. Thanks again for your review.

On Wed, Sep 7, 2022 at 11:12 PM Asaf Mesika <asaf.mes...@gmail.com> wrote:

> Hi Qiang,
>
> We had a brainstorming session on this PIP over Zoom with Penghui, Hang,
> and others, and I'm jotting down our feedback here.
>
> Before I do that, I want to write down my own understanding of the
> document, for other readers:
>
> # Context
> Pulsar, as opposed to other distributed / streaming systems, took the
> approach of a push model. The client (the consumer, that is) asks the
> broker for 1000 messages (the remaining capacity of the consumer's
> internal queue); this mechanism is called flow permits. The broker is
> then permitted to send up to 1000 messages to the client, and uses the
> TCP connection to send them as soon as they are ready.
>
> The consumer can ask for the subscription's position to be reset to a
> new position (a seek).
> The problem we have today is that after a request to reset the
> subscription position has been sent, the broker:
> 1. Closes the TCP connection, which in turn causes the client to clear
> any pending messages it has in its queue.
> 2. Continues to send messages from the previous position, up to a
> certain point where the broker "shifts gear" and starts sending
> messages from the new position.
>
> So you would expect that, after the connection is reset, only messages
> from the new position are sent to the consumer, but that doesn't happen.
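The flow-permit mechanism from the Context section boils down to simple accounting: the consumer grants N permits, and the broker may push at most N messages until more are granted. A minimal Java sketch, where `PermitDispatcher`, `addPermits` and `dispatch` are hypothetical names and not the broker's real code:

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of Pulsar-style flow permits. PermitDispatcher and its methods
// are hypothetical; the real broker logic is considerably more involved.
final class PermitDispatcher {
    private int permits;        // messages the consumer has allowed us to push
    private long nextPosition;  // next entry to read from the cursor

    PermitDispatcher(long startPosition) { this.nextPosition = startPosition; }

    // Called when a flow (permits) request arrives from the consumer.
    void addPermits(int n) {
        if (n <= 0) throw new IllegalArgumentException("permits must be positive");
        permits += n;
    }

    // Push up to `available` ready entries, never more than the permits allow.
    List<Long> dispatch(int available) {
        int toSend = Math.min(permits, available);
        List<Long> pushed = new ArrayList<>(toSend);
        for (int i = 0; i < toSend; i++) {
            pushed.add(nextPosition++);
        }
        permits -= toSend;
        return pushed;
    }

    int remainingPermits() { return permits; }

    public static void main(String[] args) {
        PermitDispatcher d = new PermitDispatcher(0);
        d.addPermits(1000);                          // consumer's free queue capacity
        System.out.println(d.dispatch(600).size());  // 600
        System.out.println(d.dispatch(600).size());  // 400, permits exhausted
        System.out.println(d.remainingPermits());    // 0
    }
}
```

Because the broker can push as soon as it holds permits, messages from the old position may already be on the wire when a seek is issued, which is where the problem described above comes from.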
>
> Keep in mind that, from the consumer's point of view, there are
> effectively two scenarios here:
> 1. Single consumer - either because the subscription is Exclusive, or
> because it is the single active consumer of a topic in a Failover
> subscription.
> 2. Multiple consumers - in Shared or Key Shared subscriptions. In this
> case, any one of those consumers can decide to reset the position of
> the *subscription*. When that happens, the broker, upon receiving the
> seek command, again resets all existing TCP connections to all
> consumers, and you would expect any messages sent afterward to be from
> the new position, which again doesn't happen.
>
> Another really important piece of context for the reader is the notion
> of an epoch, which was first introduced in PIP-84. The idea is that
> every time the client starts a "session" of requesting and receiving
> messages, it sends a Session Sequence Number, and the server responds
> to those message requests with the same Session Sequence Number. Since
> Pulsar doesn't follow a request-response model but has a bi-directional
> protocol, the client can send a command to fetch messages using a new
> session sequence number, while the server can still send messages using
> the old session number. Using the Session Sequence Number, the client
> can't tell the difference between the messages being pushed from the
> server to it. That Session Sequence Number is the one referred to as
> the Epoch in PIP-84 and also in this PIP.
> The idea was to demarcate the responses coming from the server
> according to the commands the client sends, since the two are
> *independent* (async).
>
> # What are the issues with this PIP?
> 1. The PIP decides to solve the problem listed above *only* for
> exclusive and failover subscriptions, where you have only a single
> consumer.
> The problem still remains at large with Shared or Key Shared
> subscriptions.
> 2. The cost of solving a small portion of the problem is high.
> Added complexity - adding another field to the protocol, and another
> thing to check. I believe we should aim to reduce the cognitive load on
> Pulsar developers.
> 3. There are no rejected solutions - we always need to examine all
> available options and list why we decided against them.
> 4. Lack of background knowledge (context) - IMO it's very hard to grasp
> the idea with so much context missing: the client-server protocol
> pertaining to this PIP (including its async nature), what an epoch is
> and why it was introduced, and what flow permits are. I'm not saying to
> explain all of Pulsar in this doc, just to include a brief explanation
> of that terminology.
>
> # What We Suggest
>
> Rethink the solution.
> 1. The consumer (one of many) will send a seek command to the broker,
> and at the same time clear its internal queue and wait for a response
> from the broker.
> 2. The broker, upon receiving the seek command, will
> a. Stop dispatching messages to consumers.
> b. Notify all consumers via a (new) command that the subscription
> position was asked to be reset. Consumers receiving this command will
> clear their internal queue. The broker will no longer close the TCP
> connection (with its adverse effects on other consumers and producers
> "riding" on that connection).
> c. Reset the cursor to the newly requested position.
> d. Continue dispatching messages from the newly requested position to
> consumers.
>
> The disadvantage here is that we need to alter the client so that it
> knows about a new command and acts accordingly, yet I think that is
> accidental complexity stemming from the bi-directional (rather than
> request-response) client-server architecture.
>
> Thanks,
>
> Asaf
>
> On Mon, Aug 1, 2022 at 6:43 AM Qiang Huang <qiang.huang1...@gmail.com>
> wrote:
>
> > Sure.
> > You can refer to PIP-84:
> >
> > https://github.com/apache/pulsar/wiki/PIP-84-:-Pulsar-client:-Redeliver-command-add-epoch
> >
> > On Fri, Jul 29, 2022 at 10:22 AM Zike Yang <z...@apache.org> wrote:
> >
> > > Hi, Qiang
> > >
> > > > It is necessary to check the current cursor status when handling
> > > > flowPermits requests on the server side. If the server is handling
> > > > a seek request, it should ignore flowPermits requests, because such
> > > > a request is illegal at that point.
> > >
> > > Thanks for your explanation. I think it's better to add this
> > > explanation to the PIP.
> > >
> > > > The reconnected consumer can be regarded as a new consumer with a
> > > > new epoch.
> > >
> > > The consumer will reconnect to the broker during the seek operation,
> > > and this will change the existing behavior. It doesn't seem to make
> > > sense. Please correct me if I have misunderstood.
> > >
> > > Thanks,
> > > Zike Yang
> > >
> > > On Wed, Jul 27, 2022 at 8:06 PM Qiang Huang <qiang.huang1...@gmail.com>
> > > wrote:
> > >
> > > > Thanks Zike.
> > > >
> > > > > > - stage 1: Check the current cursor status when handling
> > > > > > flowPermits on the server side.
> > > > >
> > > > > Could you explain this step in more detail? There is not much
> > > > > described above. What kind of status needs to be checked, and what
> > > > > action will the broker take?
> > > >
> > > > It is necessary to check the current cursor status when handling
> > > > flowPermits requests on the server side. If the server is handling
> > > > a seek request, it should ignore flowPermits requests, because such
> > > > a request is illegal at that point.
> > > >
> > > > > > 1. Consumer reconnect needs to reset the epoch.
> > > > >
> > > > > Why do we need to reset the epoch when the consumer reconnects?
> > > >
> > > > The reconnected consumer can be regarded as a new consumer with a
> > > > new epoch.
> >
> > --
> > BR,
> > Qiang Huang

--
BR,
Qiang Huang
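The seek flow Asaf suggests in this thread (step 1 plus broker steps 2a-2d) can be sketched as a single-threaded Java toy model. Every name here (`Subscription`, `beginSeek`, `completeSeek`, `onResetNotification`) is hypothetical; the proposed new reset command is modeled as a direct method call, and the asynchronous cursor reset is modeled by splitting the seek into two calls. It shows only the ordering of the steps, not how the broker would implement them.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical sketch of the suggested seek flow; not Pulsar code.
final class SeekProtocolSketch {

    static final class Consumer {
        final Deque<Long> queue = new ArrayDeque<>();

        // Handler for the proposed new broker-to-client command (2b):
        // drop everything buffered from the old position.
        void onResetNotification() { queue.clear(); }
    }

    static final class Subscription {
        private final List<Consumer> consumers = new ArrayList<>();
        private long cursor;
        private boolean dispatchPaused;

        Subscription(long start) { cursor = start; }

        void add(Consumer c) { consumers.add(c); }

        // Round-robin dispatch of up to n entries, as in a Shared subscription.
        // Returns how many entries were actually dispatched.
        int dispatch(int n) {
            if (dispatchPaused || consumers.isEmpty()) return 0;
            for (int i = 0; i < n; i++) {
                consumers.get(i % consumers.size()).queue.add(cursor++);
            }
            return n;
        }

        // Steps 1, 2a and 2b. No TCP connection is closed.
        void beginSeek(Consumer requester) {
            requester.queue.clear();            // 1: requester clears its own queue
            dispatchPaused = true;              // 2a: stop dispatching
            for (Consumer c : consumers) {      // 2b: notify every consumer
                c.onResetNotification();
            }
        }

        // Steps 2c and 2d, once the (assumed async) cursor reset has finished.
        void completeSeek(long newPosition) {
            cursor = newPosition;               // 2c: reset the cursor
            dispatchPaused = false;             // 2d: resume from the new position
        }
    }

    public static void main(String[] args) {
        Subscription sub = new Subscription(100);
        Consumer a = new Consumer();
        Consumer b = new Consumer();
        sub.add(a);
        sub.add(b);
        sub.dispatch(4);                 // a: [100, 102], b: [101, 103]
        sub.beginSeek(a);                // both queues cleared, dispatch paused
        sub.dispatch(4);                 // no-op while the seek is in progress
        sub.completeSeek(0);
        sub.dispatch(4);                 // a: [0, 2], b: [1, 3]
        System.out.println(a.queue + " " + b.queue);  // [0, 2] [1, 3]
    }
}
```

A dispatch attempted between `beginSeek` and `completeSeek` sends nothing, which is similar in spirit to the "ignore flowPermits while a seek is in progress" check discussed above, but achieved by pausing the dispatcher rather than by rejecting the consumer's request.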