Hello PengHui,

On Mon, Jul 3, 2023 at 8:39 PM PengHui Li <peng...@apache.org> wrote:

> Got it, for the Failover subscription, the new consumer caused the active
> consumer shift. I think we can make some improvements to this part to make
> sure the new active consumer will only get messages after the previous
> active consumer acked all the received messages, unless the previous active
> consumer disconnected.

I think this will greatly help maintain the ordering guarantees per partition.

> If all the consumers with the highest priority are disconnected, then
> the consumers with a lower priority will be picked. The Shared subscription
> has different behavior. It will select the lower priority consumer if all
> highest priority consumers don't have available permits. I think the
> challenge for the Failover subscription is that the broker needs to shift
> the active consumer according to the available permits. But it could be
> considered in a different active consumer assigner implementation like
> Kafka's consumer group coordinator, where you can have different policies.

Right, in the case of Shared subscriptions, the lower priority consumers are
used more often since permits are considered, and thus slow consumers are
detected quickly. In Failover, the current logic can lead to a single
remaining active consumer consuming from all partitions while multiple lower
priority consumers are on standby. That single higher priority consumer may
not be able to keep up with the topic throughput. We also cannot directly use
the same behavior as the Shared subscription here, because that would again
lead to out-of-order delivery of messages. I do not have a solution in mind
right now, but I will come up with something so that load balancing can be
better, utilizing lower priority consumers as well.

I was also thinking that, from a user's perspective, one would expect a lower
priority consumer to act as a backup in case one active consumer goes down,
which is also not how it works today.
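To make the behavior discussed in this thread concrete, here is a minimal
Python sketch of the selection as I understand it. It is a simplified model
and not the broker code: `Consumer`, `pick_active` and `assignment` are
hypothetical names, and the sketch assumes that a lower priority level number
means a higher priority and that consumers with equal priority are ordered by
name.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Consumer:
    name: str
    priority_level: int = 0  # assumption: lower number means higher priority


def pick_active(consumers, partition_index):
    """Return the consumer this model would make active for one partition."""
    # Sort by priority level, then by name, so every partition sees the same order.
    ordered = sorted(consumers, key=lambda c: (c.priority_level, c.name))
    # Only consumers sharing the best priority level are candidates.
    best = ordered[0].priority_level
    candidates = [c for c in ordered if c.priority_level == best]
    # Spread partitions over the candidates by partition index.
    return candidates[partition_index % len(candidates)]


def assignment(consumers, partitions):
    """Map each partition name to the name of its active consumer."""
    return {f"p{p}": pick_active(consumers, p).name for p in range(partitions)}


a, b, c, d = (Consumer(n) for n in "ABCD")
print(assignment([a, b, c, d], 2))  # {'p0': 'A', 'p1': 'B'}
print(assignment([a, c, d], 2))     # B left: {'p0': 'A', 'p1': 'C'}
print(assignment([c, d], 2))        # A left: {'p0': 'C', 'p1': 'D'}

# Ten consumers with priority levels 1 through 10 on ten partitions.
tiered = [Consumer(f"c{i}", priority_level=i) for i in range(1, 11)]
print(set(assignment(tiered, 10).values()))  # {'c1'}
```

The first three calls reproduce the A/B/C/D example from the original mail
further down, including p1 moving from C to D even though only A
disconnected. The last call shows all ten partitions landing on the single
highest priority consumer while the other nine stay idle.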

Regards

> Regards,
> Penghui
>
> On Mon, Jul 3, 2023 at 7:52 PM Girish Sharma <scrapmachi...@gmail.com>
> wrote:
>
> > Hello PengHui,
> > Thank you for the reply. Adding comments inline below with a few concerns.
> >
> > On Mon, Jul 3, 2023 at 4:38 PM PengHui Li <peng...@apache.org> wrote:
> >
> > > Hi Girish,
> > >
> > > Thanks for raising the discussion.
> > >
> > > I can confirm that your understanding is correct, and the document
> > > is confusing. If there are four consumers connected to a partitioned
> > > topic with two partitions, each partition will have four connected
> > > consumers but only one active consumer. The document's statement that
> > > two consumers are connected to each partition is wrong. We will try to
> > > improve the document, and your contribution is welcome if you want to
> > > improve it.
> >
> > Yes, the part where it shows only 2 consumers are connected is misleading,
> > but from an information point of view, it is still okay to show only 2 in
> > the visualization, as one is active and the other one is the backup (next
> > in line).
> >
> > The confusion comes where it tries to indicate that the active consumers
> > are uniformly spread, i.e. in the example, consumers A and C are active
> > while in reality, consumers A and B are active.
> > Maybe there is scope for improving the visualization there.
> >
> > > For the consumer shift for the partition without active consumer
> > > failures, I think it should be a load-balance consideration. Kafka has
> > > a consumer group coordinator, which can balance traffic between
> > > consumers. But Pulsar doesn't have one. So Pulsar has to re-assign the
> > > active consumer when a consumer leaves, no matter whether that consumer
> > > is active or not.
> >
> > From a code perspective, I do understand that it's tricky to ensure
> > minimal re-assignment. But this should be highlighted in the documentation
> > as it has implications in terms of ordered consumption, as described below.
> >
> > > Frankly, it's not the best policy for all the cases. IMO, Pulsar also
> > > can have different policies for assigning active consumers for
> > > different requirements. Do you have a real case that the unnecessary
> > > consumer shift will impact? That will help us to understand the value
> > > of introducing different policies. All I can think of at the moment
> > > are load balance (if the traffic of the partitions is far from each
> > > other) and the duplicated messages when switching the active consumer.
> >
> > Right, so currently I do see these challenges:
> >
> >    - Unlike KEY_SHARED, there is no logic to start sending data to newly
> >    assigned consumers *only after* the previous one acks to a certain
> >    checkpoint.
> >       - This in turn leads to chances of out-of-order consumption and
> >       duplicate consumption, where the in-queue messages of older
> >       consumers may still be processed while the same messages are sent
> >       to new consumers as well.
> >    - For any disconnected or newly added consumer, more than one partition
> >    gets affected based on the index of the consumer which got removed.
> >    - What is the use of setting a consumer priority to anything below the
> >    highest? The code seems to only consider the highest priority consumers
> >    to spread active consumers, and ignores any consumer with a priority
> >    lower than the highest priority among the consumers. This means those
> >    consumers would always sit idle as long as there is at least 1 consumer
> >    with higher priority. For example, if ten consumers (consumer priority
> >    1 through 10) are connected to 10 partitions, all 10 partitions would
> >    only send data to just one of the consumers at any given time.
> >
> > Regards
> >
> > > Regards,
> > > Penghui
> > >
> > > On Mon, Jul 3, 2023 at 2:49 PM Girish Sharma <scrapmachi...@gmail.com>
> > > wrote:
> > >
> > > > Bumping this up. Would really like to discuss this in the community.
> > > >
> > > > Regards
> > > >
> > > > On Wed, Jun 28, 2023 at 11:49 PM Girish Sharma
> > > > <scrapmachi...@gmail.com> wrote:
> > > >
> > > > > Hi everyone,
> > > > > I am trying to understand the failover subscription logic in a bit
> > > > > more detail. Specifically, the doc
> > > > > <https://pulsar.apache.org/docs/3.0.x/concepts-messaging/#failover>
> > > > > mentions this part for partitioned topics:
> > > > >
> > > > > *If the number of partitions in a partitioned topic is less than the
> > > > > number of consumers: For example, in the diagram below, this
> > > > > partitioned topic has 2 partitions and there are 4 consumers. Each
> > > > > partition has 1 active consumer and 1 stand-by consumer.*
> > > > >
> > > > >    - *For p0, consumer A is the master consumer, while consumer B
> > > > >    would be the next consumer in line to receive messages if
> > > > >    consumer A is disconnected.*
> > > > >    - *For p1, consumer C is the master consumer, while consumer D
> > > > >    would be the next consumer in line to receive messages if
> > > > >    consumer C is disconnected.*
> > > > >
> > > > > So, as per this, since all four consumers (A, B, C, D) connect to
> > > > > both partitions p0 and p1, the consumers array size in
> > > > > AbstractDispatcherSingleActiveConsumer should be 4. Now, based on
> > > > > the consumer index choosing logic spanning lines 126 - 130
> > > > > <https://github.com/apache/pulsar/blob/master/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherSingleActiveConsumer.java#L126-L130>,
> > > > > the consumer index assigned to p0 should be 0 (i.e. A) and to p1
> > > > > should be 1 (i.e. B). I am assuming here that all 4 consumers have
> > > > > the same priority.
> > > > >
> > > > > Now consider consumer B getting disconnected. The remaining consumer
> > > > > array == (A, C, D). In this case, p1 will get a new consumer using
> > > > > the logic 1 % 3 = 1, i.e. consumer C now. p0's consumer would remain
> > > > > the same, i.e. 0 % 3 = 0, i.e. A.
> > > > >
> > > > > Next, consider that consumer A also goes down. The remaining
> > > > > consumer array == (C, D). In this case, p0 will get a new consumer
> > > > > -> 0 % 2 = 0, i.e. consumer C, and p1 would now be shifted to
> > > > > 1 % 2 = 1, i.e. consumer D. Even though p1's active consumer was
> > > > > untouched, p1 got a consumer shift.
> > > > >
> > > > > So I have a couple of questions:
> > > > >
> > > > >    - Am I missing something? Is my understanding of the logic correct?
> > > > >    - If yes, why does the doc say what it says? And why change p1's
> > > > >    consumer needlessly in the above example?
> > > > >
> > > > > Regards
> > > > > --
> > > > > Girish Sharma
> > > >
> > > > --
> > > > Girish Sharma
> >
> > --
> > Girish Sharma

--
Girish Sharma