Any updates on this?
On 2022/09/11 04:18:07 Qiang Huang wrote:
> Hi Asaf, thank you for the very detailed reply.
>
> > The problem we have today is that when we send a request to reset the
> > subscription position, the broker decides to:
> >
> > 1. Close the TCP connection, which in turn causes the client to clear any
> > pending messages it has in its queue.
> >
> > 2. Continue to send messages from the previous position, up to a certain
> > point where the broker "shifts gear" and starts sending messages from the
> > new position.
> >
> > Since Pulsar doesn't follow a request-response model but has a
> > bi-directional protocol, the client can send a command to fetch messages
> > using a new session sequence number, while the server can still send
> > messages using the old session number. Using the Session Sequence Number,
> > the client can't tell the difference between the messages being pushed
> > from the server to it.
>
> I totally agree with you. I realized something was wrong in the PIP when I
> re-read this part of the code.
>
> > # What are the issues with this PIP?
> > 1. The PIP decides to solve the problem listed above *only* for Exclusive
> > and Failover subscriptions, where you have only a single consumer. The
> > problem still remains at large with Shared or Key Shared subscriptions.
> > 2. The cost of solving a small portion of the problem is high:
> > Added complexity - adding another field to the protocol, and another
> > thing to check. I believe we should aim to reduce the cognitive load of
> > the developers of Pulsar.
> > 3. There are no rejected solutions - we always need to examine all
> > available options and list why we decided against them.
> > 4. Lack of background knowledge (context) - it's super hard IMO to grasp
> > the idea with so much context missing: the client-server protocol
> > pertaining to this PIP, including its async nature, what an epoch is and
> > why it was introduced, and what flow permits are. I'm not saying you
> > should explain all of Pulsar in this doc, just include a brief
> > explanation of that terminology.
> >
> > # What We Suggest
> >
> > Rethink the solution.
> > 1. The consumer (one of many) will send a seek command to the broker, and
> > at the same time clear its internal queue and wait for a response from
> > the broker.
> > 2. The broker, upon receiving the seek command, will:
> >    a. Stop dispatching messages to consumers.
> >    b. Notify all consumers via a (new) command that the subscription
> >       position was asked to be reset. Consumers receiving this command
> >       will clear their internal queue. The broker will no longer close
> >       the TCP connection (with its adverse effects on other consumers and
> >       producers "riding" on that connection).
> >    c. Reset the cursor to the newly requested position.
> >    d. Continue dispatching messages from the newly requested position to
> >       consumers.
>
> Good suggestions. I'll look into these issues and rethink the solution. I
> will rewrite this PIP according to your suggestions.
> Thanks again for your review.
>
> On Wed, Sep 7, 2022 at 23:12, Asaf Mesika <asaf.mes...@gmail.com> wrote:
> > Hi Qiang,
> >
> > We had a brainstorming session on this PIP over Zoom with Penghui, Hang,
> > and more people, and I'm jotting down our feedback here.
> >
> > Before I do that, I just want to write down my own understanding of the
> > document, for other readers:
> >
> > # Context
> > Pulsar, as opposed to other distributed / streaming systems, took the
> > approach of a push model. The client (the consumer, that is) asks the
> > broker for 1000 messages (that's the consumer's remaining capacity in its
> > internal queue); this process is called flow permits. The broker is now
> > permitted to send up to 1000 messages to the client, and uses the TCP
> > connection to push those messages as soon as they are ready to be sent.
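To make the flow-permits idea from Asaf's Context section concrete, here is a minimal Java sketch of the bookkeeping on both sides. It is not Pulsar's actual implementation. The class and method names (`ConsumerQueue`, `BrokerDispatcher`, `requestMore`, `grantPermits`, `tryDispatch`) are hypothetical, and the sketch models only the counting: the consumer grants as many permits as it has free queue slots, and the broker pushes messages only while it holds permits.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical broker side: may push messages only while it holds permits.
class BrokerDispatcher {
    private int availablePermits = 0;

    // Called when a flow-permits request arrives from the consumer.
    void grantPermits(int permits) {
        availablePermits += permits;
    }

    // Pushes the message to the consumer if a permit is available.
    boolean tryDispatch(String message, ConsumerQueue consumer) {
        if (availablePermits == 0) {
            return false; // no permission to push; the broker must wait
        }
        availablePermits--;
        consumer.receive(message);
        return true;
    }

    int availablePermits() {
        return availablePermits;
    }
}

// Hypothetical consumer side: a bounded receiver queue.
class ConsumerQueue {
    private final int capacity;
    private final Deque<String> queue = new ArrayDeque<>();
    private int outstandingPermits = 0; // granted to the broker but not yet used

    ConsumerQueue(int capacity) {
        this.capacity = capacity;
    }

    // Grants the broker permits for the remaining free capacity.
    int requestMore(BrokerDispatcher broker) {
        int free = capacity - queue.size() - outstandingPermits;
        if (free <= 0) {
            return 0;
        }
        outstandingPermits += free;
        broker.grantPermits(free);
        return free;
    }

    // A pushed message consumes one of the permits granted earlier.
    void receive(String message) {
        outstandingPermits--;
        queue.addLast(message);
    }

    String poll() {
        return queue.pollFirst();
    }

    int size() {
        return queue.size();
    }
}
```

The broker spends permits on its own schedule. In the real, asynchronous protocol, permits granted before a seek can still be used to push messages from the old position, which is part of the problem discussed in this thread.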
> >
> > The consumer can request that the subscription reset its position to a
> > new position.
> > The problem we have today is that when we send a request to reset the
> > subscription position, the broker decides to:
> > 1. Close the TCP connection, which in turn causes the client to clear any
> > pending messages it has in its queue.
> > 2. Continue to send messages from the previous position, up to a certain
> > point where the broker "shifts gear" and starts sending messages from the
> > new position.
> >
> > So the problem is that you would expect that after the connection was
> > reset, only messages from the new position would be sent to the
> > consumer, but that doesn't happen.
> >
> > We have to keep in mind that there are effectively two scenarios here
> > from the point of view of the consumer:
> > 1. Single consumer - either an Exclusive subscription, or a Failover
> > subscription, where only one consumer is active on a given topic at a
> > time.
> > 2. Multiple consumers - Shared or Key Shared subscription types. In this
> > case, one of those consumers can decide to reset the position of the
> > *subscription*. When that happens, the broker again decides to reset all
> > existing TCP connections to all consumers upon receiving the seek
> > command, and you would expect any messages sent afterward to be from the
> > new position, which again doesn't happen.
> >
> > Another really important piece of context for the reader here is the
> > notion of an epoch, which was introduced to Pulsar in PIP-84. The idea is
> > that every time the client starts a "session" of requesting messages and
> > receiving them in response, the client sends a Session Sequence Number,
> > and the server responds to those message requests with the same session
> > sequence number.
> > Since Pulsar doesn't follow a request-response model but has a
> > bi-directional protocol, the client can send a command to fetch messages
> > using a new session sequence number, while the server can still send
> > messages using the old session number. Using the Session Sequence Number,
> > the client can't tell the difference between the messages being pushed
> > from the server to it. That Session Sequence Number is what PIP-84, and
> > also this PIP, refer to as the epoch.
> > The idea was to demarcate the responses coming from the server according
> > to the commands the client sends, since the two are *independent*
> > (async).
> >
> > # What are the issues with this PIP?
> > 1. The PIP decides to solve the problem listed above *only* for Exclusive
> > and Failover subscriptions, where you have only a single consumer. The
> > problem still remains at large with Shared or Key Shared subscriptions.
> > 2. The cost of solving a small portion of the problem is high:
> > Added complexity - adding another field to the protocol, and another
> > thing to check. I believe we should aim to reduce the cognitive load of
> > the developers of Pulsar.
> > 3. There are no rejected solutions - we always need to examine all
> > available options and list why we decided against them.
> > 4. Lack of background knowledge (context) - it's super hard IMO to grasp
> > the idea with so much context missing: the client-server protocol
> > pertaining to this PIP, including its async nature, what an epoch is and
> > why it was introduced, and what flow permits are. I'm not saying you
> > should explain all of Pulsar in this doc, just include a brief
> > explanation of that terminology.
> >
> > # What We Suggest
> >
> > Rethink the solution.
> > 1. The consumer (one of many) will send a seek command to the broker, and
> > at the same time clear its internal queue and wait for a response from
> > the broker.
> > 2. The broker, upon receiving the seek command, will:
> >    a. Stop dispatching messages to consumers.
> >    b. Notify all consumers via a (new) command that the subscription
> >       position was asked to be reset. Consumers receiving this command
> >       will clear their internal queue. The broker will no longer close
> >       the TCP connection (with its adverse effects on other consumers and
> >       producers "riding" on that connection).
> >    c. Reset the cursor to the newly requested position.
> >    d. Continue dispatching messages from the newly requested position to
> >       consumers.
> >
> > The disadvantage here is that we need to change the client so that it
> > understands a new command and acts accordingly, yet I think that is
> > accidental complexity stemming from the bi-directional (rather than
> > request-response) client-server architecture.
> >
> > Thanks,
> >
> > Asaf
> >
> > On Mon, Aug 1, 2022 at 6:43 AM Qiang Huang <qiang.huang1...@gmail.com>
> > wrote:
> > > Sure. You can refer to PIP-84:
> > >
> > > https://github.com/apache/pulsar/wiki/PIP-84-:-Pulsar-client:-Redeliver-command-add-epoch
> > >
> > > On Fri, Jul 29, 2022 at 10:22, Zike Yang <z...@apache.org> wrote:
> > > > Hi, Qiang
> > > >
> > > > > It is necessary to check the current cursor status when handling a
> > > > > flowPermits request on the server side. If the server is handling a
> > > > > seek request, it should ignore the flowPermits request because the
> > > > > request is illegal.
> > > >
> > > > Thanks for your explanation. I think it's better to add this
> > > > explanation to the PIP.
> > > >
> > > > > The reconnected consumer can be regarded as a new consumer with a
> > > > > new epoch.
> > > >
> > > > The consumer will reconnect to the broker during the seek operation,
> > > > and this will change the existing behavior. It doesn't seem to make
> > > > sense. Please correct me if I have misunderstood.
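The broker-side flow Asaf suggests (steps 2a to 2d) can be combined with Qiang's rule that flow permits are ignored while a seek is in progress. The result is a small state machine, sketched below. This is a hypothetical, single-threaded model, not Pulsar's broker code. `Subscription`, `ConsumerLink`, `onFlow`, `beginSeek`, `completeSeek` and `onSubscriptionReset` are invented names. The "new command" from step 2b is modeled as a plain method call, and the cursor is a plain `long` position. The sketch also assumes that stale permits are discarded and that consumers send fresh flow permits once the seek completes.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical consumer endpoint. In the proposal it receives a new
// "subscription reset" command instead of having its TCP connection closed.
class ConsumerLink {
    final List<Long> receiverQueue = new ArrayList<>();
    int permits = 0; // permits the broker currently holds for this consumer

    // Step 2b (consumer side): drop everything buffered from the old position.
    void onSubscriptionReset() {
        receiverQueue.clear();
    }
}

// Hypothetical single-threaded model of a subscription on the broker.
class Subscription {
    private final List<ConsumerLink> consumers = new ArrayList<>();
    private final long lastPosition; // last entry available in the topic
    private long cursor = 0;         // next entry to dispatch
    private boolean seeking = false;

    Subscription(long lastPosition) {
        this.lastPosition = lastPosition;
    }

    void addConsumer(ConsumerLink consumer) {
        consumers.add(consumer);
    }

    // Flow permits are ignored (treated as illegal) while a seek is in progress.
    boolean onFlow(ConsumerLink consumer, int permits) {
        if (seeking) {
            return false;
        }
        consumer.permits += permits;
        dispatch();
        return true;
    }

    // Steps 2a and 2b: stop dispatching and notify every consumer.
    void beginSeek() {
        seeking = true;
        for (ConsumerLink consumer : consumers) {
            consumer.onSubscriptionReset();
            // Assumption of this sketch: old permits are discarded, and each
            // consumer sends fresh flow permits after the seek completes.
            consumer.permits = 0;
        }
    }

    // Steps 2c and 2d: reset the cursor, then resume dispatching.
    void completeSeek(long newPosition) {
        cursor = newPosition;
        seeking = false;
        dispatch();
    }

    // Pushes entries to consumers that hold permits, in consumer order.
    private void dispatch() {
        if (seeking) {
            return;
        }
        for (ConsumerLink consumer : consumers) {
            while (consumer.permits > 0 && cursor <= lastPosition) {
                consumer.receiverQueue.add(cursor++);
                consumer.permits--;
            }
        }
    }
}
```

In this single-threaded model, no entry from the old position can be pushed between `beginSeek` and `completeSeek`, and no TCP connection is closed. A real broker would also have to handle concurrency and messages already in flight, which this sketch ignores.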
> > > >
> > > > Thanks,
> > > > Zike Yang
> > > >
> > > > On Wed, Jul 27, 2022 at 8:06 PM Qiang Huang <qiang.huang1...@gmail.com>
> > > > wrote:
> > > > >
> > > > > Thanks Zike.
> > > > > > > - stage 1: Check the current cursor status when handling
> > > > > > > flowPermits from the server side.
> > > > > >
> > > > > > Could you explain this step in more detail? There is not much
> > > > > > described above. What kind of status needs to be checked, and
> > > > > > what will the broker do?
> > > > > It is necessary to check the current cursor status when handling a
> > > > > flowPermits request on the server side. If the server is handling a
> > > > > seek request, it should ignore the flowPermits request because the
> > > > > request is illegal.
> > > > >
> > > > > > > 1. Consumer reconnect needs to reset the epoch.
> > > > > > Why do we need to reset the epoch when the consumer reconnects?
> > > > > The reconnected consumer can be regarded as a new consumer with a
> > > > > new epoch.
> > >
> > > --
> > > BR,
> > > Qiang Huang
>
> --
> BR,
> Qiang Huang
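Several messages in this thread rely on the epoch from PIP-84. Asaf describes it this way: the client tags each new "session" of requests with a sequence number, and the server responds with the same number. The sketch below illustrates that demarcation with a client-side filter that drops messages stamped with a different epoch. It is a hypothetical illustration, not how the Pulsar client implements PIP-84. The names `PushedMessage`, `EpochFilter`, `startNewSession` and `accept` are invented.

```java
import java.util.ArrayList;
import java.util.List;

// A message pushed by the broker, stamped with the epoch of the session
// (client request) it answers.
final class PushedMessage {
    final long epoch;
    final String payload;

    PushedMessage(long epoch, String payload) {
        this.epoch = epoch;
        this.payload = payload;
    }
}

// Hypothetical client-side filter keyed on the current epoch.
class EpochFilter {
    private long currentEpoch = 0;
    private final List<String> receiverQueue = new ArrayList<>();

    // Starting a new session (e.g. for a seek) bumps the epoch and discards
    // anything buffered from the previous session. The returned value is what
    // the client would attach to its request to the broker.
    long startNewSession() {
        currentEpoch++;
        receiverQueue.clear();
        return currentEpoch;
    }

    // Accepts a pushed message only if it belongs to the current session.
    boolean accept(PushedMessage message) {
        if (message.epoch != currentEpoch) {
            return false; // stale: sent by the server for another session
        }
        receiverQueue.add(message.payload);
        return true;
    }

    List<String> buffered() {
        return receiverQueue;
    }
}
```

This is the kind of demarcation the thread discusses. Asaf's critique is that applying it only to single-consumer subscriptions leaves the Shared and Key Shared cases unsolved.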